author    Eric Liang <ekl@databricks.com>    2017-04-20 09:55:10 -0700
committer Yin Huai <yhuai@databricks.com>    2017-04-20 09:55:10 -0700
commit    b2ebadfd55283348b8a8b37e28075fca0798228a
tree      ec79f4bdfa27b9241a0715dcd796677ac66ca73b
parent    c5a31d160f47ba51bb9f8a4f3141851034640fc7
[SPARK-20358][CORE] Executors failing stage on interrupted exception thrown by cancelled tasks
## What changes were proposed in this pull request?

This was a regression introduced by my earlier PR here: https://github.com/apache/spark/pull/17531

It turns out NonFatal() does not in fact catch InterruptedException.

## How was this patch tested?

Extended cancellation unit test coverage. The first test fails before this patch.

cc JoshRosen mridulm

Author: Eric Liang <ekl@databricks.com>

Closes #17659 from ericl/spark-20358.
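For context: `scala.util.control.NonFatal` deliberately classifies `InterruptedException` as fatal, alongside `VirtualMachineError`, `ThreadDeath`, `LinkageError`, and `ControlThrowable`, so a `case NonFatal(_)` pattern never matches a thread interrupt. A minimal standalone sketch of that behavior (the demo object name is illustrative):

```scala
import scala.util.control.NonFatal

// Sketch only: NonFatal treats InterruptedException as fatal, so the
// extractor below does not match it.
object NonFatalDemo extends App {
  def classify(t: Throwable): String = t match {
    case NonFatal(_) => "matched by NonFatal"
    case _           => "not matched by NonFatal"
  }

  println(classify(new RuntimeException("boom")))  // matched by NonFatal
  println(classify(new InterruptedException))      // not matched by NonFatal
}
```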
 core/src/main/scala/org/apache/spark/executor/Executor.scala |  3
 core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 26
 2 files changed, 19 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 83469c5ff0..18f04391d6 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -432,7 +432,8 @@ private[spark] class Executor(
           setTaskFinishedAndClearInterruptStatus()
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))
 
-        case NonFatal(_) if task != null && task.reasonIfKilled.isDefined =>
+        case _: InterruptedException | NonFatal(_) if
+            task != null && task.reasonIfKilled.isDefined =>
           val killReason = task.reasonIfKilled.getOrElse("unknown reason")
           logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
           setTaskFinishedAndClearInterruptStatus()
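The fixed pattern combines a type pattern with an extractor pattern as alternatives, which Scala permits because neither alternative binds a variable; the guard then applies to whichever alternative matched. A hedged sketch of the same shape outside Spark, with illustrative names standing in for the Executor's task state:

```scala
import scala.util.control.NonFatal

// Sketch of the pattern shape used in the fix. `reasonIfKilled` stands in
// for the task state the real Executor consults; names are illustrative.
object KillHandlingDemo extends App {
  def handle(t: Throwable, reasonIfKilled: Option[String]): String = t match {
    // A type pattern and an extractor pattern may be alternatives because
    // neither binds a variable; the guard covers either alternative.
    case _: InterruptedException | NonFatal(_) if reasonIfKilled.isDefined =>
      s"task killed: ${reasonIfKilled.get}"
    case other =>
      s"unhandled: ${other.getClass.getSimpleName}"
  }

  println(handle(new InterruptedException, Some("stage cancelled")))
  println(handle(new InterruptedException, None))
}
```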
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 735f4454e2..7e26139a2b 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -540,10 +540,24 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }
   }
 
-  // Launches one task that will run forever. Once the SparkListener detects the task has
+  testCancellingTasks("that raise interrupted exception on cancel") {
+    Thread.sleep(9999999)
+  }
+
+  // SPARK-20217 should not fail stage if task throws non-interrupted exception
+  testCancellingTasks("that raise runtime exception on cancel") {
+    try {
+      Thread.sleep(9999999)
+    } catch {
+      case t: Throwable =>
+        throw new RuntimeException("killed")
+    }
+  }
+
+  // Launches one task that will block forever. Once the SparkListener detects the task has
   // started, kill and re-schedule it. The second run of the task will complete immediately.
   // If this test times out, then the first version of the task wasn't killed successfully.
-  test("Killing tasks") {
+  def testCancellingTasks(desc: String)(blockFn: => Unit): Unit = test(s"Killing tasks $desc") {
     sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
 
     SparkContextSuite.isTaskStarted = false
@@ -572,13 +586,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       // first attempt will hang
       if (!SparkContextSuite.isTaskStarted) {
         SparkContextSuite.isTaskStarted = true
-        try {
-          Thread.sleep(9999999)
-        } catch {
-          case t: Throwable =>
-            // SPARK-20217 should not fail stage if task throws non-interrupted exception
-            throw new RuntimeException("killed")
-        }
+        blockFn
       }
       // second attempt succeeds immediately
     }
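The refactoring turns the original hard-coded test into a small factory: a curried helper taking a description and a by-name block, registering one `Killing tasks ...` test per call site. A standalone sketch of that registration pattern, with illustrative names and no ScalaTest dependency:

```scala
// Standalone sketch of the by-name test-factory pattern used above; names
// are illustrative, and the real suite registers via ScalaTest's test(...).
object TestFactoryDemo extends App {
  private val registered =
    scala.collection.mutable.Buffer.empty[(String, () => Unit)]

  // Mirrors `def testCancellingTasks(desc: String)(blockFn: => Unit)`:
  // the by-name block is captured as a thunk, not evaluated at registration.
  def testCancellingTasks(desc: String)(blockFn: => Unit): Unit =
    registered += (s"Killing tasks $desc" -> (() => blockFn))

  testCancellingTasks("that raise interrupted exception on cancel") {
    println("body runs only when the suite executes the test")
  }

  // "Run the suite": each registered body executes here, once per test.
  registered.foreach { case (name, body) =>
    println(name)
    body()
  }
}
```

The by-name parameter is what lets each call site supply a different blocking behavior while the scheduling, kill, and re-run plumbing stays in one place.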