diff options
author | Reynold Xin <rxin@apache.org> | 2014-01-15 23:47:25 -0800 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-01-15 23:47:25 -0800 |
commit | c06a307ca22901839df00d25fe623f6faa6af17e (patch) | |
tree | 3f0b9db05fc7468a8ecd196b702ec1c86483552f /core | |
parent | 84595ea3e25d2f9578b3de34704da14eb02330fa (diff) | |
parent | 718a13c179915767107bc20cd27d9480d069231c (diff) | |
download | spark-c06a307ca22901839df00d25fe623f6faa6af17e.tar.gz spark-c06a307ca22901839df00d25fe623f6faa6af17e.tar.bz2 spark-c06a307ca22901839df00d25fe623f6faa6af17e.zip |
Merge pull request #445 from kayousterhout/exec_lost
Fail rather than hanging if a task crashes the JVM.
Prior to this commit, if a task crashes the JVM, the task (and
all other tasks running on that executor) is marked at KILLED rather
than FAILED. As a result, the TaskSetManager will retry the task
indefinitely rather than failing the job after maxFailures. Eventually,
this makes the job hang, because the Standalone Scheduler removes
the application after 10 works have failed, and then the app is left
in a state where it's disconnected from the master and waiting to reconnect.
This commit fixes that problem by marking tasks as FAILED rather than
killed when an executor is lost.
The downside of this commit is that if task A fails because another
task running on the same executor caused the VM to crash, the failure
will incorrectly be counted as a failure of task A. This should not
be an issue because we typically set maxFailures to 3, and it is
unlikely that a task will be co-located with a JVM-crashing task
multiple times.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/DistributedSuite.scala | 17 |
2 files changed, 18 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index fc0ee07089..5ad00a1ed1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -629,7 +629,7 @@ private[spark] class TaskSetManager( } // Also re-enqueue any tasks that were running on the node for ((tid, info) <- taskInfos if info.running && info.executorId == execId) { - handleFailedTask(tid, TaskState.KILLED, None) + handleFailedTask(tid, TaskState.FAILED, None) } } diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index d9cb7fead5..8de7a328d1 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -125,6 +125,23 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter assert(thrown.getMessage.contains("failed 4 times")) } + test("repeatedly failing task that crashes JVM") { + // Ensures that if a task fails in a way that crashes the JVM, the job eventually fails rather + // than hanging due to retrying the failed task infinitely many times (eventually the + // standalone scheduler will remove the application, causing the job to hang waiting to + // reconnect to the master). + sc = new SparkContext(clusterUrl, "test") + failAfter(Span(100000, Millis)) { + val thrown = intercept[SparkException] { + // One of the tasks always fails. + sc.parallelize(1 to 10, 2).foreach { x => if (x == 1) System.exit(42) } + } + assert(thrown.getClass === classOf[SparkException]) + System.out.println(thrown.getMessage) + assert(thrown.getMessage.contains("failed 4 times")) + } + } + test("caching") { sc = new SparkContext(clusterUrl, "test") val data = sc.parallelize(1 to 1000, 10).cache() |