aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-08-27 00:13:38 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-08-27 00:13:38 -0700
commit3e2864e40472b32e6a7eec5ba3bc83562d2a1a62 (patch)
treeae54f24c9974d1343bee71c8148294a2c3449c9e /core
parent9d65f2712c250a561c9c1f6259aa12e861ed239d (diff)
downloadspark-3e2864e40472b32e6a7eec5ba3bc83562d2a1a62.tar.gz
spark-3e2864e40472b32e6a7eec5ba3bc83562d2a1a62.tar.bz2
spark-3e2864e40472b32e6a7eec5ba3bc83562d2a1a62.zip
[SPARK-3139] Made ContextCleaner to not block on shuffles
As a workaround for SPARK-3015, the ContextCleaner was made "blocking", that is, it cleaned items one-by-one. But shuffles can take a long time to be deleted. Given that the RC for 1.1 is imminent, this PR makes a narrow change in the context cleaner - not wait for shuffle cleanups to complete. Also it changes the error messages on failure to delete to be milder warnings, as exceptions in the delete code path for one item does not really stop the actual functioning of the system. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #2143 from tdas/cleaner-shuffle-fix and squashes the following commits: 9c84202 [Tathagata Das] Restoring default blocking behavior in ContextCleanerSuite, and added docs to identify that spark.cleaner.referenceTracking.blocking does not control shuffle. 2181329 [Tathagata Das] Mark shuffle cleanup as non-blocking. e337cc2 [Tathagata Das] Changed semantics based on PR comments. 387b578 [Tathagata Das] Made ContextCleaner to not block on shuffles
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ContextCleaner.scala18
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala12
-rw-r--r--core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala3
3 files changed, 26 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 3848734d6f..ede1e23f4f 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -65,7 +65,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
/**
- * Whether the cleaning thread will block on cleanup tasks.
+ * Whether the cleaning thread will block on cleanup tasks (other than shuffle, which
+ * is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
*
* Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
* workaround for the issue, which is ultimately caused by the way the BlockManager actors
@@ -76,6 +77,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private val blockOnCleanupTasks = sc.conf.getBoolean(
"spark.cleaner.referenceTracking.blocking", true)
+ /**
+ * Whether the cleaning thread will block on shuffle cleanup tasks.
+ *
+ * When context cleaner is configured to block on every delete request, it can throw timeout
+ * exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this
+ * parameter by default disables blocking on shuffle cleanups. Note that this does not affect
+ * the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround,
+ * until the real Akka issue (referred to in the comment above `blockOnCleanupTasks`) is
+ * resolved.
+ */
+ private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
+ "spark.cleaner.referenceTracking.blocking.shuffle", false)
+
@volatile private var stopped = false
/** Attach a listener object to get information of when objects are cleaned. */
@@ -128,7 +142,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
- doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
+ doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 669307765d..e67b3dc5ce 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -101,7 +101,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
def removeRdd(rddId: Int, blocking: Boolean) {
val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
future.onFailure {
- case e: Throwable => logError("Failed to remove RDD " + rddId, e)
+ case e: Exception =>
+ logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}")
}
if (blocking) {
Await.result(future, timeout)
@@ -112,7 +113,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
def removeShuffle(shuffleId: Int, blocking: Boolean) {
val future = askDriverWithReply[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
future.onFailure {
- case e: Throwable => logError("Failed to remove shuffle " + shuffleId, e)
+ case e: Exception =>
+ logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}")
}
if (blocking) {
Await.result(future, timeout)
@@ -124,9 +126,9 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
val future = askDriverWithReply[Future[Seq[Int]]](
RemoveBroadcast(broadcastId, removeFromMaster))
future.onFailure {
- case e: Throwable =>
- logError("Failed to remove broadcast " + broadcastId +
- " with removeFromMaster = " + removeFromMaster, e)
+ case e: Exception =>
+ logWarning(s"Failed to remove broadcast $broadcastId" +
+ s" with removeFromMaster = $removeFromMaster - ${e.getMessage}")
}
if (blocking) {
Await.result(future, timeout)
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 4bc4346c0a..2744894277 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -52,6 +52,7 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[Ha
.setMaster("local[2]")
.setAppName("ContextCleanerSuite")
.set("spark.cleaner.referenceTracking.blocking", "true")
+ .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
.set("spark.shuffle.manager", shuffleManager.getName)
before {
@@ -243,6 +244,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
.setMaster("local-cluster[2, 1, 512]")
.setAppName("ContextCleanerSuite")
.set("spark.cleaner.referenceTracking.blocking", "true")
+ .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
.set("spark.shuffle.manager", shuffleManager.getName)
sc = new SparkContext(conf2)
@@ -319,6 +321,7 @@ class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[Sor
.setMaster("local-cluster[2, 1, 512]")
.setAppName("ContextCleanerSuite")
.set("spark.cleaner.referenceTracking.blocking", "true")
+ .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
.set("spark.shuffle.manager", shuffleManager.getName)
sc = new SparkContext(conf2)