diff options
author | Patrick Wendell <pwendell@gmail.com> | 2014-04-10 02:10:40 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-10 02:10:40 -0700 |
commit | e6d4a74d2d92345985c1603f9b526a6347adb7cf (patch) | |
tree | e22221efdcf5eb5eac305ef12d2604d0198d697c /streaming | |
parent | e55cc4bae52a3de728939244780abc662713b768 (diff) | |
download | spark-e6d4a74d2d92345985c1603f9b526a6347adb7cf.tar.gz spark-e6d4a74d2d92345985c1603f9b526a6347adb7cf.tar.bz2 spark-e6d4a74d2d92345985c1603f9b526a6347adb7cf.zip |
Revert "SPARK-729: Closures not always serialized at capture time"
This reverts commit 8ca3b2bc90a63b23a03f339e390174cd7a672b40.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 4759b629a9..d043200f71 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -539,7 +539,7 @@ abstract class DStream[T: ClassTag] ( * on each RDD of 'this' DStream. */ def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = { - transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false)) + transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r))) } /** @@ -547,7 +547,7 @@ abstract class DStream[T: ClassTag] ( * on each RDD of 'this' DStream. */ def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { - val cleanedF = context.sparkContext.clean(transformFunc, false) + val cleanedF = context.sparkContext.clean(transformFunc) val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { assert(rdds.length == 1) cleanedF(rdds.head.asInstanceOf[RDD[T]], time) @@ -562,7 +562,7 @@ abstract class DStream[T: ClassTag] ( def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] ): DStream[V] = { - val cleanedF = ssc.sparkContext.clean(transformFunc, false) + val cleanedF = ssc.sparkContext.clean(transformFunc) transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2)) } @@ -573,7 +573,7 @@ abstract class DStream[T: ClassTag] ( def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] ): DStream[V] = { - val cleanedF = ssc.sparkContext.clean(transformFunc, false) + val cleanedF = ssc.sparkContext.clean(transformFunc) val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { assert(rdds.length == 2) val rdd1 = rdds(0).asInstanceOf[RDD[T]] |