author | Kay Ousterhout <kayousterhout@gmail.com> | 2014-01-15 16:03:40 -0800 |
---|---|---|
committer | Kay Ousterhout <kayousterhout@gmail.com> | 2014-01-15 16:03:40 -0800 |
commit | a268d634113536f7aca11af23619b9713b5ef5de (patch) | |
tree | e3ca0cf9535caa03a0cb1e949fb5b4528c4897cd /core | |
parent | 5fecd2516dc8de28b76fe6e0fbdca7922cc28d1c (diff) | |
Fail rather than hanging if a task crashes the JVM.
Prior to this commit, if a task crashes the JVM, the task (and
all other tasks running on that executor) is marked as KILLED rather
than FAILED. As a result, the TaskSetManager will retry the task
indefinitely rather than failing the job after maxFailures. This
commit fixes that problem by marking tasks as FAILED rather than
KILLED when an executor is lost.
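The effect of the state change on retry accounting can be illustrated with a minimal sketch. This is a simplified, hypothetical model and not the actual TaskSetManager logic: the names `RetrySketch`, `EndState`, `attemptsUntilAbort`, and `attemptBudget` are invented for illustration, and the only behavior assumed is what the message above describes, namely that FAILED attempts count toward maxFailures while KILLED attempts do not.

```scala
import scala.annotation.tailrec

// Hypothetical model of retry accounting; not Spark's real scheduler code.
object RetrySketch {
  sealed trait EndState
  case object Failed extends EndState
  case object Killed extends EndState

  /** Simulates a task whose every attempt ends in `endState`.
    * Returns Some(n) if the job would be aborted after n attempts,
    * or None if `attemptBudget` attempts pass without an abort
    * (standing in for "retries forever").
    */
  def attemptsUntilAbort(endState: EndState, maxFailures: Int, attemptBudget: Int): Option[Int] = {
    @tailrec
    def loop(attempt: Int, failures: Int): Option[Int] =
      if (failures >= maxFailures) Some(attempt)
      else if (attempt >= attemptBudget) None
      // Only FAILED attempts increment the failure count in this model.
      else loop(attempt + 1, if (endState == Failed) failures + 1 else failures)

    loop(0, 0)
  }
}
```

In this model, a task that always ends as `Failed` aborts the job once the failure count reaches `maxFailures`, while a task that always ends as `Killed` exhausts any attempt budget without ever triggering an abort, which corresponds to the hang described above.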
The downside of this commit is that if task A fails because another
task running on the same executor caused the JVM to crash, the failure
will incorrectly be counted as a failure of task A. This should not
be an issue because we typically set maxFailures to 3, and it is
unlikely that a task will be co-located with a JVM-crashing task
multiple times.
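A rough way to gauge that risk, under assumptions that are not part of this commit: if each attempt of task A independently has probability `p` of sharing an executor with a JVM-crashing task, then the chance that all `maxFailures` attempts are spuriously failed is `p` raised to the power `maxFailures`. The independence assumption, the object name `CoLocationSketch`, and the function name below are hypothetical and used only for illustration.

```scala
// Back-of-the-envelope estimate; assumes independent, identically likely co-location per attempt.
object CoLocationSketch {
  /** Probability that every one of `maxFailures` attempts of an innocent task
    * is co-located with a JVM-crashing task, given per-attempt probability `p`.
    */
  def spuriousAbortProbability(p: Double, maxFailures: Int): Double = {
    require(p >= 0.0 && p <= 1.0, "p must be a probability")
    require(maxFailures > 0, "maxFailures must be positive")
    math.pow(p, maxFailures)
  }
}
```

For example, with a hypothetical `p` of 0.25 and maxFailures of 3, `CoLocationSketch.spuriousAbortProbability(0.25, 3)` comes out to roughly 0.016, so in this simplified model a spurious job failure stays rare unless co-location is very likely.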
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 |
-rw-r--r-- | core/src/test/scala/org/apache/spark/DistributedSuite.scala | 15 |
2 files changed, 16 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index fc0ee07089..5ad00a1ed1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -629,7 +629,7 @@ private[spark] class TaskSetManager(
     }
     // Also re-enqueue any tasks that were running on the node
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
-      handleFailedTask(tid, TaskState.KILLED, None)
+      handleFailedTask(tid, TaskState.FAILED, None)
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index d9cb7fead5..27c4b01799 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -125,6 +125,21 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     assert(thrown.getMessage.contains("failed 4 times"))
   }
 
+  test("repeatedly failing task that crashes JVM") {
+    // Ensures that if a task fails in a way that crashes the JVM, the job eventually fails rather
+    // than hanging.
+    sc = new SparkContext(clusterUrl, "test")
+    failAfter(Span(100000, Millis)) {
+      val thrown = intercept[SparkException] {
+        // One of the tasks always fails.
+        sc.parallelize(1 to 10, 2).foreach { x => if (x == 1) System.exit(42) }
+      }
+      assert(thrown.getClass === classOf[SparkException])
+      System.out.println(thrown.getMessage)
+      assert(thrown.getMessage.contains("failed 4 times"))
+    }
+  }
+
   test("caching") {
     sc = new SparkContext(clusterUrl, "test")
     val data = sc.parallelize(1 to 1000, 10).cache()