aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
authorSital Kedia <skedia@fb.com>2017-03-17 09:33:45 -0500
committerImran Rashid <irashid@cloudera.com>2017-03-17 09:33:58 -0500
commit7b5d873aef672aa0aee41e338bab7428101e1ad3 (patch)
treecf93435eaa5644ddf0c431065f340d9e5fab7414 /core/src/main/scala/org/apache
parent13538cf3dd089222c7e12a3cd6e72ac836fa51ac (diff)
downloadspark-7b5d873aef672aa0aee41e338bab7428101e1ad3.tar.gz
spark-7b5d873aef672aa0aee41e338bab7428101e1ad3.tar.bz2
spark-7b5d873aef672aa0aee41e338bab7428101e1ad3.zip
[SPARK-13369] Add config for number of consecutive fetch failures
The previously hardcoded max 4 retries per stage is not suitable for all cluster configurations. Since spark retries a stage at the sign of the first fetch failure, you can easily end up with many stage retries to discover all the failures. In particular, two scenarios this value should change are (1) if there are more than 4 executors per node; in that case, it may take 4 retries to discover the problem with each executor on the node and (2) during cluster maintenance on large clusters, where multiple machines are serviced at once, but you also cannot afford total cluster downtime. By making this value configurable, cluster managers can tune this value to something more appropriate to their cluster configuration. Unit tests Author: Sital Kedia <skedia@fb.com> Closes #17307 from sitalkedia/SPARK-13369.
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Stage.scala18
2 files changed, 14 insertions, 19 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 692ed80834..d944f26875 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -187,6 +187,13 @@ class DAGScheduler(
/** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
+ /**
+ * Number of consecutive stage attempts allowed before a stage is aborted.
+ */
+ private[scheduler] val maxConsecutiveStageAttempts =
+ sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
+ DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
+
private val messageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
@@ -1282,8 +1289,9 @@ class DAGScheduler(
s"longer running")
}
+ failedStage.fetchFailedAttemptIds.add(task.stageAttemptId)
val shouldAbortStage =
- failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
+ failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts ||
disallowStageRetryForTest
if (shouldAbortStage) {
@@ -1292,7 +1300,7 @@ class DAGScheduler(
} else {
s"""$failedStage (${failedStage.name})
|has failed the maximum allowable number of
- |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
+ |times: $maxConsecutiveStageAttempts.
|Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
}
abortStage(failedStage, abortMessage, None)
@@ -1726,4 +1734,7 @@ private[spark] object DAGScheduler {
// this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
// as more failure events come in
val RESUBMIT_TIMEOUT = 200
+
+ // Number of consecutive stage attempts allowed before a stage is aborted
+ val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 32e5df6d75..290fd073ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -87,23 +87,12 @@ private[scheduler] abstract class Stage(
* We keep track of each attempt ID that has failed to avoid recording duplicate failures if
* multiple tasks from the same stage attempt fail (SPARK-5945).
*/
- private val fetchFailedAttemptIds = new HashSet[Int]
+ val fetchFailedAttemptIds = new HashSet[Int]
private[scheduler] def clearFailures() : Unit = {
fetchFailedAttemptIds.clear()
}
- /**
- * Check whether we should abort the failedStage due to multiple consecutive fetch failures.
- *
- * This method updates the running set of failed stage attempts and returns
- * true if the number of failures exceeds the allowable number of failures.
- */
- private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
- fetchFailedAttemptIds.add(stageAttemptId)
- fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES
- }
-
/** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
def makeNewStageAttempt(
numPartitionsToCompute: Int,
@@ -128,8 +117,3 @@ private[scheduler] abstract class Stage(
/** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
def findMissingPartitions(): Seq[Int]
}
-
-private[scheduler] object Stage {
- // The number of consecutive failures allowed before a stage is aborted
- val MAX_CONSECUTIVE_FETCH_FAILURES = 4
-}