path: root/streaming/src
author    uncleGen <hustyugm@gmail.com>    2016-11-29 23:45:06 -0800
committer Reynold Xin <rxin@databricks.com>    2016-11-29 23:45:06 -0800
commit    56c82edabd62db9e936bb9afcf300faf8ef39362 (patch)
tree      da7d0ed6bfb12a27ac402b35a5104733439a7146 /streaming/src
parent    879ba71110b6c85a4e47133620fbae7580650a6f (diff)
download  spark-56c82edabd62db9e936bb9afcf300faf8ef39362.tar.gz
          spark-56c82edabd62db9e936bb9afcf300faf8ef39362.tar.bz2
          spark-56c82edabd62db9e936bb9afcf300faf8ef39362.zip
[SPARK-18617][CORE][STREAMING] Close "kryo auto pick" feature for Spark Streaming
## What changes were proposed in this pull request?

#15992 provided a solution to fix the bug, i.e. **receiver data cannot be deserialized properly**. As zsxwing said, it is a critical bug, but we should not break APIs between maintenance releases. A reasonable first step is to disable the "kryo auto pick" feature for Spark Streaming. I will continue #15992 to optimize the solution.

## How was this patch tested?

Existing unit tests.

Author: uncleGen <hustyugm@gmail.com>

Closes #16052 from uncleGen/SPARK-18617.
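For context on what this change means for applications (a hypothetical sketch, not part of this patch): with the automatic Kryo pick disabled for Spark Streaming, an application that still wants Kryo serialization can request it explicitly through the standard `spark.serializer` setting. The object name, host, and port below are placeholders; only the `spark.serializer` key and class name are standard Spark configuration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ExplicitKryoStreamingApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ExplicitKryoStreamingApp")
      // local[2]: receiver-based streams need at least two threads locally.
      .setMaster("local[2]")
      // Request Kryo explicitly instead of relying on the removed auto pick.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val ssc = new StreamingContext(conf, Seconds(1))

    // Placeholder source; any receiver-based input stream behaves the same way.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Opting in explicitly keeps serialization behavior stable across maintenance releases, which is the compatibility concern the commit message raises.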
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala  47
1 file changed, 47 insertions(+), 0 deletions(-)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index f1482e5c06..45d8f50853 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -806,6 +806,28 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
ssc.stop()
}
+ test("SPARK-18560 Receiver data should be deserialized properly.") {
+ // Start a two-node cluster, so the receiver will use one node and Spark jobs will use the
+ // other one. Then Spark jobs need to fetch remote blocks, which triggers SPARK-18560.
+ val conf = new SparkConf().setMaster("local-cluster[2,1,1024]").setAppName(appName)
+ ssc = new StreamingContext(conf, Milliseconds(100))
+ val input = ssc.receiverStream(new FakeByteArrayReceiver)
+ input.count().foreachRDD { rdd =>
+ // Make sure we can read from BlockRDD
+ if (rdd.collect().headOption.getOrElse(0L) > 0) {
+ // Stop StreamingContext to unblock "awaitTerminationOrTimeout"
+ new Thread() {
+ setDaemon(true)
+ override def run(): Unit = {
+ ssc.stop(stopSparkContext = true, stopGracefully = false)
+ }
+ }.start()
+ }
+ }
+ ssc.start()
+ ssc.awaitTerminationOrTimeout(60000)
+ }
+
def addInputStream(s: StreamingContext): DStream[Int] = {
val input = (1 to 100).map(i => 1 to i)
val inputStream = new TestInputStream(s, input, 1)
@@ -869,6 +891,31 @@ object TestReceiver {
val counter = new AtomicInteger(1)
}
+class FakeByteArrayReceiver extends Receiver[Array[Byte]](StorageLevel.MEMORY_ONLY) with Logging {
+
+ val data: Array[Byte] = "test".getBytes
+ var receivingThreadOption: Option[Thread] = None
+
+ override def onStart(): Unit = {
+ val thread = new Thread() {
+ override def run() {
+ logInfo("Receiving started")
+ while (!isStopped) {
+ store(data)
+ }
+ logInfo("Receiving stopped")
+ }
+ }
+ receivingThreadOption = Some(thread)
+ thread.start()
+ }
+
+ override def onStop(): Unit = {
+ // No cleanup to be done; the receiving thread should stop on its own, so just wait for it.
+ receivingThreadOption.foreach(_.join())
+ }
+}
+
/** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */
class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {