From 56c82edabd62db9e936bb9afcf300faf8ef39362 Mon Sep 17 00:00:00 2001 From: uncleGen Date: Tue, 29 Nov 2016 23:45:06 -0800 Subject: [SPARK-18617][CORE][STREAMING] Close "kryo auto pick" feature for Spark Streaming ## What changes were proposed in this pull request? #15992 provided a solution to fix the bug, i.e. **receiver data cannot be deserialized properly**. As zsxwing said, it is a critical bug, but we should not break APIs between maintenance releases. It may be a rational choice to disable the auto-pick Kryo serializer for Spark Streaming in the first step. I will continue #15992 to optimize the solution. ## How was this patch tested? Existing unit tests. Author: uncleGen Closes #16052 from uncleGen/SPARK-18617. --- .../spark/streaming/StreamingContextSuite.scala | 47 ++++++++++++++++++++++ 1 file changed, 47 insertions(+) (limited to 'streaming') diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index f1482e5c06..45d8f50853 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -806,6 +806,28 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo ssc.stop() } + test("SPARK-18560 Receiver data should be deserialized properly.") { + // Start a two-node cluster, so the receiver will use one node, and Spark jobs will use the + // other one. Then Spark jobs need to fetch remote blocks and it will trigger SPARK-18560. 
+ val conf = new SparkConf().setMaster("local-cluster[2,1,1024]").setAppName(appName) + ssc = new StreamingContext(conf, Milliseconds(100)) + val input = ssc.receiverStream(new FakeByteArrayReceiver) + input.count().foreachRDD { rdd => + // Make sure we can read from BlockRDD + if (rdd.collect().headOption.getOrElse(0L) > 0) { + // Stop StreamingContext to unblock "awaitTerminationOrTimeout" + new Thread() { + setDaemon(true) + override def run(): Unit = { + ssc.stop(stopSparkContext = true, stopGracefully = false) + } + }.start() + } + } + ssc.start() + ssc.awaitTerminationOrTimeout(60000) + } + def addInputStream(s: StreamingContext): DStream[Int] = { val input = (1 to 100).map(i => 1 to i) val inputStream = new TestInputStream(s, input, 1) @@ -869,6 +891,31 @@ object TestReceiver { val counter = new AtomicInteger(1) } +class FakeByteArrayReceiver extends Receiver[Array[Byte]](StorageLevel.MEMORY_ONLY) with Logging { + + val data: Array[Byte] = "test".getBytes + var receivingThreadOption: Option[Thread] = None + + override def onStart(): Unit = { + val thread = new Thread() { + override def run() { + logInfo("Receiving started") + while (!isStopped) { + store(data) + } + logInfo("Receiving stopped") + } + } + receivingThreadOption = Some(thread) + thread.start() + } + + override def onStop(): Unit = { + // no cleanup to be done; the receiving thread should stop on its own, so just wait for it. + receivingThreadOption.foreach(_.join()) + } +} + /** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging { -- cgit v1.2.3