author | William Benton <willb@redhat.com> | 2014-06-29 23:27:34 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-06-29 23:27:34 -0700 |
commit | a484030dae9d0d7e4b97cc6307e9e928c07490dc (patch) | |
tree | 8077c51ced4a8a14f93a969798706ff39d948dd2 /core/src/main/scala | |
parent | 66135a341d9f8baecc149d13ae5511f14578c395 (diff) | |
SPARK-897: preemptively serialize closures
These commits cause `ClosureCleaner.clean` to attempt to serialize the cleaned closure with the default closure serializer and throw a `SparkException` if doing so fails. This behavior is enabled by default but can be disabled at individual callsites of `SparkContext.clean`.
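To see what this means for user code, here is a minimal, self-contained sketch (the `NotSerializable` class and app setup are invented for illustration, not taken from the patch). With proactive checking, the failure surfaces when `map` cleans its closure, not later at job submission:

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

// Deliberately does not implement java.io.Serializable.
class NotSerializable {
  val factor = 2
}

object FailFastSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("spark-897-sketch"))
    val helper = new NotSerializable
    try {
      // The closure captures `helper`, so it cannot be serialized. With the
      // proactive check enabled (the default), SparkContext.clean throws a
      // SparkException as soon as `map` is called.
      sc.parallelize(1 to 10).map(x => x * helper.factor).count()
    } catch {
      case e: SparkException => println(s"Caught early: ${e.getMessage}")
    } finally {
      sc.stop()
    }
  }
}
```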
Commit 98e01ae8 fixes some no-op assertions in `GraphSuite` that this work exposed; I'm happy to put that in a separate PR if that would be more appropriate.
Author: William Benton <willb@redhat.com>
Closes #143 from willb/spark-897 and squashes the following commits:
bceab8a [William Benton] Commented DStream corner cases for serializability checking.
64d04d2 [William Benton] FailureSuite now checks both messages and causes.
3b3f74a [William Benton] Stylistic and doc cleanups.
b215dea [William Benton] Fixed spurious failures in ImplicitOrderingSuite
be1ecd6 [William Benton] Don't check serializability of DStream transforms.
abe816b [William Benton] Make proactive serializability checking optional.
5bfff24 [William Benton] Adds proactive closure-serializability checking
ed2ccf0 [William Benton] Test cases for SPARK-897.
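Commits abe816b and be1ecd6 above make the check optional per callsite; `DStream` transforms skip it because their closures are validated later, once they are wrapped for execution. A sketch of such an opt-out (hypothetical helper; both `clean` entry points are `private[spark]`, so this compiles only under the `org.apache.spark` package):

```scala
package org.apache.spark.sketch // hypothetical subpackage, needed for private[spark] access

import org.apache.spark.SparkContext

object OptOutSketch {
  // Clean a closure but defer the serializability check, as the DStream
  // transform callsites do after this change.
  def cleanWithoutCheck[F <: AnyRef](sc: SparkContext, f: F): F =
    sc.clean(f, checkSerializable = false)
}
```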
Diffstat (limited to 'core/src/main/scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 12
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala | 16
2 files changed, 24 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f9476ff826..8819e73d17 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1203,9 +1203,17 @@ class SparkContext(config: SparkConf) extends Logging {
   /**
    * Clean a closure to make it ready to serialized and send to tasks
    * (removes unreferenced variables in $outer's, updates REPL variables)
+   * If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
+   * check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>
+   * if not.
+   *
+   * @param f the closure to clean
+   * @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
+   * @throws <tt>SparkException<tt> if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
+   *   serializable
    */
-  private[spark] def clean[F <: AnyRef](f: F): F = {
-    ClosureCleaner.clean(f)
+  private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
+    ClosureCleaner.clean(f, checkSerializable)
     f
   }

diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 4916d9b86c..e3f52f6ff1 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.Set
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._

-import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.{Logging, SparkEnv, SparkException}

 private[spark] object ClosureCleaner extends Logging {
   // Get an ASM class reader for a given class from the JAR that loaded it
@@ -101,7 +101,7 @@ private[spark] object ClosureCleaner extends Logging {
     }
   }

-  def clean(func: AnyRef) {
+  def clean(func: AnyRef, checkSerializable: Boolean = true) {
     // TODO: cache outerClasses / innerClasses / accessedFields
     val outerClasses = getOuterClasses(func)
     val innerClasses = getInnerClasses(func)
@@ -153,6 +153,18 @@ private[spark] object ClosureCleaner extends Logging {
       field.setAccessible(true)
       field.set(func, outer)
     }
+
+    if (checkSerializable) {
+      ensureSerializable(func)
+    }
+  }
+
+  private def ensureSerializable(func: AnyRef) {
+    try {
+      SparkEnv.get.closureSerializer.newInstance().serialize(func)
+    } catch {
+      case ex: Exception => throw new SparkException("Task not serializable", ex)
+    }
   }

   private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = {
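For readers outside the Spark tree, here is a self-contained sketch of the trial-serialization pattern `ensureSerializable` uses. Plain Java serialization and a `RuntimeException` stand in for `SparkEnv.get.closureSerializer` and `SparkException`, which require a running Spark application:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializabilityCheckSketch {
  // Mirrors the patch's ensureSerializable: do a trial serialization of the
  // closure and rethrow any failure with a clearer message. Java
  // serialization stands in for Spark's pluggable closure serializer.
  def ensureSerializable(func: AnyRef): Unit = {
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(func)
      out.close()
    } catch {
      case ex: Exception => throw new RuntimeException("Task not serializable", ex)
    }
  }

  def main(args: Array[String]): Unit = {
    ensureSerializable((x: Int) => x + 1) // passes: Scala closures are Serializable

    class Holder { val n = 1 }            // does not implement Serializable
    val h = new Holder
    ensureSerializable(() => h.n)         // throws: the closure captures `h`
  }
}
```

Using the configured closure serializer, as the real `ensureSerializable` does, keeps the check consistent with how the closure will actually be shipped to executors.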