aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorrobbins <robbins@uk.ibm.com>2015-09-03 13:48:35 -0700
committerAndrew Or <andrew@databricks.com>2015-09-03 13:48:35 -0700
commit754f853b02e9fd221f138c2446445fd56e3f3fb3 (patch)
tree662c6d38c6384aa8d1303911bd6dfdcce7562c64 /streaming
parentd911c682f00cd5c438568c548098e03d3e7ea05c (diff)
downloadspark-754f853b02e9fd221f138c2446445fd56e3f3fb3.tar.gz
spark-754f853b02e9fd221f138c2446445fd56e3f3fb3.tar.bz2
spark-754f853b02e9fd221f138c2446445fd56e3f3fb3.zip
[SPARK-9869] [STREAMING] Wait for all event notifications before asserting results
Author: robbins <robbins@uk.ibm.com> Closes #8589 from robbinspg/InputStreamSuite-fix.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala3
1 files changed, 3 insertions, 0 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index ec2852d9a0..047e38ef90 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -76,6 +76,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
fail("Timeout: cannot finish all batches in 30 seconds")
}
+ // Ensure progress listener has been notified of all events
+ ssc.scheduler.listenerBus.waitUntilEmpty(500)
+
// Verify all "InputInfo"s have been reported
assert(ssc.progressListener.numTotalReceivedRecords === input.size)
assert(ssc.progressListener.numTotalProcessedRecords === input.size)