about summary refs log tree commit diff
path: root/streaming/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-05-07 00:24:44 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-05-07 00:27:02 -0700
commitcb13c98b1aa7fd35a285dadfecac684630c57ad4 (patch)
treec05ec2bf63ed3aa94d6cfe7f5fbc0b0a2e35c289 /streaming/src
parent065d114c6d0d0c51ee9f06b93c87dba196eb3626 (diff)
downloadspark-cb13c98b1aa7fd35a285dadfecac684630c57ad4.tar.gz
spark-cb13c98b1aa7fd35a285dadfecac684630c57ad4.tar.bz2
spark-cb13c98b1aa7fd35a285dadfecac684630c57ad4.zip
[SPARK-7217] [STREAMING] Add configuration to control the default behavior of StreamingContext.stop() implicitly calling SparkContext.stop()
In environments like notebooks, the SparkContext is managed by the underlying infrastructure and it is expected that the SparkContext will not be stopped. However, StreamingContext.stop() calls SparkContext.stop() as a non-intuitive side-effect. This PR adds a configuration in SparkConf that sets the default StreamingContext stop behavior. It should be such that the existing behavior does not change for existing users. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #5929 from tdas/SPARK-7217 and squashes the following commits: 869a763 [Tathagata Das] Changed implementation. 685fe00 [Tathagata Das] Added configuration (cherry picked from commit 01187f59b3d118495b6cfea965690829b99a36fa) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala10
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala19
2 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b1ad0d42ec..bbdb4e8af0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -563,13 +563,17 @@ class StreamingContext private[streaming] (
/**
* Stop the execution of the streams immediately (does not wait for all received data
- * to be processed).
+ * to be processed). By default, if `stopSparkContext` is not specified, the underlying
+ * SparkContext will also be stopped. This implicit behavior can be configured using the
+ * SparkConf configuration spark.streaming.stopSparkContextByDefault.
*
- * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
+ * @param stopSparkContext If true, stops the associated SparkContext. The underlying SparkContext
* will be stopped regardless of whether this StreamingContext has been
* started.
*/
- def stop(stopSparkContext: Boolean = true): Unit = synchronized {
+ def stop(
+ stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true)
+ ): Unit = synchronized {
stop(stopSparkContext, false)
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 5207b7109e..a589deb1fa 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -118,6 +118,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
assert(ssc.state === ssc.StreamingContextState.Started)
ssc.stop()
assert(ssc.state === ssc.StreamingContextState.Stopped)
+
+ // Make sure that the SparkContext is also stopped by default
+ intercept[Exception] {
+ ssc.sparkContext.makeRDD(1 to 10)
+ }
}
test("start multiple times") {
@@ -154,16 +159,26 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
}
test("stop only streaming context") {
- ssc = new StreamingContext(master, appName, batchDuration)
+ val conf = new SparkConf().setMaster(master).setAppName(appName)
+
+ // Explicitly do not stop SparkContext
+ ssc = new StreamingContext(conf, batchDuration)
sc = ssc.sparkContext
addInputStream(ssc).register()
ssc.start()
ssc.stop(stopSparkContext = false)
assert(sc.makeRDD(1 to 100).collect().size === 100)
- ssc = new StreamingContext(sc, batchDuration)
+ sc.stop()
+
+ // Implicitly do not stop SparkContext
+ conf.set("spark.streaming.stopSparkContextByDefault", "false")
+ ssc = new StreamingContext(conf, batchDuration)
+ sc = ssc.sparkContext
addInputStream(ssc).register()
ssc.start()
ssc.stop()
+ assert(sc.makeRDD(1 to 100).collect().size === 100)
+ sc.stop()
}
test("stop(stopSparkContext=true) after stop(stopSparkContext=false)") {