aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2014-11-08 18:10:23 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-11-08 18:10:23 -0800
commit7b41b17f3296eea3282efbdceb6b28baf128287d (patch)
tree9bdea34c72f7c5f21153a09a04b72293660b0580 /streaming/src/test
parent4af5c7e24455246c61c1f3c22225507e720d721d (diff)
downloadspark-7b41b17f3296eea3282efbdceb6b28baf128287d.tar.gz
spark-7b41b17f3296eea3282efbdceb6b28baf128287d.tar.bz2
spark-7b41b17f3296eea3282efbdceb6b28baf128287d.zip
[SPARK-4301] StreamingContext should not allow start() to be called after calling stop()
In Spark 1.0.0+, calling `stop()` on a StreamingContext that has not been started is a no-op which has no side-effects. This allows users to call `stop()` on a fresh StreamingContext followed by `start()`. I believe that this almost always indicates an error and is not behavior that we should support. Since we don't allow `start() stop() start()` then I don't think it makes sense to allow `stop() start()`. The current behavior can lead to resource leaks when StreamingContext constructs its own SparkContext: if I call `stop(stopSparkContext=True)`, then I expect StreamingContext's underlying SparkContext to be stopped irrespective of whether the StreamingContext has been started. This is useful when writing unit test fixtures. Prior discussions: - https://github.com/apache/spark/pull/3053#discussion-diff-19710333R490 - https://github.com/apache/spark/pull/3121#issuecomment-61927353 Author: Josh Rosen <joshrosen@databricks.com> Closes #3160 from JoshRosen/SPARK-4301 and squashes the following commits: dbcc929 [Josh Rosen] Address more review comments bdbe5da [Josh Rosen] Stop SparkContext after stopping scheduler, not before. 03e9c40 [Josh Rosen] Always stop SparkContext, even if stop(false) has already been called. 832a7f4 [Josh Rosen] Address review comment 5142517 [Josh Rosen] Add tests; improve Scaladoc. 813e471 [Josh Rosen] Revert workaround added in https://github.com/apache/spark/pull/3053/files#diff-e144dbee130ed84f9465853ddce65f8eR49 5558e70 [Josh Rosen] StreamingContext.stop() should stop SparkContext even if StreamingContext has not been started yet.
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala25
1 files changed, 19 insertions, 6 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index f47772947d..4b49c4d251 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -46,10 +46,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
after {
if (ssc != null) {
ssc.stop()
- if (ssc.sc != null) {
- // Calling ssc.stop() does not always stop the associated SparkContext.
- ssc.sc.stop()
- }
ssc = null
}
if (sc != null) {
@@ -137,11 +133,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc.stop()
}
- test("stop before start and start after stop") {
+ test("stop before start") {
ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register()
ssc.stop() // stop before start should not throw exception
- ssc.start()
+ }
+
+ test("start after stop") {
+ // Regression test for SPARK-4301
+ ssc = new StreamingContext(master, appName, batchDuration)
+ addInputStream(ssc).register()
ssc.stop()
intercept[SparkException] {
ssc.start() // start after stop should throw exception
@@ -161,6 +162,18 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc.stop()
}
+ test("stop(stopSparkContext=true) after stop(stopSparkContext=false)") {
+ ssc = new StreamingContext(master, appName, batchDuration)
+ addInputStream(ssc).register()
+ ssc.stop(stopSparkContext = false)
+ assert(ssc.sc.makeRDD(1 to 100).collect().size === 100)
+ ssc.stop(stopSparkContext = true)
+ // Check that the SparkContext is actually stopped:
+ intercept[Exception] {
+ ssc.sc.makeRDD(1 to 100).collect()
+ }
+ }
+
test("stop gracefully") {
val conf = new SparkConf().setMaster(master).setAppName(appName)
conf.set("spark.cleaner.ttl", "3600")