aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-11-14 19:44:50 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-11-14 19:44:50 -0800
commitdfd40e9f6f87ff1f205944997cdbbb6bb7f0312c (patch)
tree7d17052b8cd67fecfa2cff6ebeca4fc24831c289 /core
parented25105fd9733acd631dab0993560ac66ffeae16 (diff)
parent29c88e408ecc3416104530756561fee482393913 (diff)
downloadspark-dfd40e9f6f87ff1f205944997cdbbb6bb7f0312c.tar.gz
spark-dfd40e9f6f87ff1f205944997cdbbb6bb7f0312c.tar.bz2
spark-dfd40e9f6f87ff1f205944997cdbbb6bb7f0312c.zip
Merge pull request #175 from kayousterhout/no_retry_not_serializable
Don't retry tasks when they fail due to a NotSerializableException As with my previous pull request, this will be unit tested once the Cluster and Local schedulers get merged.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala9
1 files changed, 9 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index ee47aaffca..4c5eca8537 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -17,6 +17,7 @@
package org.apache.spark.scheduler.cluster
+import java.io.NotSerializableException
import java.util.Arrays
import scala.collection.mutable.ArrayBuffer
@@ -484,6 +485,14 @@ private[spark] class ClusterTaskSetManager(
case ef: ExceptionFailure =>
sched.dagScheduler.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
+ if (ef.className == classOf[NotSerializableException].getName()) {
+ // If the task result wasn't serializable, there's no point in trying to re-execute it.
+ logError("Task %s:%s had a not serializable result: %s; not retrying".format(
+ taskSet.id, index, ef.description))
+ abort("Task %s:%s had a not serializable result: %s".format(
+ taskSet.id, index, ef.description))
+ return
+ }
val key = ef.description
val now = clock.getTime()
val (printFull, dupCount) = {