author     Andrew Or <andrew@databricks.com>    2015-03-03 13:44:05 -0800
committer  Andrew Or <andrew@databricks.com>    2015-03-03 13:44:05 -0800
commit     fe63e822918a01e1c1d741052b932e9944745fb6 (patch)
tree       ec9bb1c3eec7732b5d59e468bd03cd4bcce9a2b5 /core
parent     e750a6bfddf1d7bf7d3e99a424ec2b83a18b40d9 (diff)
[SPARK-6132] ContextCleaner race condition across SparkContexts
The problem is that `ContextCleaner` may clean variables that belong to a different `SparkContext`. This can happen if the `SparkContext` to which the cleaner belongs stops, and a new one is started immediately afterwards in the same JVM. In this case, if the cleaner is in the middle of cleaning a broadcast, for instance, it will do so through `SparkEnv.get.blockManager`, which could belong to a different `SparkContext`.
JoshRosen and I suspect that this is the cause of many flaky tests, most notably the `JavaAPISuite`. We were able to reproduce the failure locally (though it is not deterministic and very hard to reproduce).
Author: Andrew Or <andrew@databricks.com>
Closes #4869 from andrewor14/cleaner-masquerade and squashes the following commits:
29168c0 [Andrew Or] Synchronize ContextCleaner stop
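To make the failure mode above concrete, here is a minimal standalone sketch in plain Scala, not Spark APIs; `GlobalEnv`, `ToyContext`, and the block names are hypothetical stand-ins. It mimics a cleaner that resolves a process-wide mutable environment (analogous to `SparkEnv.get`) at cleanup time instead of capturing the environment of the context that created it, so a cleanup scheduled by an old context ends up mutating the state of the new one.

    import scala.collection.mutable

    // Stand-in for SparkEnv.get: one mutable slot shared by every "context" in the JVM.
    object GlobalEnv {
      @volatile var currentBlocks: mutable.Set[String] = mutable.Set.empty
    }

    class ToyContext(val name: String) {
      private val blocks = mutable.Set.empty[String]
      GlobalEnv.currentBlocks = blocks        // starting a context replaces the global env

      def put(id: String): Unit = blocks += id

      // Schedules an asynchronous cleanup that, like the pre-patch cleaner, goes through
      // the *current* global env rather than the env of the context that owns the block.
      def cleanLater(id: String): Thread = {
        val t = new Thread(() => {
          Thread.sleep(50)                    // give the replacement context time to start
          GlobalEnv.currentBlocks -= id
        })
        t.start()
        t
      }
    }

    object Race extends App {
      val ctx1 = new ToyContext("ctx1")
      ctx1.put("broadcast_0")
      val pending = ctx1.cleanLater("broadcast_0")

      // ctx1 is "stopped" and a new context starts immediately afterwards in the same JVM.
      val ctx2 = new ToyContext("ctx2")
      ctx2.put("broadcast_0")                 // same name, different context

      pending.join()
      // ctx2's block was removed by ctx1's cleaner: the cross-context cleanup that
      // produces the block-not-found failures described above.
      println(GlobalEnv.currentBlocks)        // prints Set() instead of Set(broadcast_0)
    }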
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/ContextCleaner.scala | 39
1 file changed, 26 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 4a9d007353..4dab886698 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -105,9 +105,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     cleaningThread.start()
   }
 
-  /** Stop the cleaner. */
+  /**
+   * Stop the cleaning thread and wait until the thread has finished running its current task.
+   */
   def stop() {
     stopped = true
+    // Interrupt the cleaning thread, but wait until the current task has finished before
+    // doing so. This guards against the race condition where a cleaning thread may
+    // potentially clean similarly named variables created by a different SparkContext,
+    // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
+    synchronized {
+      cleaningThread.interrupt()
+    }
+    cleaningThread.join()
   }
 
   /** Register a RDD for cleanup when it is garbage collected. */
@@ -140,18 +150,21 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
           .map(_.asInstanceOf[CleanupTaskWeakReference])
-        reference.map(_.task).foreach { task =>
-          logDebug("Got cleaning task " + task)
-          referenceBuffer -= reference.get
-          task match {
-            case CleanRDD(rddId) =>
-              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
-            case CleanShuffle(shuffleId) =>
-              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
-            case CleanBroadcast(broadcastId) =>
-              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
-            case CleanAccum(accId) =>
-              doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+        // Synchronize here to avoid being interrupted on stop()
+        synchronized {
+          reference.map(_.task).foreach { task =>
+            logDebug("Got cleaning task " + task)
+            referenceBuffer -= reference.get
+            task match {
+              case CleanRDD(rddId) =>
+                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
+              case CleanShuffle(shuffleId) =>
+                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
+              case CleanBroadcast(broadcastId) =>
+                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+              case CleanAccum(accId) =>
+                doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+            }
           }
         }
       } catch {
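The essence of the fix is a guarded interrupt: the cleaning thread only processes a task while holding the cleaner's monitor, and stop() interrupts it under that same monitor before joining, so the interrupt can never land in the middle of a cleanup. Below is a minimal standalone sketch of that pattern in plain Scala, not the actual ContextCleaner; `GuardedWorker`, `pollNextTask`, and `process` are hypothetical placeholders.

    class GuardedWorker {
      @volatile private var stopped = false

      private val thread = new Thread("guarded-worker") {
        override def run(): Unit = {
          while (!stopped) {
            try {
              // Blocking poll, analogous to referenceQueue.remove(timeout); this is the
              // only place we are happy to be interrupted.
              val task = pollNextTask()
              GuardedWorker.this.synchronized {
                // Holding the monitor here means stop() cannot interrupt mid-task.
                task.foreach(process)
              }
            } catch {
              case _: InterruptedException if stopped => // expected during shutdown
            }
          }
        }
      }
      thread.setDaemon(true)

      def start(): Unit = thread.start()

      def stop(): Unit = {
        stopped = true
        // Wait for any in-flight task to finish before interrupting, then wait for the
        // thread to exit, matching the ordering in the patched ContextCleaner.stop().
        synchronized {
          thread.interrupt()
        }
        thread.join()
      }

      // Hypothetical stand-ins for the reference queue and the cleanup logic.
      private def pollNextTask(): Option[String] = { Thread.sleep(100); Some("block_1") }
      private def process(task: String): Unit = println(s"cleaning " + task)
    }

With this ordering, a stop() issued while a task is running blocks on the monitor until that task completes, and a stop() issued while the worker is waiting for the next task interrupts the wait immediately.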