aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-07-31 12:10:55 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-07-31 12:10:55 -0700
commitd04634701413410938a133358fe1d9fbc077645e (patch)
tree476d642665d6cb74613d6dc0c29033dfbefa178f /streaming
parent3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0 (diff)
downloadspark-d04634701413410938a133358fe1d9fbc077645e.tar.gz
spark-d04634701413410938a133358fe1d9fbc077645e.tar.bz2
spark-d04634701413410938a133358fe1d9fbc077645e.zip
[SPARK-9504] [STREAMING] [TESTS] Use eventually to fix the flaky test
The previous code uses `ssc.awaitTerminationOrTimeout(500)`. Since nobody will stop it during `awaitTerminationOrTimeout`, it's just like `sleep(500)`. In a super overloaded Jenkins worker, the receiver may be not able to start in 500 milliseconds. Verified this in the log of https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39149/ There is no log about starting the receiver before this failure. That's why `assert(runningCount > 0)` failed. This PR replaces `awaitTerminationOrTimeout` with `eventually` which should be more reliable. Author: zsxwing <zsxwing@gmail.com> Closes #7823 from zsxwing/SPARK-9504 and squashes the following commits: 7af66a6 [zsxwing] Remove wrong assertion 5ba2c99 [zsxwing] Use eventually to fix the flaky test
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala10
1 files changed, 5 insertions, 5 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 84a5fbb3d9..b7db280f63 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -261,7 +261,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
for (i <- 1 to 4) {
logInfo("==================================\n\n\n")
ssc = new StreamingContext(sc, Milliseconds(100))
- var runningCount = 0
+ @volatile var runningCount = 0
TestReceiver.counter.set(1)
val input = ssc.receiverStream(new TestReceiver)
input.count().foreachRDD { rdd =>
@@ -270,14 +270,14 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
logInfo("Count = " + count + ", Running count = " + runningCount)
}
ssc.start()
- ssc.awaitTerminationOrTimeout(500)
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ assert(runningCount > 0)
+ }
ssc.stop(stopSparkContext = false, stopGracefully = true)
logInfo("Running count = " + runningCount)
logInfo("TestReceiver.counter = " + TestReceiver.counter.get())
- assert(runningCount > 0)
assert(
- (TestReceiver.counter.get() == runningCount + 1) ||
- (TestReceiver.counter.get() == runningCount + 2),
+ TestReceiver.counter.get() == runningCount + 1,
"Received records = " + TestReceiver.counter.get() + ", " +
"processed records = " + runningCount
)