about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 38
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 25
2 files changed, 40 insertions, 23 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 23d6d1c5e5..54b219711e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -436,10 +436,10 @@ class StreamingContext private[streaming] (
/**
* Start the execution of the streams.
+ *
+ * @throws SparkException if the context has already been started or stopped.
*/
def start(): Unit = synchronized {
- // Throw exception if the context has already been started once
- // or if a stopped context is being started again
if (state == Started) {
throw new SparkException("StreamingContext has already been started")
}
@@ -472,8 +472,10 @@ class StreamingContext private[streaming] (
/**
* Stop the execution of the streams immediately (does not wait for all received data
* to be processed).
- * @param stopSparkContext Stop the associated SparkContext or not
*
+ * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
+ * will be stopped regardless of whether this StreamingContext has been
+ * started.
*/
def stop(stopSparkContext: Boolean = true): Unit = synchronized {
stop(stopSparkContext, false)
@@ -482,25 +484,27 @@ class StreamingContext private[streaming] (
/**
* Stop the execution of the streams, with option of ensuring all received data
* has been processed.
- * @param stopSparkContext Stop the associated SparkContext or not
- * @param stopGracefully Stop gracefully by waiting for the processing of all
+ *
+ * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
+ * will be stopped regardless of whether this StreamingContext has been
+ * started.
+ * @param stopGracefully if true, stops gracefully by waiting for the processing of all
* received data to be completed
*/
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
- // Warn (but not fail) if context is stopped twice,
- // or context is stopped before starting
- if (state == Initialized) {
- logWarning("StreamingContext has not been started yet")
- return
+ state match {
+ case Initialized => logWarning("StreamingContext has not been started yet")
+ case Stopped => logWarning("StreamingContext has already been stopped")
+ case Started =>
+ scheduler.stop(stopGracefully)
+ logInfo("StreamingContext stopped successfully")
+ waiter.notifyStop()
}
- if (state == Stopped) {
- logWarning("StreamingContext has already been stopped")
- return
- } // no need to throw an exception as its okay to stop twice
- scheduler.stop(stopGracefully)
- logInfo("StreamingContext stopped successfully")
- waiter.notifyStop()
+ // Even if the streaming context has not been started, we still need to stop the SparkContext.
+ // Even if we have already stopped, we still need to attempt to stop the SparkContext because
+ // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
if (stopSparkContext) sc.stop()
+ // The state should always be Stopped after calling `stop()`, even if we haven't started yet:
state = Stopped
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index f47772947d..4b49c4d251 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -46,10 +46,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
after {
if (ssc != null) {
ssc.stop()
- if (ssc.sc != null) {
- // Calling ssc.stop() does not always stop the associated SparkContext.
- ssc.sc.stop()
- }
ssc = null
}
if (sc != null) {
@@ -137,11 +133,16 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc.stop()
}
- test("stop before start and start after stop") {
+ test("stop before start") {
ssc = new StreamingContext(master, appName, batchDuration)
addInputStream(ssc).register()
ssc.stop() // stop before start should not throw exception
- ssc.start()
+ }
+
+ test("start after stop") {
+ // Regression test for SPARK-4301
+ ssc = new StreamingContext(master, appName, batchDuration)
+ addInputStream(ssc).register()
ssc.stop()
intercept[SparkException] {
ssc.start() // start after stop should throw exception
@@ -161,6 +162,18 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc.stop()
}
+ test("stop(stopSparkContext=true) after stop(stopSparkContext=false)") {
+ ssc = new StreamingContext(master, appName, batchDuration)
+ addInputStream(ssc).register()
+ ssc.stop(stopSparkContext = false)
+ assert(ssc.sc.makeRDD(1 to 100).collect().size === 100)
+ ssc.stop(stopSparkContext = true)
+ // Check that the SparkContext is actually stopped:
+ intercept[Exception] {
+ ssc.sc.makeRDD(1 to 100).collect()
+ }
+ }
+
test("stop gracefully") {
val conf = new SparkConf().setMaster(master).setAppName(appName)
conf.set("spark.cleaner.ttl", "3600")