Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/ContextCleaner.scala             | 18 ++++++++++++++++--
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala | 12 +++++++-----
-rw-r--r--  core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala        |  3 +++
3 files changed, 26 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 3848734d6f..ede1e23f4f 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -65,7 +65,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
/**
- * Whether the cleaning thread will block on cleanup tasks.
+ * Whether the cleaning thread will block on cleanup tasks (other than shuffle, which
+ * is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
*
* Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
* workaround for the issue, which is ultimately caused by the way the BlockManager actors
@@ -76,6 +77,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private val blockOnCleanupTasks = sc.conf.getBoolean(
"spark.cleaner.referenceTracking.blocking", true)
+ /**
+ * Whether the cleaning thread will block on shuffle cleanup tasks.
+ *
+ * When the context cleaner is configured to block on every delete request, it can throw timeout
+ * exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this
+ * parameter disables blocking on shuffle cleanups by default. Note that this does not affect
+ * the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround until
+ * the real Akka issue (referred to in the comment above `blockOnCleanupTasks`) is
+ * resolved.
+ */
+ private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
+ "spark.cleaner.referenceTracking.blocking.shuffle", false)
+
@volatile private var stopped = false
/** Attach a listener object to get information of when objects are cleaned. */
@@ -128,7 +142,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
- doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
+ doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
}
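
As a usage sketch (not part of the patch): a driver program that wants the old fully blocking behavior back would set both keys introduced or documented above. The master URL and app name below are placeholders for illustration; the two configuration keys are the ones from this change.

    import org.apache.spark.{SparkConf, SparkContext}

    // Usage sketch: opt back into blocking shuffle cleanup.
    // Master URL and app name are placeholders, not from the patch.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("BlockingCleanupExample")
      .set("spark.cleaner.referenceTracking.blocking", "true")
      .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
    val sc = new SparkContext(conf)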
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 669307765d..e67b3dc5ce 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -101,7 +101,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
def removeRdd(rddId: Int, blocking: Boolean) {
val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
future.onFailure {
- case e: Throwable => logError("Failed to remove RDD " + rddId, e)
+ case e: Exception =>
+ logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}")
}
if (blocking) {
Await.result(future, timeout)
@@ -112,7 +113,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
def removeShuffle(shuffleId: Int, blocking: Boolean) {
val future = askDriverWithReply[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
future.onFailure {
- case e: Throwable => logError("Failed to remove shuffle " + shuffleId, e)
+ case e: Exception =>
+ logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}")
}
if (blocking) {
Await.result(future, timeout)
@@ -124,9 +126,9 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
val future = askDriverWithReply[Future[Seq[Int]]](
RemoveBroadcast(broadcastId, removeFromMaster))
future.onFailure {
- case e: Throwable =>
- logError("Failed to remove broadcast " + broadcastId +
- " with removeFromMaster = " + removeFromMaster, e)
+ case e: Exception =>
+ logWarning(s"Failed to remove broadcast $broadcastId" +
+ s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}")
}
if (blocking) {
Await.result(future, timeout)
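
The three removal methods above share one pattern: register an asynchronous failure handler on the driver's reply future, then block the caller only when asked to. A minimal self-contained sketch of that pattern follows, using plain Scala futures rather than Spark's askDriverWithReply, and the same onFailure API the patch uses (available in the Scala versions of this era, deprecated later):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    // Sketch: log failures asynchronously; block the caller only on request.
    def removeWithOptionalBlock(blocking: Boolean): Unit = {
      val future: Future[Seq[Int]] = Future { Seq(1, 2, 3) } // stand-in for the driver ask
      future.onFailure {
        case e: Exception => println(s"Failed to remove - ${e.getMessage}")
      }
      if (blocking) {
        // Throws on timeout; this is where the SPARK-3139 exceptions surfaced
        // when shuffle cleanup was blocking.
        Await.result(future, 30.seconds)
      }
    }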
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 4bc4346c0a..2744894277 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -52,6 +52,7 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[Ha
.setMaster("local[2]")
.setAppName("ContextCleanerSuite")
.set("spark.cleaner.referenceTracking.blocking", "true")
+ .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
.set("spark.shuffle.manager", shuffleManager.getName)
before {
@@ -243,6 +244,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
.setMaster("local-cluster[2, 1, 512]")
.setAppName("ContextCleanerSuite")
.set("spark.cleaner.referenceTracking.blocking", "true")
+ .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
.set("spark.shuffle.manager", shuffleManager.getName)
sc = new SparkContext(conf2)
@@ -319,6 +321,7 @@ class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[Sor
.setMaster("local-cluster[2, 1, 512]")
.setAppName("ContextCleanerSuite")
.set("spark.cleaner.referenceTracking.blocking", "true")
+ .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
.set("spark.shuffle.manager", shuffleManager.getName)
sc = new SparkContext(conf2)