aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-05-20 19:56:01 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-05-20 19:56:01 -0700
commit895baf8f77e630ce32b0e25b00bf5ee45d17398f (patch)
treef5e128b8b08409cb05950fb01a168db45f0f29d7 /streaming
parenta70bf06b790add5f279a69607df89ed36155b0e4 (diff)
downloadspark-895baf8f77e630ce32b0e25b00bf5ee45d17398f.tar.gz
spark-895baf8f77e630ce32b0e25b00bf5ee45d17398f.tar.bz2
spark-895baf8f77e630ce32b0e25b00bf5ee45d17398f.zip
[SPARK-7777] [STREAMING] Fix the flaky test in org.apache.spark.streaming.BasicOperationsSuite
Just added a guard to make sure a batch has completed before moving to the next batch. Author: zsxwing <zsxwing@gmail.com> Closes #6306 from zsxwing/SPARK-7777 and squashes the following commits: ecee529 [zsxwing] Fix the failure message 58634fe [zsxwing] Fix the flaky test in org.apache.spark.streaming.BasicOperationsSuite
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala7
1 files changed, 7 insertions, 0 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 87bc20f79c..f269cb74e0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -557,6 +557,9 @@ class BasicOperationsSuite extends TestSuiteBase {
withTestServer(new TestServer()) { testServer =>
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
testServer.start()
+
+ val batchCounter = new BatchCounter(ssc)
+
// Set up the streaming context and input streams
val networkStream =
ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
@@ -587,7 +590,11 @@ class BasicOperationsSuite extends TestSuiteBase {
for (i <- 0 until input.size) {
testServer.send(input(i).toString + "\n")
Thread.sleep(200)
+ val numCompletedBatches = batchCounter.getNumCompletedBatches
clock.advance(batchDuration.milliseconds)
+ if (!batchCounter.waitUntilBatchesCompleted(numCompletedBatches + 1, 5000)) {
+ fail("Batch took more than 5 seconds to complete")
+ }
collectRddInfo()
}