diff options
author | Burak Yavuz <brkyvz@gmail.com> | 2016-12-21 17:23:48 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2016-12-21 17:23:48 -0800 |
commit | afe36516e4b4031196ee2e0a04980ac49208ea6b (patch) | |
tree | 7eb53005f12f5855afb33f3a3eb8ad646a475909 /streaming/src/test/scala/org | |
parent | 7e8994ffd3d646adb0a769229637931e43cd12b0 (diff) | |
download | spark-afe36516e4b4031196ee2e0a04980ac49208ea6b.tar.gz spark-afe36516e4b4031196ee2e0a04980ac49208ea6b.tar.bz2 spark-afe36516e4b4031196ee2e0a04980ac49208ea6b.zip |
[FLAKY-TEST] InputStreamsSuite.socket input stream
## What changes were proposed in this pull request?
https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.streaming.InputStreamsSuite&test_name=socket+input+stream
## How was this patch tested?
Tested 2,000 times.
Author: Burak Yavuz <brkyvz@gmail.com>
Closes #16343 from brkyvz/sock.
Diffstat (limited to 'streaming/src/test/scala/org')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 55 |
1 files changed, 23 insertions, 32 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 9ecfa48091..6fb50a4052 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -67,42 +67,33 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val expectedOutput = input.map(_.toString) for (i <- input.indices) { testServer.send(input(i).toString + "\n") - Thread.sleep(500) clock.advance(batchDuration.milliseconds) } - // Make sure we finish all batches before "stop" - if (!batchCounter.waitUntilBatchesCompleted(input.size, 30000)) { - fail("Timeout: cannot finish all batches in 30 seconds") + + eventually(eventuallyTimeout) { + clock.advance(batchDuration.milliseconds) + // Verify whether data received was as expected + logInfo("--------------------------------") + logInfo("output.size = " + outputQueue.size) + logInfo("output") + outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output.size = " + expectedOutput.size) + logInfo("expected output") + expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("--------------------------------") + + // Verify whether all the elements received are as expected + // (whether the elements were received one in each interval is not verified) + val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray + assert(output.length === expectedOutput.size) + for (i <- output.indices) { + assert(output(i) === expectedOutput(i)) + } } - // Ensure progress listener has been notified of all events - ssc.sparkContext.listenerBus.waitUntilEmpty(500) - - // Verify all "InputInfo"s have been reported - assert(ssc.progressListener.numTotalReceivedRecords === input.size) - assert(ssc.progressListener.numTotalProcessedRecords === input.size) - - logInfo("Stopping server") - testServer.stop() - logInfo("Stopping context") - ssc.stop() - - // Verify whether data received was as expected - logInfo("--------------------------------") - logInfo("output.size = " + outputQueue.size) - logInfo("output") - outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("expected output.size = " + expectedOutput.size) - logInfo("expected output") - expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("--------------------------------") - - // Verify whether all the elements received are as expected - // (whether the elements were received one in each interval is not verified) - val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray - assert(output.length === expectedOutput.size) - for (i <- output.indices) { - assert(output(i) === expectedOutput(i)) + eventually(eventuallyTimeout) { + assert(ssc.progressListener.numTotalReceivedRecords === input.length) + assert(ssc.progressListener.numTotalProcessedRecords === input.length) } } } |