Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 15
1 file changed, 8 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a177aab5f9..a41b059fa7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -713,13 +713,7 @@ private[spark] class TaskSetManager(
       successfulTaskDurations.insert(info.duration)
     }
     removeRunningTask(tid)
-    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
-    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
-    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
-    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
-    // Note: "result.value()" only deserializes the value when it's called at the first time, so
-    // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
+
     // Kill any other attempts for the same task (since those are unnecessary now that one
     // attempt completed successfully).
     for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
@@ -746,6 +740,13 @@ private[spark] class TaskSetManager(
       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
         " because task " + index + " has already completed successfully")
     }
+    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
+    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
+    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
+    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
+    // Note: "result.value()" only deserializes the value when it's called at the first time, so
+    // here "result.value()" just returns the value and won't block other threads.
+    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
     maybeFinishTaskSet()
   }
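
The comment being moved relies on the fact that "result.value()" deserializes the task result only on its first call and returns a cached object afterwards, so calling it while holding the "TaskSchedulerImpl" lock stays cheap. The snippet below is a minimal sketch of that deserialize-once pattern, not Spark's actual DirectTaskResult; the class and member names (CachedResult, cached) are illustrative only.

    // Illustrative sketch of the memoize-on-first-call pattern described above
    // (not Spark's DirectTaskResult). The first value() call pays the
    // deserialization cost; later calls return the cached object, so invoking it
    // again while a scheduler lock is held does not block other threads.
    import java.io.{ByteArrayInputStream, ObjectInputStream}

    class CachedResult[T](serialized: Array[Byte]) {
      private var cached: Option[T] = None

      def value(): T = cached match {
        case Some(v) => v              // already deserialized: cheap, lock-safe
        case None =>
          // Potentially expensive: intended to run before any lock is taken,
          // e.g. by the thread that fetches and enqueues the task result.
          val in = new ObjectInputStream(new ByteArrayInputStream(serialized))
          val v = in.readObject().asInstanceOf[T]
          in.close()
          cached = Some(v)
          v
      }
    }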