diff options
Diffstat (limited to 'core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala')
-rw-r--r-- | core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala | 13 |
1 files changed, 10 insertions, 3 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 5a7df6040c..cf4aae03a7 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -17,7 +17,7 @@ import java.nio.ByteBuffer
 /**
  * Schedules the tasks within a single TaskSet in the ClusterScheduler.
  */
-class TaskSetManager(
+private[spark] class TaskSetManager(
   sched: ClusterScheduler,
   val taskSet: TaskSet)
   extends Logging {
@@ -214,7 +214,8 @@ class TaskSetManager(
           }
           // Serialize and return the task
           val startTime = System.currentTimeMillis
-          val serializedTask = ser.serialize(task)
+          val serializedTask = Task.serializeWithDependencies(
+            task, sched.sc.addedFiles, sched.sc.addedJars, ser)
           val timeTaken = System.currentTimeMillis - startTime
           logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
             taskSet.id, index, serializedTask.limit, timeTaken))
@@ -243,6 +244,11 @@ class TaskSetManager(
 
   def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     val info = taskInfos(tid)
+    if (info.failed) {
+      // We might get two task-lost messages for the same task in coarse-grained Mesos mode,
+      // or even from Mesos itself when acks get delayed.
+      return
+    }
     val index = info.index
     info.markSuccessful()
     if (!finished(index)) {
@@ -335,13 +341,14 @@ class TaskSetManager(
 
   def error(message: String) {
     // Save the error message
-    abort("Mesos error: " + message)
+    abort("Error: " + message)
   }
 
   def abort(message: String) {
     failed = true
     causeOfFailure = message
     // TODO: Kill running tasks if we were not terminated due to a Mesos error
+    sched.listener.taskSetFailed(taskSet, message)
     sched.taskSetFinished(this)
   }
 