aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-01-15 23:47:25 -0800
committerReynold Xin <rxin@apache.org>2014-01-15 23:47:25 -0800
commitc06a307ca22901839df00d25fe623f6faa6af17e (patch)
tree3f0b9db05fc7468a8ecd196b702ec1c86483552f /core
parent84595ea3e25d2f9578b3de34704da14eb02330fa (diff)
parent718a13c179915767107bc20cd27d9480d069231c (diff)
downloadspark-c06a307ca22901839df00d25fe623f6faa6af17e.tar.gz
spark-c06a307ca22901839df00d25fe623f6faa6af17e.tar.bz2
spark-c06a307ca22901839df00d25fe623f6faa6af17e.zip
Merge pull request #445 from kayousterhout/exec_lost
Fail rather than hanging if a task crashes the JVM. Prior to this commit, if a task crashes the JVM, the task (and all other tasks running on that executor) is marked at KILLED rather than FAILED. As a result, the TaskSetManager will retry the task indefinitely rather than failing the job after maxFailures. Eventually, this makes the job hang, because the Standalone Scheduler removes the application after 10 works have failed, and then the app is left in a state where it's disconnected from the master and waiting to reconnect. This commit fixes that problem by marking tasks as FAILED rather than killed when an executor is lost. The downside of this commit is that if task A fails because another task running on the same executor caused the VM to crash, the failure will incorrectly be counted as a failure of task A. This should not be an issue because we typically set maxFailures to 3, and it is unlikely that a task will be co-located with a JVM-crashing task multiple times.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/DistributedSuite.scala17
2 files changed, 18 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index fc0ee07089..5ad00a1ed1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -629,7 +629,7 @@ private[spark] class TaskSetManager(
}
// Also re-enqueue any tasks that were running on the node
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
- handleFailedTask(tid, TaskState.KILLED, None)
+ handleFailedTask(tid, TaskState.FAILED, None)
}
}
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index d9cb7fead5..8de7a328d1 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -125,6 +125,23 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
assert(thrown.getMessage.contains("failed 4 times"))
}
+ test("repeatedly failing task that crashes JVM") {
+ // Ensures that if a task fails in a way that crashes the JVM, the job eventually fails rather
+ // than hanging due to retrying the failed task infinitely many times (eventually the
+ // standalone scheduler will remove the application, causing the job to hang waiting to
+ // reconnect to the master).
+ sc = new SparkContext(clusterUrl, "test")
+ failAfter(Span(100000, Millis)) {
+ val thrown = intercept[SparkException] {
+ // One of the tasks always fails.
+ sc.parallelize(1 to 10, 2).foreach { x => if (x == 1) System.exit(42) }
+ }
+ assert(thrown.getClass === classOf[SparkException])
+ System.out.println(thrown.getMessage)
+ assert(thrown.getMessage.contains("failed 4 times"))
+ }
+ }
+
test("caching") {
sc = new SparkContext(clusterUrl, "test")
val data = sc.parallelize(1 to 1000, 10).cache()