diff options
author | William Benton <willb@redhat.com> | 2014-06-29 23:27:34 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-06-29 23:27:34 -0700 |
commit | a484030dae9d0d7e4b97cc6307e9e928c07490dc (patch) | |
tree | 8077c51ced4a8a14f93a969798706ff39d948dd2 /streaming/src | |
parent | 66135a341d9f8baecc149d13ae5511f14578c395 (diff) | |
download | spark-a484030dae9d0d7e4b97cc6307e9e928c07490dc.tar.gz spark-a484030dae9d0d7e4b97cc6307e9e928c07490dc.tar.bz2 spark-a484030dae9d0d7e4b97cc6307e9e928c07490dc.zip |
SPARK-897: preemptively serialize closures
These commits cause `ClosureCleaner.clean` to attempt to serialize the cleaned closure with the default closure serializer and throw a `SparkException` if doing so fails. This behavior is enabled by default but can be disabled at individual callsites of `SparkContext.clean`.
Commit 98e01ae8 fixes some no-op assertions in `GraphSuite` that this work exposed; I'm happy to put that in a separate PR if that would be more appropriate.
Author: William Benton <willb@redhat.com>
Closes #143 from willb/spark-897 and squashes the following commits:
bceab8a [William Benton] Commented DStream corner cases for serializability checking.
64d04d2 [William Benton] FailureSuite now checks both messages and causes.
3b3f74a [William Benton] Stylistic and doc cleanups.
b215dea [William Benton] Fixed spurious failures in ImplicitOrderingSuite
be1ecd6 [William Benton] Don't check serializability of DStream transforms.
abe816b [William Benton] Make proactive serializability checking optional.
5bfff24 [William Benton] Adds proactive closure-serializability checking
ed2ccf0 [William Benton] Test cases for SPARK-897.
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 25 |
1 file changed, 20 insertions, 5 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 4709a62381..e05db236ad 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -532,7 +532,10 @@ abstract class DStream[T: ClassTag] ( * 'this' DStream will be registered as an output stream and therefore materialized. */ def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { - new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register() + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean + new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register() } /** @@ -540,7 +543,10 @@ abstract class DStream[T: ClassTag] ( * on each RDD of 'this' DStream. */ def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = { - transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r))) + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean + transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false)) } /** @@ -548,7 +554,10 @@ abstract class DStream[T: ClassTag] ( * on each RDD of 'this' DStream. 
*/ def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { - val cleanedF = context.sparkContext.clean(transformFunc) + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean + val cleanedF = context.sparkContext.clean(transformFunc, false) val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { assert(rdds.length == 1) cleanedF(rdds.head.asInstanceOf[RDD[T]], time) @@ -563,7 +572,10 @@ abstract class DStream[T: ClassTag] ( def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] ): DStream[V] = { - val cleanedF = ssc.sparkContext.clean(transformFunc) + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean + val cleanedF = ssc.sparkContext.clean(transformFunc, false) transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2)) } @@ -574,7 +586,10 @@ abstract class DStream[T: ClassTag] ( def transformWith[U: ClassTag, V: ClassTag]( other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] ): DStream[V] = { - val cleanedF = ssc.sparkContext.clean(transformFunc) + // because the DStream is reachable from the outer object here, and because + // DStreams can't be serialized with closures, we can't proactively check + // it for serializability and so we pass the optional false to SparkContext.clean + val cleanedF = ssc.sparkContext.clean(transformFunc, false) val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { assert(rdds.length == 2) val rdd1 = rdds(0).asInstanceOf[RDD[T]] |