diff options
author | Andrew Or <andrew@databricks.com> | 2015-05-05 09:37:49 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-05-05 09:37:49 -0700 |
commit | 57e9f29e17d97ed9d0f110fb2ce5a075b854a841 (patch) | |
tree | 277e057cac90343112f85a10e5667bae32490d5f /streaming | |
parent | 1fdabf8dcdb31391fec3952d312eb0ac59ece43b (diff) | |
download | spark-57e9f29e17d97ed9d0f110fb2ce5a075b854a841.tar.gz spark-57e9f29e17d97ed9d0f110fb2ce5a075b854a841.tar.bz2 spark-57e9f29e17d97ed9d0f110fb2ce5a075b854a841.zip |
[SPARK-7318] [STREAMING] DStream cleans objects that are not closures
I added a check in `ClosureCleaner#clean` to fail fast if this is detected in the future. tdas
Author: Andrew Or <andrew@databricks.com>
Closes #5860 from andrewor14/streaming-closure-cleaner and squashes the following commits:
8e971d7 [Andrew Or] Do not throw exception if object to clean is not closure
5ee4e25 [Andrew Or] Fix tests
eed3390 [Andrew Or] Merge branch 'master' of github.com:apache/spark into streaming-closure-cleaner
67eeff4 [Andrew Or] Add tests
a4fa768 [Andrew Or] Clean the closure, not the RDD
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 3 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala | 4 |
2 files changed, 4 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 83d41f5762..f1f8a70655 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -553,7 +553,8 @@ abstract class DStream[T: ClassTag] (
     // because the DStream is reachable from the outer object here, and because
     // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
-    transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false))
+    val cleanedF = context.sparkContext.clean(transformFunc, false)
+    transform((r: RDD[T], t: Time) => cleanedF(r))
   }

   /**
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 393a360cfe..5d7127627e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -256,8 +256,8 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
     }

     withStreamingContext(new StreamingContext(sparkConf, batchDuration)) { ssc =>
-      val receiver1 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
-      val receiver2 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
+      val receiver1 = new FakeReceiver(sendData = true)
+      val receiver2 = new FakeReceiver(sendData = true)
       val receiverStream1 = ssc.receiverStream(receiver1)
       val receiverStream2 = ssc.receiverStream(receiver2)
       receiverStream1.register()