aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-08-06 19:29:19 -0700
committerJosh Rosen <joshrosen@databricks.com>2016-08-06 19:29:19 -0700
commit4f5f9b670e1f1783f43feb22490613e72dcff852 (patch)
tree58837079c7ba57da7a48681fa6f0f26bd3bd181f
parent2dd03886173f2f3b5c20fe14e9cdbd33480c1f36 (diff)
downloadspark-4f5f9b670e1f1783f43feb22490613e72dcff852.tar.gz
spark-4f5f9b670e1f1783f43feb22490613e72dcff852.tar.bz2
spark-4f5f9b670e1f1783f43feb22490613e72dcff852.zip
[SPARK-16925] Master should call schedule() after all executor exit events, not only failures
## What changes were proposed in this pull request? This patch fixes a bug in Spark's standalone Master which could cause applications to hang if tasks cause executors to exit with zero exit codes. As an example of the bug, run ``` sc.parallelize(1 to 1, 1).foreachPartition { _ => System.exit(0) } ``` on a standalone cluster which has a single Spark application. This will cause all executors to die but those executors won't be replaced unless another Spark application or worker joins or leaves the cluster (or if an executor exits with a non-zero exit code). This behavior is caused by a bug in how the Master handles the `ExecutorStateChanged` event: the current implementation calls `schedule()` only if the executor exited with a non-zero exit code, so a task which causes a JVM to unexpectedly exit "cleanly" will skip the `schedule()` call. This patch addresses this by modifying the `ExecutorStateChanged` to always unconditionally call `schedule()`. This should be safe because it should always be safe to call `schedule()`; adding extra `schedule()` calls can only affect performance and should not introduce correctness bugs. ## How was this patch tested? I added a regression test in `DistributedSuite`. Author: Josh Rosen <joshrosen@databricks.com> Closes #14510 from JoshRosen/SPARK-16925.
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala17
-rw-r--r--core/src/test/scala/org/apache/spark/DistributedSuite.scala15
2 files changed, 22 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f8aac3008c..fded8475a0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -265,19 +265,16 @@ private[deploy] class Master(
val normalExit = exitStatus == Some(0)
// Only retry certain number of times so we don't go into an infinite loop.
- if (!normalExit) {
- if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
- schedule()
- } else {
- val execs = appInfo.executors.values
- if (!execs.exists(_.state == ExecutorState.RUNNING)) {
- logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
- s"${appInfo.retryCount} times; removing it")
- removeApplication(appInfo, ApplicationState.FAILED)
- }
+ if (!normalExit && appInfo.incrementRetryCount() >= ApplicationState.MAX_NUM_RETRY) {
+ val execs = appInfo.executors.values
+ if (!execs.exists(_.state == ExecutorState.RUNNING)) {
+ logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
+ s"${appInfo.retryCount} times; removing it")
+ removeApplication(appInfo, ApplicationState.FAILED)
}
}
}
+ schedule()
case None =>
logWarning(s"Got status update for unknown executor $appId/$execId")
}
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 0515e6e3a6..6beae842b0 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -134,6 +134,21 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
}
}
+ test("repeatedly failing task that crashes JVM with a zero exit code (SPARK-16925)") {
+ // Ensures that if a task which causes the JVM to exit with a zero exit code will cause the
+ // Spark job to eventually fail.
+ sc = new SparkContext(clusterUrl, "test")
+ failAfter(Span(100000, Millis)) {
+ val thrown = intercept[SparkException] {
+ sc.parallelize(1 to 1, 1).foreachPartition { _ => System.exit(0) }
+ }
+ assert(thrown.getClass === classOf[SparkException])
+ assert(thrown.getMessage.contains("failed 4 times"))
+ }
+ // Check that the cluster is still usable:
+ sc.parallelize(1 to 10).count()
+ }
+
test("caching") {
sc = new SparkContext(clusterUrl, "test")
val data = sc.parallelize(1 to 1000, 10).cache()