aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala10
1 files changed, 5 insertions, 5 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 84a5fbb3d9..b7db280f63 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -261,7 +261,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
for (i <- 1 to 4) {
logInfo("==================================\n\n\n")
ssc = new StreamingContext(sc, Milliseconds(100))
- var runningCount = 0
+ @volatile var runningCount = 0
TestReceiver.counter.set(1)
val input = ssc.receiverStream(new TestReceiver)
input.count().foreachRDD { rdd =>
@@ -270,14 +270,14 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
logInfo("Count = " + count + ", Running count = " + runningCount)
}
ssc.start()
- ssc.awaitTerminationOrTimeout(500)
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ assert(runningCount > 0)
+ }
ssc.stop(stopSparkContext = false, stopGracefully = true)
logInfo("Running count = " + runningCount)
logInfo("TestReceiver.counter = " + TestReceiver.counter.get())
- assert(runningCount > 0)
assert(
- (TestReceiver.counter.get() == runningCount + 1) ||
- (TestReceiver.counter.get() == runningCount + 2),
+ TestReceiver.counter.get() == runningCount + 1,
"Received records = " + TestReceiver.counter.get() + ", " +
"processed records = " + runningCount
)