author	zhonghaihua <793507405@qq.com>	2016-06-07 16:30:58 -0700
committer	Marcelo Vanzin <vanzin@cloudera.com>	2016-06-07 16:32:27 -0700
commit	695dbc816a6d70289abeb145cb62ff4e62b3f49b (patch)
tree	1fffdbbd8e45abcc918688b580da4a00c62dea2b /core
parent	6ecedf39b44c9acd58cdddf1a31cf11e8e24428c (diff)
[SPARK-14485][CORE] ignore task finished for executor lost and removed by driver
Currently, when the driver removes an executor because of a heartbeat timeout, it re-queues the tasks that were running on that executor and sends a kill command to the cluster manager. However, a task on that executor may finish and return its result to the driver before the kill command takes effect. In that case the driver accepts the task-finished event and discards the speculative and re-queued copies of the task. But because the executor has already been removed by the driver, the result of the finished task cannot be stored: its BlockManagerId has also been removed from the BlockManagerMaster. The result data for the stage is therefore incomplete, which later causes a fetch failure. For more details, see [SPARK-14485](https://issues.apache.org/jira/browse/SPARK-14485). This PR introduces a mechanism to ignore this kind of task-finished event.

Author: zhonghaihua <793507405@qq.com>

Closes #12258 from zhonghaihua/ignoreTaskFinishForExecutorLostAndRemovedByDriver.
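The fix adds a guard in `TaskSchedulerImpl.statusUpdate` (shown in the diff below): a successful result is enqueued only if the task's executor is still tracked by the scheduler. A minimal, self-contained sketch of that guard, with hypothetical names (`StatusUpdateSketch`, `handleSuccessfulTask`) standing in for the real scheduler plumbing:

```scala
import scala.collection.mutable

object StatusUpdateSketch {
  // Stand-in for the scheduler's map of live executors; an executor removed
  // after a heartbeat timeout no longer has an entry here.
  val executorIdToTaskCount = mutable.HashMap[String, Int]()

  def handleSuccessfulTask(tid: Long, executorId: String): Unit = {
    if (executorId != null && !executorIdToTaskCount.contains(executorId)) {
      // The driver has already removed this executor and re-queued its tasks, and
      // the executor's BlockManager is gone, so accepting this result would leave
      // the stage output incomplete. Drop the update instead.
      println(s"Ignoring update for TID $tid: executor $executorId already removed by driver")
    } else {
      // In the real code this is taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData).
      println(s"Enqueueing successful result for TID $tid")
    }
  }
}
```

The actual change reuses `executorIdToTaskCount`, whose entry for an executor is dropped when the driver removes it, so the only new bookkeeping is capturing `executorId` while the task's tracking entries are cleaned up.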
Diffstat (limited to 'core')
-rw-r--r--	core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala	14
1 file changed, 13 insertions, 1 deletion
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index c3adc28685..8e1d957568 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -352,9 +352,11 @@ private[spark] class TaskSchedulerImpl(
         }
         taskIdToTaskSetManager.get(tid) match {
           case Some(taskSet) =>
+            var executorId: String = null
             if (TaskState.isFinished(state)) {
               taskIdToTaskSetManager.remove(tid)
               taskIdToExecutorId.remove(tid).foreach { execId =>
+                executorId = execId
                 if (executorIdToTaskCount.contains(execId)) {
                   executorIdToTaskCount(execId) -= 1
                 }
@@ -362,7 +364,17 @@ private[spark] class TaskSchedulerImpl(
             }
             if (state == TaskState.FINISHED) {
               taskSet.removeRunningTask(tid)
-              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
+              // In some cases the executor has already been removed by the driver for a
+              // heartbeat timeout, but before the cluster manager kills it, a task running
+              // on that executor finishes and reports success to the driver. Such a task
+              // must be ignored, because the driver has already re-queued it.
+              // For more details, see SPARK-14485.
+              if (executorId != null && !executorIdToTaskCount.contains(executorId)) {
+                logInfo(s"Ignoring update with state $state for TID $tid because its executor " +
+                  s"has already been removed by driver")
+              } else {
+                taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
+              }
             } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
               taskSet.removeRunningTask(tid)
               taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)