aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala3
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala4
2 files changed, 4 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 83d41f5762..f1f8a70655 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -553,7 +553,8 @@ abstract class DStream[T: ClassTag] (
// because the DStream is reachable from the outer object here, and because
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
- transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false))
+ val cleanedF = context.sparkContext.clean(transformFunc, false)
+ transform((r: RDD[T], t: Time) => cleanedF(r))
}
/**
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 393a360cfe..5d7127627e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -256,8 +256,8 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
}
withStreamingContext(new StreamingContext(sparkConf, batchDuration)) { ssc =>
- val receiver1 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
- val receiver2 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
+ val receiver1 = new FakeReceiver(sendData = true)
+ val receiver2 = new FakeReceiver(sendData = true)
val receiverStream1 = ssc.receiverStream(receiver1)
val receiverStream2 = ssc.receiverStream(receiver2)
receiverStream1.register()