about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala  |  3
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkContextSuite.scala  | 26
2 files changed, 19 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 83469c5ff0..18f04391d6 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -432,7 +432,8 @@ private[spark] class Executor(
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))
- case NonFatal(_) if task != null && task.reasonIfKilled.isDefined =>
+ case _: InterruptedException | NonFatal(_) if
+ task != null && task.reasonIfKilled.isDefined =>
val killReason = task.reasonIfKilled.getOrElse("unknown reason")
logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
setTaskFinishedAndClearInterruptStatus()
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 735f4454e2..7e26139a2b 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -540,10 +540,24 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
}
- // Launches one task that will run forever. Once the SparkListener detects the task has
+ testCancellingTasks("that raise interrupted exception on cancel") {
+ Thread.sleep(9999999)
+ }
+
+ // SPARK-20217 should not fail stage if task throws non-interrupted exception
+ testCancellingTasks("that raise runtime exception on cancel") {
+ try {
+ Thread.sleep(9999999)
+ } catch {
+ case t: Throwable =>
+ throw new RuntimeException("killed")
+ }
+ }
+
+ // Launches one task that will block forever. Once the SparkListener detects the task has
// started, kill and re-schedule it. The second run of the task will complete immediately.
// If this test times out, then the first version of the task wasn't killed successfully.
- test("Killing tasks") {
+ def testCancellingTasks(desc: String)(blockFn: => Unit): Unit = test(s"Killing tasks $desc") {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
SparkContextSuite.isTaskStarted = false
@@ -572,13 +586,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
// first attempt will hang
if (!SparkContextSuite.isTaskStarted) {
SparkContextSuite.isTaskStarted = true
- try {
- Thread.sleep(9999999)
- } catch {
- case t: Throwable =>
- // SPARK-20217 should not fail stage if task throws non-interrupted exception
- throw new RuntimeException("killed")
- }
+ blockFn
}
// second attempt succeeds immediately
}