diff options
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 15 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/Stage.scala | 18 |
2 files changed, 14 insertions, 19 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 692ed80834..d944f26875 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -187,6 +187,13 @@ class DAGScheduler( /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */ private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false) + /** + * Number of consecutive stage attempts allowed before a stage is aborted. + */ + private[scheduler] val maxConsecutiveStageAttempts = + sc.getConf.getInt("spark.stage.maxConsecutiveAttempts", + DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS) + private val messageScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message") @@ -1282,8 +1289,9 @@ class DAGScheduler( s"longer running") } + failedStage.fetchFailedAttemptIds.add(task.stageAttemptId) val shouldAbortStage = - failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) || + failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts || disallowStageRetryForTest if (shouldAbortStage) { @@ -1292,7 +1300,7 @@ class DAGScheduler( } else { s"""$failedStage (${failedStage.name}) |has failed the maximum allowable number of - |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. + |times: $maxConsecutiveStageAttempts. |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ") } abortStage(failedStage, abortMessage, None) @@ -1726,4 +1734,7 @@ private[spark] object DAGScheduler { // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one // as more failure events come in val RESUBMIT_TIMEOUT = 200 + + // Number of consecutive stage attempts allowed before a stage is aborted + val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4 } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 32e5df6d75..290fd073ca 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -87,23 +87,12 @@ private[scheduler] abstract class Stage( * We keep track of each attempt ID that has failed to avoid recording duplicate failures if * multiple tasks from the same stage attempt fail (SPARK-5945). */ - private val fetchFailedAttemptIds = new HashSet[Int] + val fetchFailedAttemptIds = new HashSet[Int] private[scheduler] def clearFailures() : Unit = { fetchFailedAttemptIds.clear() } - /** - * Check whether we should abort the failedStage due to multiple consecutive fetch failures. - * - * This method updates the running set of failed stage attempts and returns - * true if the number of failures exceeds the allowable number of failures. - */ - private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = { - fetchFailedAttemptIds.add(stageAttemptId) - fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES - } - /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */ def makeNewStageAttempt( numPartitionsToCompute: Int, @@ -128,8 +117,3 @@ private[scheduler] abstract class Stage( /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */ def findMissingPartitions(): Seq[Int] } - -private[scheduler] object Stage { - // The number of consecutive failures allowed before a stage is aborted - val MAX_CONSECUTIVE_FETCH_FAILURES = 4 -} |