diff options
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 14 |
1 files changed, 13 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index c3adc28685..8e1d957568 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -352,9 +352,11 @@ private[spark] class TaskSchedulerImpl( } taskIdToTaskSetManager.get(tid) match { case Some(taskSet) => + var executorId: String = null if (TaskState.isFinished(state)) { taskIdToTaskSetManager.remove(tid) taskIdToExecutorId.remove(tid).foreach { execId => + executorId = execId if (executorIdToTaskCount.contains(execId)) { executorIdToTaskCount(execId) -= 1 } @@ -362,7 +364,17 @@ private[spark] class TaskSchedulerImpl( } if (state == TaskState.FINISHED) { taskSet.removeRunningTask(tid) - taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) + // In some cases the executor has already been removed by the driver due to heartbeat + // timeout, but before the executor is actually killed by the cluster, a task running + // on that executor may finish and report a success state back to the driver. Such a + // task should be ignored, because the tasks on that executor have already been + // re-queued by the driver. For more details, see SPARK-14485. + if (executorId != null && !executorIdToTaskCount.contains(executorId)) { + logInfo(s"Ignoring update with state $state for TID $tid because its executor " + + s"has already been removed by driver") + } else { + taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) + } } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { taskSet.removeRunningTask(tid) taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) |