diff options
author | zsxwing <zsxwing@gmail.com> | 2015-08-04 20:09:15 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-08-04 20:09:15 -0700 |
commit | d34bac0e156432ca6a260db73dbe1318060e309c (patch) | |
tree | 93f43c99824eb8784fca69d86a30ff2d7f840ba2 /streaming | |
parent | 2b67fdb60be95778e016efae4f0a9cdf2fbfe779 (diff) | |
download | spark-d34bac0e156432ca6a260db73dbe1318060e309c.tar.gz spark-d34bac0e156432ca6a260db73dbe1318060e309c.tar.bz2 spark-d34bac0e156432ca6a260db73dbe1318060e309c.zip |
[SPARK-9504] [STREAMING] [TESTS] Fix o.a.s.streaming.StreamingContextSuite.stop gracefully again
The test failure is here: https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/3150/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=centos/testReport/junit/org.apache.spark.streaming/StreamingContextSuite/stop_gracefully/
There is a race condition in TestReceiver that it may add 1 record and increase `TestReceiver.counter` after stopping `BlockGenerator`. This PR just adds `join` to wait the pushing thread.
Author: zsxwing <zsxwing@gmail.com>
Closes #7934 from zsxwing/SPARK-9504-2 and squashes the following commits:
cfd7973 [zsxwing] Wait for the thread to make sure we won't change TestReceiver.counter after stopping BlockGenerator
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 3 |
1 files changed, 2 insertions, 1 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index b7db280f63..7423ef6bcb 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -789,7 +789,8 @@ class TestReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging } def onStop() { - // no clean to be done, the receiving thread should stop on it own + // no clean to be done, the receiving thread should stop on it own, so just wait for it. + receivingThreadOption.foreach(_.join()) } } |