aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-03-03 13:44:05 -0800
committerAndrew Or <andrew@databricks.com>2015-03-03 13:44:05 -0800
commitfe63e822918a01e1c1d741052b932e9944745fb6 (patch)
treeec9bb1c3eec7732b5d59e468bd03cd4bcce9a2b5
parente750a6bfddf1d7bf7d3e99a424ec2b83a18b40d9 (diff)
downloadspark-fe63e822918a01e1c1d741052b932e9944745fb6.tar.gz
spark-fe63e822918a01e1c1d741052b932e9944745fb6.tar.bz2
spark-fe63e822918a01e1c1d741052b932e9944745fb6.zip
[SPARK-6132] ContextCleaner race condition across SparkContexts
The problem is that `ContextCleaner` may clean variables that belong to a different `SparkContext`. This can happen if the `SparkContext` to which the cleaner belongs stops, and a new one is started immediately afterwards in the same JVM. In this case, if the cleaner is in the middle of cleaning a broadcast, for instance, it will do so through `SparkEnv.get.blockManager`, which could be one that belongs to a different `SparkContext`. JoshRosen and I suspect that this is the cause of many flaky tests, most notably the `JavaAPISuite`. We were able to reproduce the failure locally (though it is not deterministic and very hard to reproduce). Author: Andrew Or <andrew@databricks.com> Closes #4869 from andrewor14/cleaner-masquerade and squashes the following commits: 29168c0 [Andrew Or] Synchronize ContextCleaner stop
-rw-r--r--core/src/main/scala/org/apache/spark/ContextCleaner.scala39
1 file changed, 26 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 4a9d007353..4dab886698 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -105,9 +105,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
cleaningThread.start()
}
- /** Stop the cleaner. */
+ /**
+ * Stop the cleaning thread and wait until the thread has finished running its current task.
+ */
def stop() {
stopped = true
+ // Interrupt the cleaning thread, but wait until the current task has finished before
+ // doing so. This guards against the race condition where a cleaning thread may
+ // potentially clean similarly named variables created by a different SparkContext,
+ // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
+ synchronized {
+ cleaningThread.interrupt()
+ }
+ cleaningThread.join()
}
/** Register a RDD for cleanup when it is garbage collected. */
@@ -140,18 +150,21 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
- reference.map(_.task).foreach { task =>
- logDebug("Got cleaning task " + task)
- referenceBuffer -= reference.get
- task match {
- case CleanRDD(rddId) =>
- doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
- case CleanShuffle(shuffleId) =>
- doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
- case CleanBroadcast(broadcastId) =>
- doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
- case CleanAccum(accId) =>
- doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+ // Synchronize here to avoid being interrupted on stop()
+ synchronized {
+ reference.map(_.task).foreach { task =>
+ logDebug("Got cleaning task " + task)
+ referenceBuffer -= reference.get
+ task match {
+ case CleanRDD(rddId) =>
+ doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
+ case CleanShuffle(shuffleId) =>
+ doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
+ case CleanBroadcast(broadcastId) =>
+ doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+ case CleanAccum(accId) =>
+ doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+ }
}
}
} catch {