diff options
author | Lars Albertsson <lalle@spotify.com> | 2014-06-11 10:54:42 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-06-11 10:54:45 -0700 |
commit | 4d5c12aa1c54c49377a4bafe3bcc4993d5e1a552 (patch) | |
tree | 39cd5ad656cf6f073f1d2f95b13a2f75a986863b /streaming | |
parent | e508f599f88baaa31a3498fb0bdbafdbc303119e (diff) | |
download | spark-4d5c12aa1c54c49377a4bafe3bcc4993d5e1a552.tar.gz spark-4d5c12aa1c54c49377a4bafe3bcc4993d5e1a552.tar.bz2 spark-4d5c12aa1c54c49377a4bafe3bcc4993d5e1a552.zip |
SPARK-2113: awaitTermination() after stop() will hang in Spark Stremaing
Author: Lars Albertsson <lalle@spotify.com>
Closes #1001 from lallea/contextwaiter_stopped and squashes the following commits:
93cd314 [Lars Albertsson] Mend StreamingContext stop() followed by awaitTermination().
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala | 1 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 12 |
2 files changed, 13 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala index 86753360a0..a0aeacbc73 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala @@ -27,6 +27,7 @@ private[streaming] class ContextWaiter { } def notifyStop() = synchronized { + stopped = true notifyAll() } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index cd86019f63..7b33d3b235 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -223,6 +223,18 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w } } + test("awaitTermination after stop") { + ssc = new StreamingContext(master, appName, batchDuration) + val inputStream = addInputStream(ssc) + inputStream.map(x => x).register() + + failAfter(10000 millis) { + ssc.start() + ssc.stop() + ssc.awaitTermination() + } + } + test("awaitTermination with error in task") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) |