author    Ilya Ganelin <ilya.ganelin@capitalone.com>  2015-04-03 19:23:11 +0100
committer Sean Owen <sowen@cloudera.com>              2015-04-03 19:23:11 +0100
commit    2c43ea38ee0db6b292c14baf6bc6f8d16f509c9d (patch)
tree      093e0a5107c4cd3811e04200959edf7f140abce1
parent    c23ba81b8cf86c3a085de8ddfef9403ff6fcd87f (diff)
[SPARK-6492][CORE] SparkContext.stop() can deadlock when DAGSchedulerEventProcessLoop dies
I've added a timeout and retry loop around the SparkContext shutdown code that should fix this deadlock. If a SparkContext shutdown is in progress when another thread comes knocking, it will wait for 10 seconds for the lock, then fall through where the outer loop will re-submit the request.

Author: Ilya Ganelin <ilya.ganelin@capitalone.com>

Closes #5277 from ilganeli/SPARK-6492 and squashes the following commits:

8617a7e [Ilya Ganelin] Resolved merge conflict
2fbab66 [Ilya Ganelin] Added MiMa exclude
a0e2c70 [Ilya Ganelin] Deleted stale imports
fa28ce7 [Ilya Ganelin] Reverted to just having a single stopped flag
76fc825 [Ilya Ganelin] Updated to use atomic booleans instead of the synchronized vars
6e8a7f7 [Ilya Ganelin] Removed unnecessary null check for now since I'm not fixing stop ordering yet
cdf7073 [Ilya Ganelin] [SPARK-6492] Moved stopped=true back to the start of the shutdown sequence so this can be addressed in a separate PR
7fb795b [Ilya Ganelin] Spacing
b7a0c5c [Ilya Ganelin] Import ordering
df8224f [Ilya Ganelin] Added comment for added lock
343cb94 [Ilya Ganelin] [SPARK-6492] Added timeout/retry logic to fix a deadlock in SparkContext shutdown
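The final version of the patch, shown in the diff below, lands on a simpler mechanism than the timeout/retry loop described above: the `stopped` flag becomes an `AtomicBoolean`, and `compareAndSet` guarantees that exactly one caller runs the shutdown sequence while every other caller returns immediately instead of blocking on `SPARK_CONTEXT_CONSTRUCTOR_LOCK`. A minimal standalone sketch of that pattern follows; `Service` is an illustrative name, not Spark's API:

    import java.util.concurrent.atomic.AtomicBoolean

    // Illustrative sketch: idempotent, lock-free shutdown via compareAndSet.
    class Service {
      private val stopped = new AtomicBoolean(false)

      def stop(): Unit = {
        // Exactly one caller wins the false -> true transition; every other
        // caller returns immediately instead of blocking on a shared lock.
        if (!stopped.compareAndSet(false, true)) {
          println("Service already stopped.")
          return
        }
        // Release resources here; this branch runs at most once.
        println("Successfully stopped Service.")
      }
    }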
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 59
-rw-r--r--  project/MimaExcludes.scala                               |  4
2 files changed, 34 insertions(+), 29 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5b3778ead6..abf81e312d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -23,7 +23,7 @@ import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Properties, UUID}
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.UUID.randomUUID
import scala.collection.{Map, Set}
@@ -95,10 +95,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val startTime = System.currentTimeMillis()
- @volatile private var stopped: Boolean = false
+ private val stopped: AtomicBoolean = new AtomicBoolean(false)
private def assertNotStopped(): Unit = {
- if (stopped) {
+ if (stopped.get()) {
throw new IllegalStateException("Cannot call methods on a stopped SparkContext")
}
}
@@ -1390,33 +1390,34 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
addedJars.clear()
}
- /** Shut down the SparkContext. */
+ // Shut down the SparkContext.
def stop() {
- SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
- if (!stopped) {
- stopped = true
- postApplicationEnd()
- ui.foreach(_.stop())
- env.metricsSystem.report()
- metadataCleaner.cancel()
- cleaner.foreach(_.stop())
- executorAllocationManager.foreach(_.stop())
- dagScheduler.stop()
- dagScheduler = null
- listenerBus.stop()
- eventLogger.foreach(_.stop())
- env.actorSystem.stop(heartbeatReceiver)
- progressBar.foreach(_.stop())
- taskScheduler = null
- // TODO: Cache.stop()?
- env.stop()
- SparkEnv.set(null)
- logInfo("Successfully stopped SparkContext")
- SparkContext.clearActiveContext()
- } else {
- logInfo("SparkContext already stopped")
- }
+ // Use the stopping variable to ensure no contention for the stop scenario.
+ // Still track the stopped variable for use elsewhere in the code.
+
+ if (!stopped.compareAndSet(false, true)) {
+ logInfo("SparkContext already stopped.")
+ return
}
+
+ postApplicationEnd()
+ ui.foreach(_.stop())
+ env.metricsSystem.report()
+ metadataCleaner.cancel()
+ cleaner.foreach(_.stop())
+ executorAllocationManager.foreach(_.stop())
+ dagScheduler.stop()
+ dagScheduler = null
+ listenerBus.stop()
+ eventLogger.foreach(_.stop())
+ env.actorSystem.stop(heartbeatReceiver)
+ progressBar.foreach(_.stop())
+ taskScheduler = null
+ // TODO: Cache.stop()?
+ env.stop()
+ SparkEnv.set(null)
+ SparkContext.clearActiveContext()
+ logInfo("Successfully stopped SparkContext")
}
@@ -1478,7 +1479,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
- if (stopped) {
+ if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
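Because nothing in the new `stop()` path acquires `SPARK_CONTEXT_CONSTRUCTOR_LOCK`, a second thread reacting to a dying `DAGSchedulerEventProcessLoop` can call `stop()` while a shutdown is already in flight without deadlocking. A hedged sketch of that race using the same pattern (all names here are illustrative, not Spark's):

    import java.util.concurrent.atomic.AtomicBoolean

    object StopRaceDemo {
      private val stopped = new AtomicBoolean(false)

      def stop(): Unit = {
        if (!stopped.compareAndSet(false, true)) {
          println(s"${Thread.currentThread().getName}: already stopped")
          return
        }
        println(s"${Thread.currentThread().getName}: running shutdown sequence")
      }

      def main(args: Array[String]): Unit = {
        // Four threads race to stop; exactly one runs the shutdown
        // sequence, and none of them ever waits on a lock.
        val callers = (1 to 4).map { i =>
          new Thread(new Runnable { def run(): Unit = stop() }, s"caller-$i")
        }
        callers.foreach(_.start())
        callers.foreach(_.join())
      }
    }

Running this prints "running shutdown sequence" from exactly one caller and "already stopped" from the other three, regardless of scheduling order.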
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 54500f7c27..c2d828f982 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -60,6 +60,10 @@ object MimaExcludes {
) ++ Seq(
// SPARK-6510 Add a Graph#minus method acting as Set#difference
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.minus")
+ ) ++ Seq(
+ // SPARK-6492 Fix deadlock in SparkContext.stop()
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.org$" +
+ "apache$spark$SparkContext$$SPARK_CONTEXT_CONSTRUCTOR_LOCK")
)
case v if v.startsWith("1.3") =>