aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2014-01-15 16:03:40 -0800
committerKay Ousterhout <kayousterhout@gmail.com>2014-01-15 16:03:40 -0800
commita268d634113536f7aca11af23619b9713b5ef5de (patch)
treee3ca0cf9535caa03a0cb1e949fb5b4528c4897cd
parent5fecd2516dc8de28b76fe6e0fbdca7922cc28d1c (diff)
downloadspark-a268d634113536f7aca11af23619b9713b5ef5de.tar.gz
spark-a268d634113536f7aca11af23619b9713b5ef5de.tar.bz2
spark-a268d634113536f7aca11af23619b9713b5ef5de.zip
Fail rather than hanging if a task crashes the JVM.
Prior to this commit, if a task crashes the JVM, the task (and all other tasks running on that executor) is marked at KILLED rather than FAILED. As a result, the TaskSetManager will retry the task indefiniteily rather than failing the job after maxFailures. This commit fixes that problem by marking tasks as FAILED rather than killed when an executor is lost. The downside of this commit is that if task A fails because another task running on the same executor caused the VM to crash, the failure will incorrectly be counted as a failure of task A. This should not be an issue because we typically set maxFailures to 3, and it is unlikely that a task will be co-located with a JVM-crashing task multiple times.
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/DistributedSuite.scala15
2 files changed, 16 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index fc0ee07089..5ad00a1ed1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -629,7 +629,7 @@ private[spark] class TaskSetManager(
}
// Also re-enqueue any tasks that were running on the node
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
- handleFailedTask(tid, TaskState.KILLED, None)
+ handleFailedTask(tid, TaskState.FAILED, None)
}
}
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index d9cb7fead5..27c4b01799 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -125,6 +125,21 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
assert(thrown.getMessage.contains("failed 4 times"))
}
+ test("repeatedly failing task that crashes JVM") {
+ // Ensures that if a task fails in a way that crashes the JVM, the job eventually fails rather
+ // than hanging.
+ sc = new SparkContext(clusterUrl, "test")
+ failAfter(Span(100000, Millis)) {
+ val thrown = intercept[SparkException] {
+ // One of the tasks always fails.
+ sc.parallelize(1 to 10, 2).foreach { x => if (x == 1) System.exit(42) }
+ }
+ assert(thrown.getClass === classOf[SparkException])
+ System.out.println(thrown.getMessage)
+ assert(thrown.getMessage.contains("failed 4 times"))
+ }
+ }
+
test("caching") {
sc = new SparkContext(clusterUrl, "test")
val data = sc.parallelize(1 to 1000, 10).cache()