aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-04-10 02:10:40 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-10 02:10:40 -0700
commite6d4a74d2d92345985c1603f9b526a6347adb7cf (patch)
treee22221efdcf5eb5eac305ef12d2604d0198d697c /streaming
parente55cc4bae52a3de728939244780abc662713b768 (diff)
downloadspark-e6d4a74d2d92345985c1603f9b526a6347adb7cf.tar.gz
spark-e6d4a74d2d92345985c1603f9b526a6347adb7cf.tar.bz2
spark-e6d4a74d2d92345985c1603f9b526a6347adb7cf.zip
Revert "SPARK-729: Closures not always serialized at capture time"
This reverts commit 8ca3b2bc90a63b23a03f339e390174cd7a672b40.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala8
1 files changed, 4 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 4759b629a9..d043200f71 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -539,7 +539,7 @@ abstract class DStream[T: ClassTag] (
* on each RDD of 'this' DStream.
*/
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
- transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false))
+ transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
}
/**
@@ -547,7 +547,7 @@ abstract class DStream[T: ClassTag] (
* on each RDD of 'this' DStream.
*/
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
- val cleanedF = context.sparkContext.clean(transformFunc, false)
+ val cleanedF = context.sparkContext.clean(transformFunc)
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
assert(rdds.length == 1)
cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
@@ -562,7 +562,7 @@ abstract class DStream[T: ClassTag] (
def transformWith[U: ClassTag, V: ClassTag](
other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
): DStream[V] = {
- val cleanedF = ssc.sparkContext.clean(transformFunc, false)
+ val cleanedF = ssc.sparkContext.clean(transformFunc)
transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
}
@@ -573,7 +573,7 @@ abstract class DStream[T: ClassTag] (
def transformWith[U: ClassTag, V: ClassTag](
other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
): DStream[V] = {
- val cleanedF = ssc.sparkContext.clean(transformFunc, false)
+ val cleanedF = ssc.sparkContext.clean(transformFunc)
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
assert(rdds.length == 2)
val rdd1 = rdds(0).asInstanceOf[RDD[T]]