aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-12-01 14:22:49 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-12-01 14:22:49 -0800
commit086b0c8f6788b205bc630d5ccf078f77b9751af3 (patch)
tree4592a2f6de2d17d07adb45eead54d750e0071bb9 /streaming
parent78bb7f8071379114314c394e0167c4c5fd8545c5 (diff)
downloadspark-086b0c8f6788b205bc630d5ccf078f77b9751af3.tar.gz
spark-086b0c8f6788b205bc630d5ccf078f77b9751af3.tar.bz2
spark-086b0c8f6788b205bc630d5ccf078f77b9751af3.zip
[SPARK-18617][SPARK-18560][TESTS] Fix flaky test: StreamingContextSuite. Receiver data should be deserialized properly
## What changes were proposed in this pull request? Avoid to create multiple threads to stop StreamingContext. Otherwise, the latch added in #16091 can be passed too early. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes #16105 from zsxwing/SPARK-18617-2.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala4
1 files changed, 3 insertions, 1 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 35eeb9dfa5..5645996de5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -814,10 +814,12 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
ssc = new StreamingContext(conf, Milliseconds(100))
val input = ssc.receiverStream(new TestReceiver)
val latch = new CountDownLatch(1)
+ @volatile var stopping = false
input.count().foreachRDD { rdd =>
// Make sure we can read from BlockRDD
- if (rdd.collect().headOption.getOrElse(0L) > 0) {
+ if (rdd.collect().headOption.getOrElse(0L) > 0 && !stopping) {
// Stop StreamingContext to unblock "awaitTerminationOrTimeout"
+ stopping = true
new Thread() {
setDaemon(true)
override def run(): Unit = {