diff options
Diffstat (limited to 'core')
3 files changed, 26 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 3848734d6f..ede1e23f4f 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -65,7 +65,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { private val cleaningThread = new Thread() { override def run() { keepCleaning() }} /** - * Whether the cleaning thread will block on cleanup tasks. + * Whether the cleaning thread will block on cleanup tasks (other than shuffle, which + * is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter). * * Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary * workaround for the issue, which is ultimately caused by the way the BlockManager actors @@ -76,6 +77,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { private val blockOnCleanupTasks = sc.conf.getBoolean( "spark.cleaner.referenceTracking.blocking", true) + /** + * Whether the cleaning thread will block on shuffle cleanup tasks. + * + * When context cleaner is configured to block on every delete request, it can throw timeout + * exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this + * parameter by default disables blocking on shuffle cleanups. Note that this does not affect + * the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround, + * until the real Akka issue (referred to in the comment above `blockOnCleanupTasks`) is + * resolved. + */ + private val blockOnShuffleCleanupTasks = sc.conf.getBoolean( + "spark.cleaner.referenceTracking.blocking.shuffle", false) + @volatile private var stopped = false /** Attach a listener object to get information of when objects are cleaned. */ @@ -128,7 +142,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { case CleanRDD(rddId) => doCleanupRDD(rddId, blocking = blockOnCleanupTasks) case CleanShuffle(shuffleId) => - doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks) + doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks) case CleanBroadcast(broadcastId) => doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 669307765d..e67b3dc5ce 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -101,7 +101,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log def removeRdd(rddId: Int, blocking: Boolean) { val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId)) future.onFailure { - case e: Throwable => logError("Failed to remove RDD " + rddId, e) + case e: Exception => + logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}") } if (blocking) { Await.result(future, timeout) @@ -112,7 +113,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log def removeShuffle(shuffleId: Int, blocking: Boolean) { val future = askDriverWithReply[Future[Seq[Boolean]]](RemoveShuffle(shuffleId)) future.onFailure { - case e: Throwable => logError("Failed to remove shuffle " + shuffleId, e) + case e: Exception => + logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}") } if (blocking) { Await.result(future, timeout) @@ -124,9 +126,9 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log val future = askDriverWithReply[Future[Seq[Int]]]( RemoveBroadcast(broadcastId, removeFromMaster)) future.onFailure { - case e: Throwable => - logError("Failed to remove broadcast " + broadcastId + - " with removeFromMaster = " + removeFromMaster, e) + case e: Exception => + logWarning(s"Failed to remove broadcast $broadcastId" + + s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}") } if (blocking) { Await.result(future, timeout) diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index 4bc4346c0a..2744894277 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -52,6 +52,7 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[Ha .setMaster("local[2]") .setAppName("ContextCleanerSuite") .set("spark.cleaner.referenceTracking.blocking", "true") + .set("spark.cleaner.referenceTracking.blocking.shuffle", "true") .set("spark.shuffle.manager", shuffleManager.getName) before { @@ -243,6 +244,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase { .setMaster("local-cluster[2, 1, 512]") .setAppName("ContextCleanerSuite") .set("spark.cleaner.referenceTracking.blocking", "true") + .set("spark.cleaner.referenceTracking.blocking.shuffle", "true") .set("spark.shuffle.manager", shuffleManager.getName) sc = new SparkContext(conf2) @@ -319,6 +321,7 @@ class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[Sor .setMaster("local-cluster[2, 1, 512]") .setAppName("ContextCleanerSuite") .set("spark.cleaner.referenceTracking.blocking", "true") + .set("spark.cleaner.referenceTracking.blocking.shuffle", "true") .set("spark.shuffle.manager", shuffleManager.getName) sc = new SparkContext(conf2) |