path: root/streaming
author      Josh Rosen <joshrosen@databricks.com>   2014-12-31 16:02:47 -0800
committer   Tathagata Das <tathagata.das1565@gmail.com>   2014-12-31 16:02:47 -0800
commit      fe6efacc0b865e9e827a1565877077000e63976e (patch)
tree        310399ecc0182cb2d2842f3dacf831e2a929e424 /streaming
parent      c4f0b4f334f7f3565375921fcac184ad5b1fb207 (diff)
[SPARK-5035] [Streaming] ReceiverMessage trait should extend Serializable
Spark Streaming's ReceiverMessage trait should extend Serializable in order to fix a subtle bug that only occurs when running on a real cluster: if you send a fire-and-forget message to a remote Akka actor and that message cannot be serialized, the send fails more or less silently. As an optimization, Akka skips message serialization for messages sent within the same JVM. As a result, Spark's unit tests will never fail due to non-serializable Akka messages, but those messages cause mostly-silent failures when running on a real cluster.

Before this patch, here was the code for ReceiverMessage:

```
/** Messages sent to the NetworkReceiver. */
private[streaming] sealed trait ReceiverMessage
private[streaming] object StopReceiver extends ReceiverMessage
```

Since ReceiverMessage does not extend Serializable and StopReceiver is a regular `object`, not a `case object`, StopReceiver will throw serialization errors. As a result, graceful receiver shutdown is broken on real clusters (and in local-cluster mode) but works in local modes. To reproduce this, run the word count example from the Streaming Programming Guide in the Spark shell:

```
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(sc, Seconds(10))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()
Thread.sleep(10000)
ssc.stop(true, true)
```

Prior to this patch, this would work correctly in local mode but fail when running against a real cluster (it would report that some receivers were not shut down).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #3857 from JoshRosen/SPARK-5035 and squashes the following commits:

71d0eae [Josh Rosen] [SPARK-5035] ReceiverMessage trait should extend Serializable.
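The root cause can also be checked without Akka or a cluster: Java serialization of a plain Scala `object` fails unless its class (or a trait it extends) is `Serializable`. The sketch below is not part of this patch; `SerializationCheck`, `StopWithout`, `StopWith`, and `canJavaSerialize` are hypothetical names used only to mirror the before/after shape of ReceiverMessage and show why the pre-patch StopReceiver breaks once a message has to cross JVM boundaries.

```
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-ins (not Spark classes) for the message hierarchy
// before and after the patch: one trait without Serializable, one with it.
object SerializationCheck {
  sealed trait MessageWithoutSerializable
  object StopWithout extends MessageWithoutSerializable

  sealed trait MessageWithSerializable extends Serializable
  object StopWith extends MessageWithSerializable

  /** True if `msg` survives Java serialization, i.e. it could be shipped to a remote actor. */
  def canJavaSerialize(msg: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(msg) // throws for objects whose class is not Serializable
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(canJavaSerialize(StopWithout)) // false: fails once the message must leave the JVM
    println(canJavaSerialize(StopWith))    // true: safe after the trait extends Serializable
  }
}
```

Because in-JVM sends skip this serialization step entirely, only the remote (cross-JVM) path ever exercises it, which is why local-mode tests pass while real clusters fail.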
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala | 2
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
index bf39d1e891..ab9fa19219 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverMessage.scala
@@ -18,6 +18,6 @@
 package org.apache.spark.streaming.receiver
 
 /** Messages sent to the NetworkReceiver. */
-private[streaming] sealed trait ReceiverMessage
+private[streaming] sealed trait ReceiverMessage extends Serializable
 
 private[streaming] object StopReceiver extends ReceiverMessage