author     Eric Liang <ekl@databricks.com>     2017-04-05 19:37:21 -0700
committer  Yin Huai <yhuai@databricks.com>     2017-04-05 19:37:21 -0700
commit     5142e5d4e09c7cb36cf1d792934a21c5305c6d42 (patch)
tree       65415955902cff64b0ddbd9feba75e146927a8bd /core/src
parent     4000f128b7101484ba618115504ca916c22fa84a (diff)
[SPARK-20217][CORE] Executor should not fail stage if killed task throws non-interrupted exception
## What changes were proposed in this pull request?
If a task throws a non-interrupted exception when it is killed (e.g. java.nio.channels.ClosedByInterruptException), its death is reported back as TaskFailed instead of TaskKilled, which can cause the whole stage to fail.
This is reproducible as follows: run the snippet below, then use SparkContext.killTaskAttempt to kill one of the running tasks. The entire stage fails because the task throws a RuntimeException instead of an InterruptedException.
```
spark.range(100).repartition(100).foreach { i =>
  try {
    Thread.sleep(10000000)
  } catch {
    case t: InterruptedException =>
      throw new RuntimeException(t)
  }
}
```
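For completeness, one way to issue the kill while the job above is blocked is sketched below. This is only an illustration, not part of the original report: the listener-based capture of a task ID, the helper thread, and the kill reason string are assumptions, and `sc` is taken to be the underlying SparkContext (`spark.sparkContext`). The listener and thread would need to be set up before running the blocking `foreach`.
```
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

val sc = spark.sparkContext

// Remember the attempt ID of the first task that starts (-1 means "not seen yet").
val firstTaskId = new AtomicLong(-1L)
sc.addSparkListener(new SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    firstTaskId.compareAndSet(-1L, taskStart.taskInfo.taskId)
  }
})

// From a separate thread, kill that attempt once it is running.
new Thread("kill-one-task") {
  override def run(): Unit = {
    while (firstTaskId.get() == -1L) { Thread.sleep(100) }
    sc.killTaskAttempt(firstTaskId.get(), interruptThread = true, reason = "manual kill for repro")
  }
}.start()
```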
Based on the code in TaskSetManager, I think this also affects kills of speculative tasks. However, since speculative tasks are few and a task usually has to fail several times before the stage is cancelled, it is unlikely this would be noticed in production unless speculation was enabled and the number of allowed task failures was set to 1.
We should probably unconditionally return TaskKilled instead of TaskFailed if the task was killed by the driver, regardless of the actual exception thrown.
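As a rough, standalone sketch of that idea (simplified; `TaskEndState` and `classifyTaskEnd` are hypothetical names, not Spark's executor internals): any non-fatal exception thrown by a task that the driver has already asked to kill is classified as a kill rather than a failure.
```
import scala.util.control.NonFatal

sealed trait TaskEndState
case object Killed extends TaskEndState
case object Failed extends TaskEndState

// If the driver has set a kill reason for this task, report any non-fatal
// exception from it as Killed; only otherwise treat it as a real failure.
def classifyTaskEnd(error: Throwable, reasonIfKilled: Option[String]): TaskEndState =
  error match {
    case NonFatal(_) if reasonIfKilled.isDefined => Killed
    case _ => Failed
  }

// classifyTaskEnd(new RuntimeException("wrapped interrupt"), Some("killed via killTaskAttempt")) == Killed
// classifyTaskEnd(new RuntimeException("wrapped interrupt"), None) == Failed
```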
## How was this patch tested?
Unit test. The test fails before the change in Executor.scala.
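The shape of such a test, assuming a local-mode SparkContext `sc`, is roughly the sketch below (`KillTest` is an illustrative stand-in; the real test keeps the flag on the SparkContextSuite companion object so it is shared with the task in local mode, as the diff further down shows):
```
// Only the first attempt hangs; once it is killed (e.g. via killTaskAttempt as
// shown earlier), the retried attempt sees the flag and returns immediately,
// so the job must complete instead of failing the stage.
object KillTest {
  @volatile var isTaskStarted = false
}

sc.parallelize(1 to 1, 1).foreach { _ =>
  if (!KillTest.isTaskStarted) {
    KillTest.isTaskStarted = true
    try {
      Thread.sleep(9999999)
    } catch {
      case _: InterruptedException =>
        // Rethrow something other than InterruptedException, as in SPARK-20217.
        throw new RuntimeException("killed")
    }
  }
}
```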
cc JoshRosen
Author: Eric Liang <ekl@databricks.com>
Closes #17531 from ericl/fix-task-interrupt.
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala  | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkContextSuite.scala  | 8
2 files changed, 8 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 99b1608010..83469c5ff0 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -432,7 +432,7 @@ private[spark] class Executor(
           setTaskFinishedAndClearInterruptStatus()
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))

-        case _: InterruptedException if task.reasonIfKilled.isDefined =>
+        case NonFatal(_) if task != null && task.reasonIfKilled.isDefined =>
           val killReason = task.reasonIfKilled.getOrElse("unknown reason")
           logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
           setTaskFinishedAndClearInterruptStatus()
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 2c947556df..735f4454e2 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -572,7 +572,13 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       // first attempt will hang
       if (!SparkContextSuite.isTaskStarted) {
         SparkContextSuite.isTaskStarted = true
-        Thread.sleep(9999999)
+        try {
+          Thread.sleep(9999999)
+        } catch {
+          case t: Throwable =>
+            // SPARK-20217 should not fail stage if task throws non-interrupted exception
+            throw new RuntimeException("killed")
+        }
       }
       // second attempt succeeds immediately
     }