aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <reynoldx@gmail.com>2013-07-23 20:25:58 -0700
committerReynold Xin <reynoldx@gmail.com>2013-07-23 20:25:58 -0700
commit85ab8114bc1367a0f4f32d4b8635c41aa547bc72 (patch)
tree91fd7bf525d327a7f075b1b0b68c5364c5b0fd21 /core
parentf2422d4f29abb80b9bc76c4596d1cc31d9e6d590 (diff)
downloadspark-85ab8114bc1367a0f4f32d4b8635c41aa547bc72.tar.gz
spark-85ab8114bc1367a0f4f32d4b8635c41aa547bc72.tar.bz2
spark-85ab8114bc1367a0f4f32d4b8635c41aa547bc72.zip
Moved non-serializable closure catching exception from submitStage to submitMissingTasks
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala25
-rw-r--r--core/src/test/scala/spark/FailureSuite.scala10
2 files changed, 23 insertions, 12 deletions
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 5fcd807aff..fde998494f 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -465,18 +465,6 @@ class DAGScheduler(
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
logDebug("submitStage(" + stage + ")")
-
- // Preemptively serialize the stage RDD to make sure the tasks for this stage will be
- // serializable. We are catching this exception here because it would be fairly hard to
- // catch the non-serializable exception down the road, where we have several different
- // implementations for local scheduler and cluster schedulers.
- try {
- SparkEnv.get.closureSerializer.newInstance().serialize(stage.rdd)
- } catch {
- case e: NotSerializableException => abortStage(stage, e.toString)
- return
- }
-
if (!waiting(stage) && !running(stage) && !failed(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
@@ -515,6 +503,19 @@ class DAGScheduler(
}
}
if (tasks.size > 0) {
+ // Preemptively serialize a task to make sure it can be serialized. We are catching this
+ // exception here because it would be fairly hard to catch the non-serializable exception
+ // down the road, where we have several different implementations for local scheduler and
+ // cluster schedulers.
+ try {
+ SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head)
+ } catch {
+ case e: NotSerializableException =>
+ abortStage(stage, e.toString)
+ running -= stage
+ return
+ }
+
sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size)))
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks
diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala
index c3c52f9118..5b133cdd6e 100644
--- a/core/src/test/scala/spark/FailureSuite.scala
+++ b/core/src/test/scala/spark/FailureSuite.scala
@@ -96,18 +96,28 @@ class FailureSuite extends FunSuite with LocalSparkContext {
test("failure because task closure is not serializable") {
sc = new SparkContext("local[1,1]", "test")
val a = new NonSerializable
+
+ // Non-serializable closure in the final result stage
val thrown = intercept[SparkException] {
sc.parallelize(1 to 10, 2).map(x => a).count()
}
assert(thrown.getClass === classOf[SparkException])
assert(thrown.getMessage.contains("NotSerializableException"))
+ // Non-serializable closure in an earlier stage
val thrown1 = intercept[SparkException] {
sc.parallelize(1 to 10, 2).map(x => (x, a)).partitionBy(new HashPartitioner(3)).count()
}
assert(thrown1.getClass === classOf[SparkException])
assert(thrown1.getMessage.contains("NotSerializableException"))
+ // Non-serializable closure in foreach function
+ val thrown2 = intercept[SparkException] {
+ sc.parallelize(1 to 10, 2).foreach(x => println(a))
+ }
+ assert(thrown2.getClass === classOf[SparkException])
+ assert(thrown2.getMessage.contains("NotSerializableException"))
+
FailureSuiteState.clear()
}