diff options
author | Eric Liang <ekl@databricks.com> | 2017-04-20 09:55:10 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2017-04-20 09:55:10 -0700 |
commit | b2ebadfd55283348b8a8b37e28075fca0798228a (patch) | |
tree | ec79f4bdfa27b9241a0715dcd796677ac66ca73b | |
parent | c5a31d160f47ba51bb9f8a4f3141851034640fc7 (diff) | |
download | spark-b2ebadfd55283348b8a8b37e28075fca0798228a.tar.gz spark-b2ebadfd55283348b8a8b37e28075fca0798228a.tar.bz2 spark-b2ebadfd55283348b8a8b37e28075fca0798228a.zip |
[SPARK-20358][CORE] Executors failing stage on interrupted exception thrown by cancelled tasks
## What changes were proposed in this pull request?
This was a regression introduced by my earlier PR here: https://github.com/apache/spark/pull/17531
It turns out NonFatal() does not in fact catch InterruptedException.
## How was this patch tested?
Extended cancellation unit test coverage. The first test fails before this patch.
cc JoshRosen mridulm
Author: Eric Liang <ekl@databricks.com>
Closes #17659 from ericl/spark-20358.
-rw-r--r-- | core/src/main/scala/org/apache/spark/executor/Executor.scala | 3 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 26 |
2 files changed, 19 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 83469c5ff0..18f04391d6 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -432,7 +432,8 @@ private[spark] class Executor(
           setTaskFinishedAndClearInterruptStatus()
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))

-        case NonFatal(_) if task != null && task.reasonIfKilled.isDefined =>
+        case _: InterruptedException | NonFatal(_) if
+            task != null && task.reasonIfKilled.isDefined =>
           val killReason = task.reasonIfKilled.getOrElse("unknown reason")
           logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
           setTaskFinishedAndClearInterruptStatus()
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 735f4454e2..7e26139a2b 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -540,10 +540,24 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }
   }

-  // Launches one task that will run forever. Once the SparkListener detects the task has
+  testCancellingTasks("that raise interrupted exception on cancel") {
+    Thread.sleep(9999999)
+  }
+
+  // SPARK-20217 should not fail stage if task throws non-interrupted exception
+  testCancellingTasks("that raise runtime exception on cancel") {
+    try {
+      Thread.sleep(9999999)
+    } catch {
+      case t: Throwable =>
+        throw new RuntimeException("killed")
+    }
+  }
+
+  // Launches one task that will block forever. Once the SparkListener detects the task has
   // started, kill and re-schedule it. The second run of the task will complete immediately.
   // If this test times out, then the first version of the task wasn't killed successfully.
-  test("Killing tasks") {
+  def testCancellingTasks(desc: String)(blockFn: => Unit): Unit = test(s"Killing tasks $desc") {
     sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))

     SparkContextSuite.isTaskStarted = false
@@ -572,13 +586,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       // first attempt will hang
       if (!SparkContextSuite.isTaskStarted) {
         SparkContextSuite.isTaskStarted = true
-        try {
-          Thread.sleep(9999999)
-        } catch {
-          case t: Throwable =>
-            // SPARK-20217 should not fail stage if task throws non-interrupted exception
-            throw new RuntimeException("killed")
-        }
+        blockFn
       }
       // second attempt succeeds immediately
     }