author     William Benton <willb@redhat.com>    2014-06-29 23:27:34 -0700
committer  Reynold Xin <rxin@apache.org>    2014-06-29 23:27:34 -0700
commit     a484030dae9d0d7e4b97cc6307e9e928c07490dc (patch)
tree       8077c51ced4a8a14f93a969798706ff39d948dd2 /core/src/main/scala
parent     66135a341d9f8baecc149d13ae5511f14578c395 (diff)
SPARK-897: preemptively serialize closures
These commits cause `ClosureCleaner.clean` to attempt to serialize the cleaned closure with the default closure serializer and throw a `SparkException` if doing so fails. This behavior is enabled by default but can be disabled at individual callsites of `SparkContext.clean`.

Commit 98e01ae8 fixes some no-op assertions in `GraphSuite` that this work exposed; I'm happy to put that in a separate PR if that would be more appropriate.

Author: William Benton <willb@redhat.com>

Closes #143 from willb/spark-897 and squashes the following commits:

bceab8a [William Benton] Commented DStream corner cases for serializability checking.
64d04d2 [William Benton] FailureSuite now checks both messages and causes.
3b3f74a [William Benton] Stylistic and doc cleanups.
b215dea [William Benton] Fixed spurious failures in ImplicitOrderingSuite
be1ecd6 [William Benton] Don't check serializability of DStream transforms.
abe816b [William Benton] Make proactive serializability checking optional.
5bfff24 [William Benton] Adds proactive closure-serializability checking
ed2ccf0 [William Benton] Test cases for SPARK-897.
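A minimal sketch of the user-visible effect, assuming a local-mode driver; the `Unserializable` class and the `resource` value are invented for illustration. With proactive checking enabled (the default), defining a transformation whose closure captures a non-serializable object now fails immediately at the definition site with "Task not serializable", rather than later when tasks are shipped to executors:

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

object ProactiveCheckExample {
  // No Serializable marker, so closures capturing an instance cannot be shipped.
  class Unserializable(val socket: java.net.ServerSocket)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("spark-897"))
    val resource = new Unserializable(new java.net.ServerSocket(0))
    try {
      // map() calls sc.clean() on its closure; with this change, the failed
      // trial serialization throws here, at the point of definition.
      sc.parallelize(1 to 10).map(x => x + resource.socket.getLocalPort)
    } catch {
      case e: SparkException => println(s"Caught early: ${e.getMessage}")
    } finally {
      sc.stop()
    }
  }
}
```

Failing at the definition site means the stack trace points at the offending transformation instead of at a later job submission, which is the point of checking preemptively.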
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala         | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala  | 16
2 files changed, 24 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f9476ff826..8819e73d17 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1203,9 +1203,17 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Clean a closure to make it ready to be serialized and sent to tasks
* (removes unreferenced variables in $outer's, updates REPL variables).
+ * If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
+ * check whether <tt>f</tt> is serializable and throw a <tt>SparkException</tt>
+ * if it is not.
+ *
+ * @param f the closure to clean
+ * @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
+ * @throws <tt>SparkException</tt> if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
+ * serializable
*/
- private[spark] def clean[F <: AnyRef](f: F): F = {
- ClosureCleaner.clean(f)
+ private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
+ ClosureCleaner.clean(f, checkSerializable)
f
}
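Because the new parameter defaults to `true`, every existing caller of `clean` picks up the check automatically; internal callsites that need to defer it (the commit log notes DStream transforms do) can opt out explicitly. A hedged sketch, with `f` standing in for some closure:

```scala
// Inside Spark itself (clean is private[spark]); `f` is a placeholder closure.
val checked  = sc.clean(f)                             // clean + fail-fast serializability check
val deferred = sc.clean(f, checkSerializable = false)  // clean only; serialization checked later
```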
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 4916d9b86c..e3f52f6ff1 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.Set
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
-import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.{Logging, SparkEnv, SparkException}
private[spark] object ClosureCleaner extends Logging {
// Get an ASM class reader for a given class from the JAR that loaded it
@@ -101,7 +101,7 @@ private[spark] object ClosureCleaner extends Logging {
}
}
- def clean(func: AnyRef) {
+ def clean(func: AnyRef, checkSerializable: Boolean = true) {
// TODO: cache outerClasses / innerClasses / accessedFields
val outerClasses = getOuterClasses(func)
val innerClasses = getInnerClasses(func)
@@ -153,6 +153,18 @@ private[spark] object ClosureCleaner extends Logging {
field.setAccessible(true)
field.set(func, outer)
}
+
+ if (checkSerializable) {
+ ensureSerializable(func)
+ }
+ }
+
+ private def ensureSerializable(func: AnyRef) {
+ try {
+ SparkEnv.get.closureSerializer.newInstance().serialize(func)
+ } catch {
+ case ex: Exception => throw new SparkException("Task not serializable", ex)
+ }
}
private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = {
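The check in `ensureSerializable` is simply a trial serialization pass whose output is discarded. A self-contained sketch of the same fail-fast pattern, substituting plain Java serialization for `SparkEnv`'s closure serializer (everything here is illustrative, not Spark's actual code):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializabilityCheck {
  // Trial-serialize func and discard the bytes; any failure surfaces
  // immediately with a uniform message, as ensureSerializable does above.
  def ensureSerializable(func: AnyRef): Unit = {
    try {
      val oos = new ObjectOutputStream(new ByteArrayOutputStream())
      oos.writeObject(func) // throws NotSerializableException for bad closures
      oos.close()
    } catch {
      // Spark wraps the cause as SparkException("Task not serializable", ex)
      case ex: Exception => throw new RuntimeException("Task not serializable", ex)
    }
  }

  def main(args: Array[String]): Unit = {
    ensureSerializable((x: Int) => x + 1) // Scala function literals are serializable: passes
    ensureSerializable(new Object)        // java.lang.Object is not Serializable: throws
  }
}
```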