diff options
author | zsxwing <zsxwing@gmail.com> | 2015-05-20 19:56:01 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-20 19:56:01 -0700 |
commit | 895baf8f77e630ce32b0e25b00bf5ee45d17398f (patch) | |
tree | f5e128b8b08409cb05950fb01a168db45f0f29d7 | |
parent | a70bf06b790add5f279a69607df89ed36155b0e4 (diff) | |
download | spark-895baf8f77e630ce32b0e25b00bf5ee45d17398f.tar.gz spark-895baf8f77e630ce32b0e25b00bf5ee45d17398f.tar.bz2 spark-895baf8f77e630ce32b0e25b00bf5ee45d17398f.zip |
[SPARK-7777] [STREAMING] Fix the flaky test in org.apache.spark.streaming.BasicOperationsSuite
Just added a guard to make sure a batch has completed before moving to the next batch.
Author: zsxwing <zsxwing@gmail.com>
Closes #6306 from zsxwing/SPARK-7777 and squashes the following commits:
ecee529 [zsxwing] Fix the failure message
58634fe [zsxwing] Fix the flaky test in org.apache.spark.streaming.BasicOperationsSuite
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 7 |
1 files changed, 7 insertions, 0 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 87bc20f79c..f269cb74e0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -557,6 +557,9 @@ class BasicOperationsSuite extends TestSuiteBase { withTestServer(new TestServer()) { testServer => withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => testServer.start() + + val batchCounter = new BatchCounter(ssc) + // Set up the streaming context and input streams val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) @@ -587,7 +590,11 @@ class BasicOperationsSuite extends TestSuiteBase { for (i <- 0 until input.size) { testServer.send(input(i).toString + "\n") Thread.sleep(200) + val numCompletedBatches = batchCounter.getNumCompletedBatches clock.advance(batchDuration.milliseconds) + if (!batchCounter.waitUntilBatchesCompleted(numCompletedBatches + 1, 5000)) { + fail("Batch took more than 5 seconds to complete") + } collectRddInfo() } |