aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2013-11-14 14:56:53 -0800
committerKay Ousterhout <kayousterhout@gmail.com>2013-11-14 14:56:53 -0800
commit52144caaa70363ffcc63e1f52db32eb1654c1213 (patch)
tree4f92da50a7eb385d5a906f4e2b8adec864747128 /core/src/main/scala/org/apache
parent2b807e4f2f853a9b1e8cba5147d182e7b05022bc (diff)
downloadspark-52144caaa70363ffcc63e1f52db32eb1654c1213.tar.gz
spark-52144caaa70363ffcc63e1f52db32eb1654c1213.tar.bz2
spark-52144caaa70363ffcc63e1f52db32eb1654c1213.zip
Don't retry tasks if result wasn't serializable
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala12
1 files changed, 11 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index bc35e53220..e3929e61ac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{ExceptionFailure, FetchFailed, Logging, Resubmitted, Sp
Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.util.{SystemClock, Clock}
+import java.io.NotSerializableException
/**
@@ -488,7 +489,16 @@ private[spark] class TaskSetManager(
return
case ef: ExceptionFailure =>
- sched.dagScheduler.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
+ sched.dagScheduler.taskEnded(
+ tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
+ if (ef.className == classOf[NotSerializableException].getName()) {
+ // If the task result wasn't rerializable, there's no point in trying to re-execute it.
+ logError("Task %s:%s had a not serializable result: %s; not retrying".format(
+ taskSet.id, index, ef.description))
+ abort("Task %s:%s had a not serializable result: %s".format(
+ taskSet.id, index, ef.description))
+ return
+ }
val key = ef.description
val now = clock.getTime()
val (printFull, dupCount) = {