about | summary | refs | log | tree | commit | diff
path: root/streaming
diff options
context:
space:
mode:
authorWilliam Benton <willb@redhat.com>2014-06-29 23:27:34 -0700
committerReynold Xin <rxin@apache.org>2014-06-29 23:27:34 -0700
commita484030dae9d0d7e4b97cc6307e9e928c07490dc (patch)
tree8077c51ced4a8a14f93a969798706ff39d948dd2 /streaming
parent66135a341d9f8baecc149d13ae5511f14578c395 (diff)
downloadspark-a484030dae9d0d7e4b97cc6307e9e928c07490dc.tar.gz
spark-a484030dae9d0d7e4b97cc6307e9e928c07490dc.tar.bz2
spark-a484030dae9d0d7e4b97cc6307e9e928c07490dc.zip
SPARK-897: preemptively serialize closures
These commits cause `ClosureCleaner.clean` to attempt to serialize the cleaned closure with the default closure serializer and throw a `SparkException` if doing so fails. This behavior is enabled by default but can be disabled at individual callsites of `SparkContext.clean`. Commit 98e01ae8 fixes some no-op assertions in `GraphSuite` that this work exposed; I'm happy to put that in a separate PR if that would be more appropriate. Author: William Benton <willb@redhat.com> Closes #143 from willb/spark-897 and squashes the following commits: bceab8a [William Benton] Commented DStream corner cases for serializability checking. 64d04d2 [William Benton] FailureSuite now checks both messages and causes. 3b3f74a [William Benton] Stylistic and doc cleanups. b215dea [William Benton] Fixed spurious failures in ImplicitOrderingSuite be1ecd6 [William Benton] Don't check serializability of DStream transforms. abe816b [William Benton] Make proactive serializability checking optional. 5bfff24 [William Benton] Adds proactive closure-serializability checking ed2ccf0 [William Benton] Test cases for SPARK-897.
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 25
1 file changed, 20 insertions(+), 5 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 4709a62381..e05db236ad 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -532,7 +532,10 @@ abstract class DStream[T: ClassTag] (
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
- new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
+ // because the DStream is reachable from the outer object here, and because
+ // DStreams can't be serialized with closures, we can't proactively check
+ // it for serializability and so we pass the optional false to SparkContext.clean
+ new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
}
/**
@@ -540,7 +543,10 @@ abstract class DStream[T: ClassTag] (
* on each RDD of 'this' DStream.
*/
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
- transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
+ // because the DStream is reachable from the outer object here, and because
+ // DStreams can't be serialized with closures, we can't proactively check
+ // it for serializability and so we pass the optional false to SparkContext.clean
+ transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false))
}
/**
@@ -548,7 +554,10 @@ abstract class DStream[T: ClassTag] (
* on each RDD of 'this' DStream.
*/
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
- val cleanedF = context.sparkContext.clean(transformFunc)
+ // because the DStream is reachable from the outer object here, and because
+ // DStreams can't be serialized with closures, we can't proactively check
+ // it for serializability and so we pass the optional false to SparkContext.clean
+ val cleanedF = context.sparkContext.clean(transformFunc, false)
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
assert(rdds.length == 1)
cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
@@ -563,7 +572,10 @@ abstract class DStream[T: ClassTag] (
def transformWith[U: ClassTag, V: ClassTag](
other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
): DStream[V] = {
- val cleanedF = ssc.sparkContext.clean(transformFunc)
+ // because the DStream is reachable from the outer object here, and because
+ // DStreams can't be serialized with closures, we can't proactively check
+ // it for serializability and so we pass the optional false to SparkContext.clean
+ val cleanedF = ssc.sparkContext.clean(transformFunc, false)
transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
}
@@ -574,7 +586,10 @@ abstract class DStream[T: ClassTag] (
def transformWith[U: ClassTag, V: ClassTag](
other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
): DStream[V] = {
- val cleanedF = ssc.sparkContext.clean(transformFunc)
+ // because the DStream is reachable from the outer object here, and because
+ // DStreams can't be serialized with closures, we can't proactively check
+ // it for serializability and so we pass the optional false to SparkContext.clean
+ val cleanedF = ssc.sparkContext.clean(transformFunc, false)
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
assert(rdds.length == 2)
val rdd1 = rdds(0).asInstanceOf[RDD[T]]