aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala')
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala13
1 files changed, 10 insertions, 3 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 5a7df6040c..cf4aae03a7 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -17,7 +17,7 @@ import java.nio.ByteBuffer
/**
* Schedules the tasks within a single TaskSet in the ClusterScheduler.
*/
-class TaskSetManager(
+private[spark] class TaskSetManager(
sched: ClusterScheduler,
val taskSet: TaskSet)
extends Logging {
@@ -214,7 +214,8 @@ class TaskSetManager(
}
// Serialize and return the task
val startTime = System.currentTimeMillis
- val serializedTask = ser.serialize(task)
+ val serializedTask = Task.serializeWithDependencies(
+ task, sched.sc.addedFiles, sched.sc.addedJars, ser)
val timeTaken = System.currentTimeMillis - startTime
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
@@ -243,6 +244,11 @@ class TaskSetManager(
def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) {
val info = taskInfos(tid)
+ if (info.failed) {
+ // We might get two task-lost messages for the same task in coarse-grained Mesos mode,
+ // or even from Mesos itself when acks get delayed.
+ return
+ }
val index = info.index
info.markSuccessful()
if (!finished(index)) {
@@ -335,13 +341,14 @@ class TaskSetManager(
def error(message: String) {
// Save the error message
- abort("Mesos error: " + message)
+ abort("Error: " + message)
}
def abort(message: String) {
failed = true
causeOfFailure = message
// TODO: Kill running tasks if we were not terminated due to a Mesos error
+ sched.listener.taskSetFailed(taskSet, message)
sched.taskSetFinished(this)
}