author    Andrew Or <andrewor14@gmail.com>          2014-08-15 22:55:32 -0700
committer Patrick Wendell <pwendell@gmail.com>      2014-08-15 22:55:32 -0700
commit    c9da466edb83e45a159ccc17c68856a511b9e8b7 (patch)
tree      9de75918b439606e53c2153cd5e7a5de18e7a979 /core
parent    2e069ca6560bf7ab07bd019f9530b42f4fe45014 (diff)
[SPARK-3015] Block on cleaning tasks to prevent Akka timeouts
More detail on the issue is described in [SPARK-3015](https://issues.apache.org/jira/browse/SPARK-3015), but the TL;DR is: if we send too many blocking Akka messages that depend on each other in quick succession, a few of these messages time out and ultimately kill the executors. As of #1498, we broadcast each RDD whether or not it is persisted. This means that if we create many RDDs (each of which becomes a broadcast) and the driver performs a GC that cleans up all of these broadcast blocks, we end up sending many `RemoveBroadcast` messages in parallel, triggering the chain of blocking messages at high frequency.

We do not yet know the Akka-level root cause, so this is intended as a temporary workaround until we identify the real issue. I have done some preliminary testing with blocking enabled and observed that the queue length remains quite low (< 1000) even under very intensive workloads. In the long run, we should allow a limited degree of parallelism by batching cleanup tasks or processing them in a sliding window. In the longer run, we should clean up the whole `BlockManager*` message-passing interface to avoid unnecessarily awaiting on futures created from Akka asks.

tdas pwendell mengxr

Author: Andrew Or <andrewor14@gmail.com>

Closes #1931 from andrewor14/reference-blocking and squashes the following commits:

d0f7195 [Andrew Or] Merge branch 'master' of github.com:apache/spark into reference-blocking
ce9daf5 [Andrew Or] Remove logic for logging queue length
111192a [Andrew Or] Add missing space in log message (minor)
a183b83 [Andrew Or] Switch order of code blocks (minor)
9fd1fe6 [Andrew Or] Remove outdated log
104b366 [Andrew Or] Use the actual reference queue length
0b7e768 [Andrew Or] Block on cleaning tasks by default + log error on queue full
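The effect described above can be sketched with a toy model (hypothetical names, not Spark's actual `ContextCleaner`): when each cleanup call awaits its reply before issuing the next one, at most one `RemoveBroadcast`-style message is in flight at a time; fire-and-forget lets them pile up in parallel.

```scala
// Toy model (hypothetical, not Spark's implementation) of why blocking on
// each cleanup task bounds the number of in-flight cleanup messages.
object CleanupSketch {
  private var inFlight = 0
  private var maxInFlight = 0

  // Simulate sending one RemoveBroadcast-style message to an executor.
  private def doCleanupBroadcast(blocking: Boolean): Unit = {
    inFlight += 1                                  // message leaves the driver
    maxInFlight = math.max(maxInFlight, inFlight)
    if (blocking) inFlight -= 1                    // caller awaited the reply
  }

  // Clean up n broadcasts; report the peak number of in-flight messages.
  def run(n: Int, blocking: Boolean): Int = {
    inFlight = 0
    maxInFlight = 0
    (1 to n).foreach(_ => doCleanupBroadcast(blocking))
    maxInFlight
  }
}
```

With `blocking = true` the peak stays at 1 regardless of how many broadcasts a GC reclaims, which matches the bounded queue length seen in the preliminary testing; with `blocking = false` the peak grows with the number of GC'd broadcasts.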
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/ContextCleaner.scala | 12
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index bf3c3a6ceb..3848734d6f 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -66,10 +66,15 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   /**
    * Whether the cleaning thread will block on cleanup tasks.
-   * This is set to true only for tests.
+   *
+   * Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
+   * workaround for the issue, which is ultimately caused by the way the BlockManager actors
+   * issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
+   * for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
+   * longer in scope.
    */
   private val blockOnCleanupTasks = sc.conf.getBoolean(
-    "spark.cleaner.referenceTracking.blocking", false)
+    "spark.cleaner.referenceTracking.blocking", true)
 
   @volatile private var stopped = false
 
@@ -174,9 +179,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-
-  // Used for testing. These methods explicitly blocks until cleanup is completed
-  // to ensure that more reliable testing.
 }
 
 private object ContextCleaner {
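The changed line above uses the `getBoolean(key, default)` lookup pattern, so the new blocking default still yields to an explicit user setting. A minimal sketch of that pattern, with a plain `Map` standing in for `SparkConf` (a hypothetical stand-in; the real lookup lives in `org.apache.spark.SparkConf`):

```scala
// Sketch of the conf lookup pattern from the diff, with a plain Map
// standing in for SparkConf (hypothetical stand-in, not Spark's class).
def getBoolean(conf: Map[String, String], key: String, default: Boolean): Boolean =
  conf.get(key).map(_.toBoolean).getOrElse(default)

val key = "spark.cleaner.referenceTracking.blocking"

// After this patch, the cleaner blocks by default when the key is unset...
val blockOnCleanupTasks = getBoolean(Map.empty, key, default = true)

// ...but users can still opt out explicitly:
val optedOut = getBoolean(Map(key -> "false"), key, default = true)
```

Setting `spark.cleaner.referenceTracking.blocking` to `false` therefore restores the pre-patch fire-and-forget behavior.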