From 086b0c8f6788b205bc630d5ccf078f77b9751af3 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 1 Dec 2016 14:22:49 -0800 Subject: [SPARK-18617][SPARK-18560][TESTS] Fix flaky test: StreamingContextSuite. Receiver data should be deserialized properly ## What changes were proposed in this pull request? Avoid to create multiple threads to stop StreamingContext. Otherwise, the latch added in #16091 can be passed too early. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #16105 from zsxwing/SPARK-18617-2. --- .../test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'streaming') diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 35eeb9dfa5..5645996de5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -814,10 +814,12 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo ssc = new StreamingContext(conf, Milliseconds(100)) val input = ssc.receiverStream(new TestReceiver) val latch = new CountDownLatch(1) + @volatile var stopping = false input.count().foreachRDD { rdd => // Make sure we can read from BlockRDD - if (rdd.collect().headOption.getOrElse(0L) > 0) { + if (rdd.collect().headOption.getOrElse(0L) > 0 && !stopping) { // Stop StreamingContext to unblock "awaitTerminationOrTimeout" + stopping = true new Thread() { setDaemon(true) override def run(): Unit = { -- cgit v1.2.3