 streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala      | 10 +++++++---
 streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 19 +++++++++++++++++--
 2 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b1ad0d42ec..bbdb4e8af0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -563,13 +563,17 @@ class StreamingContext private[streaming] (
   /**
    * Stop the execution of the streams immediately (does not wait for all received data
-   * to be processed).
+   * to be processed). By default, if `stopSparkContext` is not specified, the underlying
+   * SparkContext will also be stopped. This implicit behavior can be configured using the
+   * SparkConf configuration spark.streaming.stopSparkContextByDefault.
    *
-   * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
+   * @param stopSparkContext If true, stops the associated SparkContext. The underlying SparkContext
    *                         will be stopped regardless of whether this StreamingContext has been
    *                         started.
    */
-  def stop(stopSparkContext: Boolean = true): Unit = synchronized {
+  def stop(
+      stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true)
+    ): Unit = synchronized {
     stop(stopSparkContext, false)
   }
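
As a quick illustration of what the new default buys downstream code, the sketch below shows one way an application might opt out of stopping the SparkContext without passing an argument to stop(). It is only a usage sketch built on the API changed in this patch; the master URL, app name, and batch interval are placeholder values, not part of this change.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative settings: keep the SparkContext alive when stop() is
// called with no arguments (relies on the config added in this patch).
val conf = new SparkConf()
  .setMaster("local[2]")              // placeholder master URL
  .setAppName("stop-behavior-sketch") // placeholder app name
  .set("spark.streaming.stopSparkContextByDefault", "false")

val ssc = new StreamingContext(conf, Seconds(1))
// ... register input streams and output operations here ...
ssc.start()

// Stops only the streaming side; ssc.sparkContext remains usable for
// batch jobs afterwards because of the configuration set above.
ssc.stop()

// Explicit equivalent that does not depend on the configuration:
// ssc.stop(stopSparkContext = false)
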
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 5207b7109e..a589deb1fa 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -118,6 +118,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     assert(ssc.state === ssc.StreamingContextState.Started)
     ssc.stop()
     assert(ssc.state === ssc.StreamingContextState.Stopped)
+
+    // Make sure that the SparkContext is also stopped by default
+    intercept[Exception] {
+      ssc.sparkContext.makeRDD(1 to 10)
+    }
   }

   test("start multiple times") {
@@ -154,16 +159,26 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   }

   test("stop only streaming context") {
-    ssc = new StreamingContext(master, appName, batchDuration)
+    val conf = new SparkConf().setMaster(master).setAppName(appName)
+
+    // Explicitly do not stop SparkContext
+    ssc = new StreamingContext(conf, batchDuration)
     sc = ssc.sparkContext
     addInputStream(ssc).register()
     ssc.start()
     ssc.stop(stopSparkContext = false)
     assert(sc.makeRDD(1 to 100).collect().size === 100)
-    ssc = new StreamingContext(sc, batchDuration)
+    sc.stop()
+
+    // Implicitly do not stop SparkContext
+    conf.set("spark.streaming.stopSparkContextByDefault", "false")
+    ssc = new StreamingContext(conf, batchDuration)
+    sc = ssc.sparkContext
     addInputStream(ssc).register()
     ssc.start()
     ssc.stop()
+    assert(sc.makeRDD(1 to 100).collect().size === 100)
+    sc.stop()
   }

   test("stop(stopSparkContext=true) after stop(stopSparkContext=false)") {