aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-05-05 09:37:49 -0700
committerAndrew Or <andrew@databricks.com>2015-05-05 09:37:49 -0700
commit57e9f29e17d97ed9d0f110fb2ce5a075b854a841 (patch)
tree277e057cac90343112f85a10e5667bae32490d5f /streaming/src/main
parent1fdabf8dcdb31391fec3952d312eb0ac59ece43b (diff)
downloadspark-57e9f29e17d97ed9d0f110fb2ce5a075b854a841.tar.gz
spark-57e9f29e17d97ed9d0f110fb2ce5a075b854a841.tar.bz2
spark-57e9f29e17d97ed9d0f110fb2ce5a075b854a841.zip
[SPARK-7318] [STREAMING] DStream cleans objects that are not closures
I added a check in `ClosureCleaner#clean` to fail fast if this is detected in the future. tdas Author: Andrew Or <andrew@databricks.com> Closes #5860 from andrewor14/streaming-closure-cleaner and squashes the following commits: 8e971d7 [Andrew Or] Do not throw exception if object to clean is not closure 5ee4e25 [Andrew Or] Fix tests eed3390 [Andrew Or] Merge branch 'master' of github.com:apache/spark into streaming-closure-cleaner 67eeff4 [Andrew Or] Add tests a4fa768 [Andrew Or] Clean the closure, not the RDD
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala3
1 files changed, 2 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 83d41f5762..f1f8a70655 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -553,7 +553,8 @@ abstract class DStream[T: ClassTag] (
// because the DStream is reachable from the outer object here, and because
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
- transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false))
+ val cleanedF = context.sparkContext.clean(transformFunc, false)
+ transform((r: RDD[T], t: Time) => cleanedF(r))
}
/**