aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authortedyu <yuzhihong@gmail.com>2015-11-24 12:22:33 -0800
committerShixiong Zhu <shixiong@databricks.com>2015-11-24 12:22:33 -0800
commit81012546ee5a80d2576740af0dad067b0f5962c5 (patch)
tree104c528b71a205b16a813740d4bcfe0e0f46041d /core
parent19530da6903fa59b051eec69b9c17e231c68454b (diff)
downloadspark-81012546ee5a80d2576740af0dad067b0f5962c5.tar.gz
spark-81012546ee5a80d2576740af0dad067b0f5962c5.tar.bz2
spark-81012546ee5a80d2576740af0dad067b0f5962c5.zip
[SPARK-11872] Prevent the call to SparkContext#stop() in the listener bus's thread
This is a continuation of SPARK-11761. Andrew suggested adding this protection; see the tail of https://github.com/apache/spark/pull/9741. Author: tedyu <yuzhihong@gmail.com> Closes #9852 from tedyu/master.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkContext.scala | 4
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 31
2 files changed, 35 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b153a7b08e..e19ba11370 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1694,6 +1694,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Shut down the SparkContext.
def stop() {
+ if (AsynchronousListenerBus.withinListenerThread.value) {
+ throw new SparkException("Cannot stop SparkContext within listener thread of" +
+ " AsynchronousListenerBus")
+ }
// Use the stopping variable to ensure no contention for the stop scenario.
// Still track the stopped variable for use elsewhere in the code.
if (!stopped.compareAndSet(false, true)) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 84e545851f..f20d5be7c0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import org.scalatest.Matchers
+import org.apache.spark.SparkException
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.ResetSystemProperties
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
@@ -36,6 +37,21 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val jobCompletionTime = 1421191296660L
+ test("don't call sc.stop in listener") {
+ sc = new SparkContext("local", "SparkListenerSuite")
+ val listener = new SparkContextStoppingListener(sc)
+ val bus = new LiveListenerBus
+ bus.addListener(listener)
+
+ // Starting listener bus should flush all buffered events
+ bus.start(sc)
+ bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+ bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+
+ bus.stop()
+ assert(listener.sparkExSeen)
+ }
+
test("basic creation and shutdown of LiveListenerBus") {
val counter = new BasicJobCounter
val bus = new LiveListenerBus
@@ -443,6 +459,21 @@ private class BasicJobCounter extends SparkListener {
override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
}
+/**
+ * A simple listener that tries to stop SparkContext.
+ */
+private class SparkContextStoppingListener(val sc: SparkContext) extends SparkListener {
+ @volatile var sparkExSeen = false
+ override def onJobEnd(job: SparkListenerJobEnd): Unit = {
+ try {
+ sc.stop()
+ } catch {
+ case se: SparkException =>
+ sparkExSeen = true
+ }
+ }
+}
+
private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListener {
var count = 0
override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1