path: root/streaming/src
author     Shixiong Zhu <shixiong@databricks.com>  2016-11-30 17:41:43 -0800
committer  Reynold Xin <rxin@databricks.com>  2016-11-30 17:41:43 -0800
commit     0a811210f809eb5b80eae14694d484d45b48b3f6 (patch)
tree       e3da60ad3ced39af2ad46d6a4c5d8439de6be7b7 /streaming/src
parent     c4979f6ea8ed44fd87ded3133efa6df39d4842c3 (diff)
download   spark-0a811210f809eb5b80eae14694d484d45b48b3f6.tar.gz
           spark-0a811210f809eb5b80eae14694d484d45b48b3f6.tar.bz2
           spark-0a811210f809eb5b80eae14694d484d45b48b3f6.zip
[SPARK-18617][SPARK-18560][TEST] Fix flaky test: StreamingContextSuite. Receiver data should be deserialized properly
## What changes were proposed in this pull request?

Fixed the potential SparkContext leak in `StreamingContextSuite.SPARK-18560 Receiver data should be deserialized properly`, which was added in #16052. I also removed `FakeByteArrayReceiver` and used `TestReceiver` directly.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16091 from zsxwing/SPARK-18617-follow-up.
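The core of the fix is a latch hand-off: the test must block until the asynchronous `ssc.stop` call has actually returned, otherwise the test method can exit while a SparkContext is still shutting down and the `after` hook sees a leaked context. A minimal, Spark-free sketch of that pattern (the shutdown body below is a hypothetical stand-in for `ssc.stop(stopSparkContext = true, stopGracefully = false)`):

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object LatchHandOffSketch {
  def main(args: Array[String]): Unit = {
    val latch = new CountDownLatch(1)

    // Stand-in for the daemon thread the test spawns to stop the
    // StreamingContext once data has been observed.
    val stopper = new Thread {
      setDaemon(true)
      override def run(): Unit = {
        // ... shutdown work would happen here (hypothetical) ...
        latch.countDown() // signal that shutdown has fully completed
      }
    }
    stopper.start()

    // Block with a timeout instead of racing ahead; `await` returns
    // false if the latch never reaches zero, failing the assertion.
    assert(latch.await(60, TimeUnit.SECONDS))
  }
}
```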
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala  |  34
1 file changed, 8 insertions(+), 26 deletions(-)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 45d8f50853..35eeb9dfa5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming
import java.io.{File, NotSerializableException}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer
@@ -811,7 +812,8 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
// other one. Then Spark jobs need to fetch remote blocks and it will trigger SPARK-18560.
val conf = new SparkConf().setMaster("local-cluster[2,1,1024]").setAppName(appName)
ssc = new StreamingContext(conf, Milliseconds(100))
- val input = ssc.receiverStream(new FakeByteArrayReceiver)
+ val input = ssc.receiverStream(new TestReceiver)
+ val latch = new CountDownLatch(1)
input.count().foreachRDD { rdd =>
// Make sure we can read from BlockRDD
if (rdd.collect().headOption.getOrElse(0L) > 0) {
@@ -820,12 +822,17 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
setDaemon(true)
override def run(): Unit = {
ssc.stop(stopSparkContext = true, stopGracefully = false)
+ latch.countDown()
}
}.start()
}
}
ssc.start()
ssc.awaitTerminationOrTimeout(60000)
+ // Wait until `ssc.stop` returns. Otherwise, we may finish this test too fast and leak an
+ // active SparkContext. Note: the stop calls in `after` will do nothing if `ssc.stop` in this
+ // test is still running.
+ assert(latch.await(60, TimeUnit.SECONDS))
}
def addInputStream(s: StreamingContext): DStream[Int] = {
@@ -891,31 +898,6 @@ object TestReceiver {
val counter = new AtomicInteger(1)
}
-class FakeByteArrayReceiver extends Receiver[Array[Byte]](StorageLevel.MEMORY_ONLY) with Logging {
-
- val data: Array[Byte] = "test".getBytes
- var receivingThreadOption: Option[Thread] = None
-
- override def onStart(): Unit = {
- val thread = new Thread() {
- override def run() {
- logInfo("Receiving started")
- while (!isStopped) {
- store(data)
- }
- logInfo("Receiving stopped")
- }
- }
- receivingThreadOption = Some(thread)
- thread.start()
- }
-
- override def onStop(): Unit = {
- // no cleanup to be done; the receiving thread should stop on its own, so just wait for it.
- receivingThreadOption.foreach(_.join())
- }
-}
-
/** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */
class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
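The deleted `FakeByteArrayReceiver` duplicated what the suite's existing `TestReceiver` already provides: a background thread that calls `store` in a loop until the receiver is stopped. For readers without the full file, here is a hedged sketch of that shape (class and field names are illustrative, the imports assume the Spark 2.x package layout, and the suite's real `TestReceiver` stores integers from a shared counter rather than a constant):

```scala
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative minimal receiver in the style of TestReceiver.
class MinimalTestReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
  @volatile private var receivingThread: Option[Thread] = None

  override def onStart(): Unit = {
    val thread = new Thread {
      override def run(): Unit = {
        logInfo("Receiving started")
        while (!isStopped) {
          store(1) // any payload works; the test only counts records
        }
        logInfo("Receiving stopped")
      }
    }
    receivingThread = Some(thread)
    thread.start()
  }

  override def onStop(): Unit = {
    // The store loop exits once isStopped is true; just wait for it.
    receivingThread.foreach(_.join())
  }
}
```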