author    Mark Hamstra <markhamstra@gmail.com>  2016-12-16 12:46:32 -0800
committer Marcelo Vanzin <vanzin@cloudera.com>  2016-12-16 12:46:32 -0800
commit    295db8259b307cc0e7d9de44f5638c1aa7ef6047 (patch)
tree      fb1fed80b1b5c0e123d479f63fc25e712f3f34fa /core
parent    1169db44bc1d51e68feb6ba2552520b2d660c2c0 (diff)
[SPARK-17769][CORE][SCHEDULER] Some FetchFailure refactoring
## What changes were proposed in this pull request?

Readability rewrites, plus:

- Changed the order in which `failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)` and `disallowStageRetryForTest` are evaluated.
- Changed the stage-resubmission guard condition from `failedStages.isEmpty` to `!failedStages.contains(failedStage)`.
- Log every resubmission of a failed stage.

## How was this patch tested?

Existing tests.

Author: Mark Hamstra <markhamstra@gmail.com>

Closes #15335 from markhamstra/SPARK-17769.
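A condensed, hypothetical sketch of the guard-condition change described above. The names below (`Stage`, `failedStages`, `handleFetchFailure`) are simplified stand-ins for the real `DAGScheduler` state, which keeps `failedStages` as a field and posts a `ResubmitFailedStages` event through `messageScheduler`; this is not the committed code, only an illustration of why the guard moves from "is the set empty" to "is this particular stage already tracked".

```scala
import scala.collection.mutable

object ResubmitGuardSketch {
  // Hypothetical stand-in for the scheduler's stage bookkeeping.
  case class Stage(id: Int, name: String)

  val failedStages = mutable.HashSet.empty[Stage]

  def handleFetchFailure(failedStage: Stage, mapStage: Stage): Unit = {
    // Old guard: only enqueue a resubmit when no failed stages were tracked at all,
    // so a second, distinct failed stage produced no resubmission log line of its own.
    // val shouldEnqueue = failedStages.isEmpty

    // New guard: enqueue (and log) unless THIS stage already has a resubmit pending.
    val noResubmitEnqueued = !failedStages.contains(failedStage)
    failedStages += failedStage
    failedStages += mapStage
    if (noResubmitEnqueued) {
      println(s"Resubmitting ${mapStage.name} and ${failedStage.name} due to fetch failure")
      // The real scheduler would schedule a ResubmitFailedStages event here.
    }
  }
}
```

With the new guard, each failed stage is logged and gets a resubmit event the first time it fails, while repeated fetch failures of the same stage still do not flood the event queue.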
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 55
1 file changed, 37 insertions(+), 18 deletions(-)
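The new code in the diff below builds the abort message as a multi-line interpolated string and then flattens it to a single line with `stripMargin.replaceAll("\n", " ")`. A minimal standalone sketch of that idiom, using placeholder values for the stage name, attempt limit, and failure reason (the real values come from `failedStage`, `Stage.MAX_CONSECUTIVE_FETCH_FAILURES`, and `failureMessage`):

```scala
object AbortMessageSketch extends App {
  // Placeholder values; in DAGScheduler these come from the failed stage and task.
  val failedStage = "ShuffleMapStage 3 (map at Example.scala:42)"
  val maxFailures = 4
  val failureMessage = "FetchFailed from lost executor"

  // stripMargin drops everything up to and including the leading '|' on each line,
  // and replaceAll("\n", " ") joins the lines so the log message stays on one line.
  val abortMessage =
    s"""$failedStage
       |has failed the maximum allowable number of
       |times: $maxFailures.
       |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")

  println(abortMessage)
}
```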
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 9378f15b7b..0a1c500d77 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1256,27 +1256,46 @@ class DAGScheduler(
s"longer running")
}
- if (disallowStageRetryForTest) {
- abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
- None)
- } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
- abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
- s"has failed the maximum allowable number of " +
- s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
- s"Most recent failure reason: ${failureMessage}", None)
- } else {
- if (failedStages.isEmpty) {
- // Don't schedule an event to resubmit failed stages if failed isn't empty, because
- // in that case the event will already have been scheduled.
- // TODO: Cancel running tasks in the stage
- logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
- s"$failedStage (${failedStage.name}) due to fetch failure")
- messageScheduler.schedule(new Runnable {
- override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
- }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
+ val shouldAbortStage =
+ failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
+ disallowStageRetryForTest
+
+ if (shouldAbortStage) {
+ val abortMessage = if (disallowStageRetryForTest) {
+ "Fetch failure will not retry stage due to testing config"
+ } else {
+ s"""$failedStage (${failedStage.name})
+ |has failed the maximum allowable number of
+ |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
+ |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
}
+ abortStage(failedStage, abortMessage, None)
+ } else { // update failedStages and make sure a ResubmitFailedStages event is enqueued
+ // TODO: Cancel running tasks in the failed stage -- cf. SPARK-17064
+ val noResubmitEnqueued = !failedStages.contains(failedStage)
failedStages += failedStage
failedStages += mapStage
+ if (noResubmitEnqueued) {
+ // We expect one executor failure to trigger many FetchFailures in rapid succession,
+ // but all of those task failures can typically be handled by a single resubmission of
+ // the failed stage. We avoid flooding the scheduler's event queue with resubmit
+ // messages by checking whether a resubmit is already in the event queue for the
+ // failed stage. If there is already a resubmit enqueued for a different failed
+ // stage, that event would also be sufficient to handle the current failed stage, but
+ // producing a resubmit for each failed stage makes debugging and logging a little
+ // simpler while not producing an overwhelming number of scheduler events.
+ logInfo(
+ s"Resubmitting $mapStage (${mapStage.name}) and " +
+ s"$failedStage (${failedStage.name}) due to fetch failure"
+ )
+ messageScheduler.schedule(
+ new Runnable {
+ override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
+ },
+ DAGScheduler.RESUBMIT_TIMEOUT,
+ TimeUnit.MILLISECONDS
+ )
+ }
}
// Mark the map whose fetch failed as broken in the map stage
if (mapId != -1) {