aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorLiwei Lin <lwlin7@gmail.com>2016-06-24 10:09:04 -0500
committerImran Rashid <irashid@cloudera.com>2016-06-24 10:09:04 -0500
commita4851ed05053a9b7545a258c9159fd529225c455 (patch)
tree74be27f35aebedc1efb57f3af76460dd6d7e2e40 /core/src/main
parentbe88383e15a86d094963de5f7e8792510bc990de (diff)
downloadspark-a4851ed05053a9b7545a258c9159fd529225c455.tar.gz
spark-a4851ed05053a9b7545a258c9159fd529225c455.tar.bz2
spark-a4851ed05053a9b7545a258c9159fd529225c455.zip
[SPARK-15963][CORE] Catch `TaskKilledException` correctly in Executor.TaskRunner
## The problem Before this change, if either of the following cases happened to a task , the task would be marked as `FAILED` instead of `KILLED`: - the task was killed before it was deserialized - `executor.kill()` marked `taskRunner.killed`, but before calling `task.killed()` the worker thread threw the `TaskKilledException` The reason is, in the `catch` block of the current [Executor.TaskRunner](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L362)'s implementation, we are mistakenly catching: ```scala case _: TaskKilledException | _: InterruptedException if task.killed => ... ``` the semantics of which is: - **(**`TaskKilledException` **OR** `InterruptedException`**)** **AND** `task.killed` Then when `TaskKilledException` is thrown but `task.killed` is not marked, we would mark the task as `FAILED` (which should really be `KILLED`). ## What changes were proposed in this pull request? This patch alters the catch condition's semantics from: - **(**`TaskKilledException` **OR** `InterruptedException`**)** **AND** `task.killed` to - `TaskKilledException` **OR** **(**`InterruptedException` **AND** `task.killed`**)** so that we can catch `TaskKilledException` correctly and mark the task as `KILLED` correctly. ## How was this patch tested? Added unit test which failed before the change, ran new test 1000 times manually Author: Liwei Lin <lwlin7@gmail.com> Closes #13685 from lw-lin/fix-task-killed.
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala7
1 files changed, 6 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9a017f29f7..fbf2b86db1 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -359,11 +359,16 @@ private[spark] class Executor(
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
- case _: TaskKilledException | _: InterruptedException if task.killed =>
+ case _: TaskKilledException =>
logInfo(s"Executor killed $taskName (TID $taskId)")
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
+ case _: InterruptedException if task.killed =>
+ logInfo(s"Executor interrupted and killed $taskName (TID $taskId)")
+ setTaskFinishedAndClearInterruptStatus()
+ execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
+
case CausedBy(cDE: CommitDeniedException) =>
val reason = cDE.toTaskEndReason
setTaskFinishedAndClearInterruptStatus()