diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-12-01 14:22:49 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-12-01 14:22:49 -0800 |
commit | 086b0c8f6788b205bc630d5ccf078f77b9751af3 (patch) | |
tree | 4592a2f6de2d17d07adb45eead54d750e0071bb9 /streaming | |
parent | 78bb7f8071379114314c394e0167c4c5fd8545c5 (diff) | |
download | spark-086b0c8f6788b205bc630d5ccf078f77b9751af3.tar.gz spark-086b0c8f6788b205bc630d5ccf078f77b9751af3.tar.bz2 spark-086b0c8f6788b205bc630d5ccf078f77b9751af3.zip |
[SPARK-18617][SPARK-18560][TESTS] Fix flaky test: StreamingContextSuite. Receiver data should be deserialized properly
## What changes were proposed in this pull request?
Avoid to create multiple threads to stop StreamingContext. Otherwise, the latch added in #16091 can be passed too early.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16105 from zsxwing/SPARK-18617-2.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 4 |
1 files changed, 3 insertions, 1 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 35eeb9dfa5..5645996de5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -814,10 +814,12 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo ssc = new StreamingContext(conf, Milliseconds(100)) val input = ssc.receiverStream(new TestReceiver) val latch = new CountDownLatch(1) + @volatile var stopping = false input.count().foreachRDD { rdd => // Make sure we can read from BlockRDD - if (rdd.collect().headOption.getOrElse(0L) > 0) { + if (rdd.collect().headOption.getOrElse(0L) > 0 && !stopping) { // Stop StreamingContext to unblock "awaitTerminationOrTimeout" + stopping = true new Thread() { setDaemon(true) override def run(): Unit = { |