aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2016-12-21 17:23:48 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2016-12-21 17:23:48 -0800
commitafe36516e4b4031196ee2e0a04980ac49208ea6b (patch)
tree7eb53005f12f5855afb33f3a3eb8ad646a475909 /streaming
parent7e8994ffd3d646adb0a769229637931e43cd12b0 (diff)
downloadspark-afe36516e4b4031196ee2e0a04980ac49208ea6b.tar.gz
spark-afe36516e4b4031196ee2e0a04980ac49208ea6b.tar.bz2
spark-afe36516e4b4031196ee2e0a04980ac49208ea6b.zip
[FLAKY-TEST] InputStreamsSuite.socket input stream
## What changes were proposed in this pull request? https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.streaming.InputStreamsSuite&test_name=socket+input+stream ## How was this patch tested? Tested 2,000 times. Author: Burak Yavuz <brkyvz@gmail.com> Closes #16343 from brkyvz/sock.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala55
1 files changed, 23 insertions, 32 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 9ecfa48091..6fb50a4052 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -67,42 +67,33 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val expectedOutput = input.map(_.toString)
for (i <- input.indices) {
testServer.send(input(i).toString + "\n")
- Thread.sleep(500)
clock.advance(batchDuration.milliseconds)
}
- // Make sure we finish all batches before "stop"
- if (!batchCounter.waitUntilBatchesCompleted(input.size, 30000)) {
- fail("Timeout: cannot finish all batches in 30 seconds")
+
+ eventually(eventuallyTimeout) {
+ clock.advance(batchDuration.milliseconds)
+ // Verify whether data received was as expected
+ logInfo("--------------------------------")
+ logInfo("output.size = " + outputQueue.size)
+ logInfo("output")
+ outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("expected output.size = " + expectedOutput.size)
+ logInfo("expected output")
+ expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("--------------------------------")
+
+ // Verify whether all the elements received are as expected
+ // (whether the elements were received one in each interval is not verified)
+ val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
+ assert(output.length === expectedOutput.size)
+ for (i <- output.indices) {
+ assert(output(i) === expectedOutput(i))
+ }
}
- // Ensure progress listener has been notified of all events
- ssc.sparkContext.listenerBus.waitUntilEmpty(500)
-
- // Verify all "InputInfo"s have been reported
- assert(ssc.progressListener.numTotalReceivedRecords === input.size)
- assert(ssc.progressListener.numTotalProcessedRecords === input.size)
-
- logInfo("Stopping server")
- testServer.stop()
- logInfo("Stopping context")
- ssc.stop()
-
- // Verify whether data received was as expected
- logInfo("--------------------------------")
- logInfo("output.size = " + outputQueue.size)
- logInfo("output")
- outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
- logInfo("expected output.size = " + expectedOutput.size)
- logInfo("expected output")
- expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
- logInfo("--------------------------------")
-
- // Verify whether all the elements received are as expected
- // (whether the elements were received one in each interval is not verified)
- val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
- assert(output.length === expectedOutput.size)
- for (i <- output.indices) {
- assert(output(i) === expectedOutput(i))
+ eventually(eventuallyTimeout) {
+ assert(ssc.progressListener.numTotalReceivedRecords === input.length)
+ assert(ssc.progressListener.numTotalProcessedRecords === input.length)
}
}
}