aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala4
1 files changed, 3 insertions, 1 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 35eeb9dfa5..5645996de5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -814,10 +814,12 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
ssc = new StreamingContext(conf, Milliseconds(100))
val input = ssc.receiverStream(new TestReceiver)
val latch = new CountDownLatch(1)
+ @volatile var stopping = false
input.count().foreachRDD { rdd =>
// Make sure we can read from BlockRDD
- if (rdd.collect().headOption.getOrElse(0L) > 0) {
+ if (rdd.collect().headOption.getOrElse(0L) > 0 && !stopping) {
// Stop StreamingContext to unblock "awaitTerminationOrTimeout"
+ stopping = true
new Thread() {
setDaemon(true)
override def run(): Unit = {