diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 25 |
1 files changed, 20 insertions, 5 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 4709a62381..e05db236ad 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -532,7 +532,10 @@ abstract class DStream[T: ClassTag] ( * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { - new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean + new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register() } /** @@ -540,7 +543,10 @@ abstract class DStream[T: ClassTag] ( * on each RDD of 'this' DStream. */ def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = { - transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r))) + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean + transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false)) } /** @@ -548,7 +554,10 @@ abstract class DStream[T: ClassTag] ( * on each RDD of 'this' DStream. */ def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { - val cleanedF = context.sparkContext.clean(transformFunc) + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean + val cleanedF = context.sparkContext.clean(transformFunc, false) val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { assert(rdds.length == 1) cleanedF(rdds.head.asInstanceOf[RDD[T]], time) @@ -563,7 +572,10 @@ abstract class DStream[T: ClassTag] ( def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] ): DStream[V] = { - val cleanedF = ssc.sparkContext.clean(transformFunc) + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean + val cleanedF = ssc.sparkContext.clean(transformFunc, false) transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2)) } @@ -574,7 +586,10 @@ abstract class DStream[T: ClassTag] ( def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] ): DStream[V] = { - val cleanedF = ssc.sparkContext.clean(transformFunc) + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean + val cleanedF = ssc.sparkContext.clean(transformFunc, false) val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { assert(rdds.length == 2) val rdd1 = rdds(0).asInstanceOf[RDD[T]] |