diff options
author | Liwei Lin <lwlin7@gmail.com> | 2016-06-24 10:09:04 -0500 |
---|---|---|
committer | Imran Rashid <irashid@cloudera.com> | 2016-06-24 10:09:04 -0500 |
commit | a4851ed05053a9b7545a258c9159fd529225c455 (patch) | |
tree | 74be27f35aebedc1efb57f3af76460dd6d7e2e40 /core/src/main | |
parent | be88383e15a86d094963de5f7e8792510bc990de (diff) | |
download | spark-a4851ed05053a9b7545a258c9159fd529225c455.tar.gz spark-a4851ed05053a9b7545a258c9159fd529225c455.tar.bz2 spark-a4851ed05053a9b7545a258c9159fd529225c455.zip |
[SPARK-15963][CORE] Catch `TaskKilledException` correctly in Executor.TaskRunner
## The problem
Before this change, if either of the following cases happened to a task , the task would be marked as `FAILED` instead of `KILLED`:
- the task was killed before it was deserialized
- `executor.kill()` marked `taskRunner.killed`, but before calling `task.killed()` the worker thread threw the `TaskKilledException`
The reason is, in the `catch` block of the current [Executor.TaskRunner](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L362)'s implementation, we are mistakenly catching:
```scala
case _: TaskKilledException | _: InterruptedException if task.killed => ...
```
the semantics of which is:
- **(**`TaskKilledException` **OR** `InterruptedException`**)** **AND** `task.killed`
Then when `TaskKilledException` is thrown but `task.killed` is not marked, we would mark the task as `FAILED` (which should really be `KILLED`).
## What changes were proposed in this pull request?
This patch alters the catch condition's semantics from:
- **(**`TaskKilledException` **OR** `InterruptedException`**)** **AND** `task.killed`
to
- `TaskKilledException` **OR** **(**`InterruptedException` **AND** `task.killed`**)**
so that we can catch `TaskKilledException` correctly and mark the task as `KILLED` correctly.
## How was this patch tested?
Added unit test which failed before the change, ran new test 1000 times manually
Author: Liwei Lin <lwlin7@gmail.com>
Closes #13685 from lw-lin/fix-task-killed.
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/org/apache/spark/executor/Executor.scala | 7 |
1 files changed, 6 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 9a017f29f7..fbf2b86db1 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -359,11 +359,16 @@ private[spark] class Executor( setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) - case _: TaskKilledException | _: InterruptedException if task.killed => + case _: TaskKilledException => logInfo(s"Executor killed $taskName (TID $taskId)") setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) + case _: InterruptedException if task.killed => + logInfo(s"Executor interrupted and killed $taskName (TID $taskId)") + setTaskFinishedAndClearInterruptStatus() + execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) + case CausedBy(cDE: CommitDeniedException) => val reason = cDE.toTaskEndReason setTaskFinishedAndClearInterruptStatus() |