aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-08-04 20:09:15 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-08-04 20:09:15 -0700
commitd34bac0e156432ca6a260db73dbe1318060e309c (patch)
tree93f43c99824eb8784fca69d86a30ff2d7f840ba2 /streaming
parent2b67fdb60be95778e016efae4f0a9cdf2fbfe779 (diff)
downloadspark-d34bac0e156432ca6a260db73dbe1318060e309c.tar.gz
spark-d34bac0e156432ca6a260db73dbe1318060e309c.tar.bz2
spark-d34bac0e156432ca6a260db73dbe1318060e309c.zip
[SPARK-9504] [STREAMING] [TESTS] Fix o.a.s.streaming.StreamingContextSuite.stop gracefully again
The test failure is here: https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/3150/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=centos/testReport/junit/org.apache.spark.streaming/StreamingContextSuite/stop_gracefully/ There is a race condition in TestReceiver that it may add 1 record and increase `TestReceiver.counter` after stopping `BlockGenerator`. This PR just adds `join` to wait the pushing thread. Author: zsxwing <zsxwing@gmail.com> Closes #7934 from zsxwing/SPARK-9504-2 and squashes the following commits: cfd7973 [zsxwing] Wait for the thread to make sure we won't change TestReceiver.counter after stopping BlockGenerator
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala3
1 files changed, 2 insertions, 1 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index b7db280f63..7423ef6bcb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -789,7 +789,8 @@ class TestReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging
}
def onStop() {
- // no clean to be done, the receiving thread should stop on it own
+ // no clean to be done, the receiving thread should stop on it own, so just wait for it.
+ receivingThreadOption.foreach(_.join())
}
}