author     Josh Rosen <joshrosen@databricks.com>  2014-11-08 18:10:23 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2014-11-08 18:10:23 -0800
commit     7b41b17f3296eea3282efbdceb6b28baf128287d (patch)
tree       9bdea34c72f7c5f21153a09a04b72293660b0580
parent     4af5c7e24455246c61c1f3c22225507e720d721d (diff)
[SPARK-4301] StreamingContext should not allow start() to be called after calling stop()
In Spark 1.0.0+, calling `stop()` on a StreamingContext that has not been started is a no-op with no side effects. This allows users to call `stop()` on a fresh StreamingContext followed by `start()`. I believe that this almost always indicates an error and is not behavior that we should support. Since we don't allow `start() stop() start()`, I don't think it makes sense to allow `stop() start()`.

The current behavior can lead to resource leaks when StreamingContext constructs its own SparkContext: if I call `stop(stopSparkContext=true)`, then I expect StreamingContext's underlying SparkContext to be stopped irrespective of whether the StreamingContext has been started. This is useful when writing unit test fixtures.

Prior discussions:

- https://github.com/apache/spark/pull/3053#discussion-diff-19710333R490
- https://github.com/apache/spark/pull/3121#issuecomment-61927353

Author: Josh Rosen <joshrosen@databricks.com>

Closes #3160 from JoshRosen/SPARK-4301 and squashes the following commits:

dbcc929 [Josh Rosen] Address more review comments
bdbe5da [Josh Rosen] Stop SparkContext after stopping scheduler, not before.
03e9c40 [Josh Rosen] Always stop SparkContext, even if stop(false) has already been called.
832a7f4 [Josh Rosen] Address review comment
5142517 [Josh Rosen] Add tests; improve Scaladoc.
813e471 [Josh Rosen] Revert workaround added in https://github.com/apache/spark/pull/3053/files#diff-e144dbee130ed84f9465853ddce65f8eR49
5558e70 [Josh Rosen] StreamingContext.stop() should stop SparkContext even if StreamingContext has not been started yet.
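For illustration, here is a minimal driver sketch of the lifecycle contract this patch enforces. It is not part of the patch: the object name Spark4301Demo and the local[2] master are placeholders, and it assumes the Spark 1.x streaming API.

    import org.apache.spark.{SparkConf, SparkException}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Hypothetical demo driver, for illustration only.
    object Spark4301Demo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("SPARK-4301-demo")
        val ssc = new StreamingContext(conf, Seconds(1))

        // With this patch, stop() on a never-started context still stops the
        // underlying SparkContext (stopSparkContext defaults to true) and
        // moves the StreamingContext to the Stopped state.
        ssc.stop(stopSparkContext = true)

        // start() after stop() is now rejected instead of silently "working"
        // on a context whose resources may already have been released.
        try {
          ssc.start()
        } catch {
          case e: SparkException =>
            println(s"start() after stop() rejected: ${e.getMessage}")
        }
      }
    }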
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala       38
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala  25
2 files changed, 40 insertions(+), 23 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 23d6d1c5e5..54b219711e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -436,10 +436,10 @@ class StreamingContext private[streaming] (
 
   /**
    * Start the execution of the streams.
+   *
+   * @throws SparkException if the context has already been started or stopped.
    */
   def start(): Unit = synchronized {
-    // Throw exception if the context has already been started once
-    // or if a stopped context is being started again
     if (state == Started) {
       throw new SparkException("StreamingContext has already been started")
     }
@@ -472,8 +472,10 @@ class StreamingContext private[streaming] (
   /**
    * Stop the execution of the streams immediately (does not wait for all received data
    * to be processed).
-   * @param stopSparkContext Stop the associated SparkContext or not
    *
+   * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
+   *                         will be stopped regardless of whether this StreamingContext has been
+   *                         started.
    */
   def stop(stopSparkContext: Boolean = true): Unit = synchronized {
     stop(stopSparkContext, false)
@@ -482,25 +484,27 @@ class StreamingContext private[streaming] (
   /**
    * Stop the execution of the streams, with option of ensuring all received data
    * has been processed.
-   * @param stopSparkContext Stop the associated SparkContext or not
-   * @param stopGracefully Stop gracefully by waiting for the processing of all
+   *
+   * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
+   *                         will be stopped regardless of whether this StreamingContext has been
+   *                         started.
+   * @param stopGracefully if true, stops gracefully by waiting for the processing of all
    * received data to be completed
    */
   def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
-    // Warn (but not fail) if context is stopped twice,
-    // or context is stopped before starting
-    if (state == Initialized) {
-      logWarning("StreamingContext has not been started yet")
-      return
+    state match {
+      case Initialized => logWarning("StreamingContext has not been started yet")
+      case Stopped => logWarning("StreamingContext has already been stopped")
+      case Started =>
+        scheduler.stop(stopGracefully)
+        logInfo("StreamingContext stopped successfully")
+        waiter.notifyStop()
     }
-    if (state == Stopped) {
-      logWarning("StreamingContext has already been stopped")
-      return
-    } // no need to throw an exception as its okay to stop twice
-    scheduler.stop(stopGracefully)
-    logInfo("StreamingContext stopped successfully")
-    waiter.notifyStop()
+    // Even if the streaming context has not been started, we still need to stop the SparkContext.
+    // Even if we have already stopped, we still need to attempt to stop the SparkContext because
+    // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
     if (stopSparkContext) sc.stop()
+    // The state should always be Stopped after calling `stop()`, even if we haven't started yet:
     state = Stopped
   }
 }
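As the commit message notes, this is handy in unit test fixtures: a single `stop(stopSparkContext = true)` in an `after` block now tears down both contexts, whether or not the test ever called `start()`. A minimal ScalaTest sketch of such a fixture (the suite name and test are hypothetical, mirroring the style of the suite changed below):

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.scalatest.{BeforeAndAfter, FunSuite}

    // Hypothetical fixture suite, for illustration only.
    class LifecycleFixtureSuite extends FunSuite with BeforeAndAfter {
      private var ssc: StreamingContext = _

      before {
        ssc = new StreamingContext("local[2]", "fixture-demo", Seconds(1))
      }

      after {
        if (ssc != null) {
          // One call releases the StreamingContext and its SparkContext,
          // even for tests that never called ssc.start().
          ssc.stop(stopSparkContext = true)
          ssc = null
        }
      }

      test("fixture provides a usable SparkContext") {
        assert(ssc.sparkContext.parallelize(1 to 10).count() === 10L)
      }
    }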
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index f47772947d..4b49c4d251 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -46,10 +46,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   after {
     if (ssc != null) {
       ssc.stop()
-      if (ssc.sc != null) {
-        // Calling ssc.stop() does not always stop the associated SparkContext.
-        ssc.sc.stop()
-      }
       ssc = null
     }
     if (sc != null) {
@@ -137,11 +133,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     ssc.stop()
   }
 
-  test("stop before start and start after stop") {
+  test("stop before start") {
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register()
     ssc.stop() // stop before start should not throw exception
-    ssc.start()
+  }
+
+  test("start after stop") {
+    // Regression test for SPARK-4301
+    ssc = new StreamingContext(master, appName, batchDuration)
+    addInputStream(ssc).register()
     ssc.stop()
     intercept[SparkException] {
       ssc.start() // start after stop should throw exception
@@ -161,6 +162,18 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     ssc.stop()
   }
 
+  test("stop(stopSparkContext=true) after stop(stopSparkContext=false)") {
+    ssc = new StreamingContext(master, appName, batchDuration)
+    addInputStream(ssc).register()
+    ssc.stop(stopSparkContext = false)
+    assert(ssc.sc.makeRDD(1 to 100).collect().size === 100)
+    ssc.stop(stopSparkContext = true)
+    // Check that the SparkContext is actually stopped:
+    intercept[Exception] {
+      ssc.sc.makeRDD(1 to 100).collect()
+    }
+  }
+
   test("stop gracefully") {
     val conf = new SparkConf().setMaster(master).setAppName(appName)
     conf.set("spark.cleaner.ttl", "3600")