author    Tathagata Das <tathagata.das1565@gmail.com>  2015-05-07 00:24:44 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2015-05-07 00:24:44 -0700
commit    01187f59b3d118495b6cfea965690829b99a36fa (patch)
tree      30e3e582fb40096c7bf6eda09119dace1f2e5e66 /streaming
parent    cfdadcbd2b529cd9ac721509a7ebafe436afcd8d (diff)
download  spark-01187f59b3d118495b6cfea965690829b99a36fa.tar.gz
          spark-01187f59b3d118495b6cfea965690829b99a36fa.tar.bz2
          spark-01187f59b3d118495b6cfea965690829b99a36fa.zip
[SPARK-7217] [STREAMING] Add configuration to control the default behavior of StreamingContext.stop() implicitly calling SparkContext.stop()
In environments like notebooks, the SparkContext is managed by the underlying infrastructure, and it is expected that the SparkContext will not be stopped. However, StreamingContext.stop() calls SparkContext.stop() as a non-intuitive side effect. This PR adds a SparkConf configuration that sets the default StreamingContext stop behavior; the default is chosen so that behavior does not change for existing users.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5929 from tdas/SPARK-7217 and squashes the following commits:

869a763 [Tathagata Das] Changed implementation.
685fe00 [Tathagata Das] Added configuration
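For context, a minimal sketch of how an application would opt out of the implicit SparkContext shutdown using the new configuration. The master, app name, and batch interval below are placeholders, not part of the patch:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder settings; only the last .set(...) is the new configuration.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("NotebookApp")
  .set("spark.streaming.stopSparkContextByDefault", "false")

val ssc = new StreamingContext(conf, Seconds(1))
// ... register input streams and start the context ...
ssc.stop() // stops only the StreamingContext; the SparkContext keeps running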
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala       | 10
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala  | 19
2 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b1ad0d42ec..bbdb4e8af0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -563,13 +563,17 @@ class StreamingContext private[streaming] (
   /**
    * Stop the execution of the streams immediately (does not wait for all received data
-   * to be processed).
+   * to be processed). By default, if `stopSparkContext` is not specified, the underlying
+   * SparkContext will also be stopped. This implicit behavior can be configured using the
+   * SparkConf configuration `spark.streaming.stopSparkContextByDefault`.
    *
-   * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
+   * @param stopSparkContext If true, stops the associated SparkContext. The underlying SparkContext
    *                         will be stopped regardless of whether this StreamingContext has been
    *                         started.
    */
-  def stop(stopSparkContext: Boolean = true): Unit = synchronized {
+  def stop(
+      stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true)
+    ): Unit = synchronized {
     stop(stopSparkContext, false)
   }
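One subtlety worth noting (an illustration, not part of the patch): Scala evaluates a default argument expression at every call site, so conf.getBoolean(...) is consulted on each parameterless stop() call rather than once up front. A self-contained sketch of that semantics, with a hypothetical mutable flag standing in for the SparkConf lookup:

object DefaultArgDemo {
  var stopByDefault = true // stands in for conf.getBoolean(..., true)

  // The default expression is re-evaluated on every call without an argument.
  def stop(stopSparkContext: Boolean = stopByDefault): Unit =
    println(s"stopSparkContext = $stopSparkContext")

  def main(args: Array[String]): Unit = {
    stop()                         // prints: stopSparkContext = true
    stopByDefault = false
    stop()                         // prints: stopSparkContext = false
    stop(stopSparkContext = true)  // explicit argument overrides the default
  }
}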
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 5207b7109e..a589deb1fa 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -118,6 +118,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     assert(ssc.state === ssc.StreamingContextState.Started)
     ssc.stop()
     assert(ssc.state === ssc.StreamingContextState.Stopped)
+
+    // Make sure that the SparkContext is also stopped by default
+    intercept[Exception] {
+      ssc.sparkContext.makeRDD(1 to 10)
+    }
   }
test("start multiple times") {
@@ -154,16 +159,26 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   }

   test("stop only streaming context") {
-    ssc = new StreamingContext(master, appName, batchDuration)
+    val conf = new SparkConf().setMaster(master).setAppName(appName)
+
+    // Explicitly do not stop SparkContext
+    ssc = new StreamingContext(conf, batchDuration)
     sc = ssc.sparkContext
     addInputStream(ssc).register()
     ssc.start()
     ssc.stop(stopSparkContext = false)
     assert(sc.makeRDD(1 to 100).collect().size === 100)
-    ssc = new StreamingContext(sc, batchDuration)
+    sc.stop()
+
+    // Implicitly do not stop SparkContext
+    conf.set("spark.streaming.stopSparkContextByDefault", "false")
+    ssc = new StreamingContext(conf, batchDuration)
+    sc = ssc.sparkContext
     addInputStream(ssc).register()
     ssc.start()
     ssc.stop()
+    assert(sc.makeRDD(1 to 100).collect().size === 100)
+    sc.stop()
   }
test("stop(stopSparkContext=true) after stop(stopSparkContext=false)") {