about summary refs log tree commit diff
path: root/core/src/main/scala/org
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala	| 15
-rw-r--r--	core/src/main/scala/org/apache/spark/scheduler/Stage.scala	| 18
2 files changed, 14 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 692ed80834..d944f26875 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -187,6 +187,13 @@ class DAGScheduler(
/** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
+ /**
+ * Number of consecutive stage attempts allowed before a stage is aborted.
+ */
+ private[scheduler] val maxConsecutiveStageAttempts =
+ sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
+ DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
+
private val messageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
@@ -1282,8 +1289,9 @@ class DAGScheduler(
s"longer running")
}
+ failedStage.fetchFailedAttemptIds.add(task.stageAttemptId)
val shouldAbortStage =
- failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
+ failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts ||
disallowStageRetryForTest
if (shouldAbortStage) {
@@ -1292,7 +1300,7 @@ class DAGScheduler(
} else {
s"""$failedStage (${failedStage.name})
|has failed the maximum allowable number of
- |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
+ |times: $maxConsecutiveStageAttempts.
|Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
}
abortStage(failedStage, abortMessage, None)
@@ -1726,4 +1734,7 @@ private[spark] object DAGScheduler {
// this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
// as more failure events come in
val RESUBMIT_TIMEOUT = 200
+
+ // Number of consecutive stage attempts allowed before a stage is aborted
+ val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 32e5df6d75..290fd073ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -87,23 +87,12 @@ private[scheduler] abstract class Stage(
* We keep track of each attempt ID that has failed to avoid recording duplicate failures if
* multiple tasks from the same stage attempt fail (SPARK-5945).
*/
- private val fetchFailedAttemptIds = new HashSet[Int]
+ val fetchFailedAttemptIds = new HashSet[Int]
private[scheduler] def clearFailures() : Unit = {
fetchFailedAttemptIds.clear()
}
- /**
- * Check whether we should abort the failedStage due to multiple consecutive fetch failures.
- *
- * This method updates the running set of failed stage attempts and returns
- * true if the number of failures exceeds the allowable number of failures.
- */
- private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
- fetchFailedAttemptIds.add(stageAttemptId)
- fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES
- }
-
/** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
def makeNewStageAttempt(
numPartitionsToCompute: Int,
@@ -128,8 +117,3 @@ private[scheduler] abstract class Stage(
/** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
def findMissingPartitions(): Seq[Int]
}
-
-private[scheduler] object Stage {
- // The number of consecutive failures allowed before a stage is aborted
- val MAX_CONSECUTIVE_FETCH_FAILURES = 4
-}