From d04634701413410938a133358fe1d9fbc077645e Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 31 Jul 2015 12:10:55 -0700 Subject: [SPARK-9504] [STREAMING] [TESTS] Use eventually to fix the flaky test The previous code uses `ssc.awaitTerminationOrTimeout(500)`. Since nobody will stop it during `awaitTerminationOrTimeout`, it's just like `sleep(500)`. In a super overloaded Jenkins worker, the receiver may be not able to start in 500 milliseconds. Verified this in the log of https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39149/ There is no log about starting the receiver before this failure. That's why `assert(runningCount > 0)` failed. This PR replaces `awaitTerminationOrTimeout` with `eventually` which should be more reliable. Author: zsxwing Closes #7823 from zsxwing/SPARK-9504 and squashes the following commits: 7af66a6 [zsxwing] Remove wrong assertion 5ba2c99 [zsxwing] Use eventually to fix the flaky test --- .../org/apache/spark/streaming/StreamingContextSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'streaming') diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 84a5fbb3d9..b7db280f63 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -261,7 +261,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo for (i <- 1 to 4) { logInfo("==================================\n\n\n") ssc = new StreamingContext(sc, Milliseconds(100)) - var runningCount = 0 + @volatile var runningCount = 0 TestReceiver.counter.set(1) val input = ssc.receiverStream(new TestReceiver) input.count().foreachRDD { rdd => @@ -270,14 +270,14 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo logInfo("Count = " + count + ", Running count = " + runningCount) } ssc.start() - ssc.awaitTerminationOrTimeout(500) + eventually(timeout(10.seconds), interval(10.millis)) { + assert(runningCount > 0) + } ssc.stop(stopSparkContext = false, stopGracefully = true) logInfo("Running count = " + runningCount) logInfo("TestReceiver.counter = " + TestReceiver.counter.get()) - assert(runningCount > 0) assert( - (TestReceiver.counter.get() == runningCount + 1) || - (TestReceiver.counter.get() == runningCount + 2), + TestReceiver.counter.get() == runningCount + 1, "Received records = " + TestReceiver.counter.get() + ", " + "processed records = " + runningCount ) -- cgit v1.2.3