aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorLars Albertsson <lalle@spotify.com>2014-06-11 10:54:42 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-06-11 10:54:45 -0700
commit4d5c12aa1c54c49377a4bafe3bcc4993d5e1a552 (patch)
tree39cd5ad656cf6f073f1d2f95b13a2f75a986863b /streaming
parente508f599f88baaa31a3498fb0bdbafdbc303119e (diff)
downloadspark-4d5c12aa1c54c49377a4bafe3bcc4993d5e1a552.tar.gz
spark-4d5c12aa1c54c49377a4bafe3bcc4993d5e1a552.tar.bz2
spark-4d5c12aa1c54c49377a4bafe3bcc4993d5e1a552.zip
SPARK-2113: awaitTermination() after stop() will hang in Spark Stremaing
Author: Lars Albertsson <lalle@spotify.com> Closes #1001 from lallea/contextwaiter_stopped and squashes the following commits: 93cd314 [Lars Albertsson] Mend StreamingContext stop() followed by awaitTermination().
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala1
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala12
2 files changed, 13 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala
index 86753360a0..a0aeacbc73 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala
@@ -27,6 +27,7 @@ private[streaming] class ContextWaiter {
}
def notifyStop() = synchronized {
+ stopped = true
notifyAll()
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index cd86019f63..7b33d3b235 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -223,6 +223,18 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
}
}
+ test("awaitTermination after stop") {
+ ssc = new StreamingContext(master, appName, batchDuration)
+ val inputStream = addInputStream(ssc)
+ inputStream.map(x => x).register()
+
+ failAfter(10000 millis) {
+ ssc.start()
+ ssc.stop()
+ ssc.awaitTermination()
+ }
+ }
+
test("awaitTermination with error in task") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)