author | Ilya Ganelin <ilya.ganelin@capitalone.com> | 2015-09-02 22:07:50 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-09-02 22:08:24 -0700 |
commit | 4bd85d06e0334c49be18c4612b04d013b37f189c (patch) | |
tree | e5facb9c43c41639937542b531f5774eab77ec7c /core/src | |
parent | 44948a2e9dcad5cd8d1eb749f469e49c5750b5ba (diff) | |
download | spark-4bd85d06e0334c49be18c4612b04d013b37f189c.tar.gz spark-4bd85d06e0334c49be18c4612b04d013b37f189c.tar.bz2 spark-4bd85d06e0334c49be18c4612b04d013b37f189c.zip |
[SPARK-5945] Spark should not retry a stage infinitely on a FetchFailedException
The ```Stage``` class now tracks whether a stage has seen enough consecutive failed attempts to warrant aborting it.
To avoid an infinite loop of stage retries, we abort the job completely after 4 consecutive failed attempts for a single stage. A stage may still fail more than 4 times over the lifetime of a job, as long as there is an intervening successful attempt; in very long-lived applications, where a stage may be reused many times, we don't want to abort the job because of failures that were successfully recovered from.
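The bookkeeping itself is small; the following standalone sketch illustrates the same idea outside the scheduler (the member names mirror the patch, but `StageFailureTracker` and `Demo` are illustrative only, not Spark classes):

```scala
import scala.collection.mutable.HashSet

// Simplified model of the per-stage bookkeeping added by this patch: each stage
// records the attempt IDs that hit a FetchFailure, so multiple failed tasks from
// the same attempt are only counted once.
class StageFailureTracker {
  private val fetchFailedAttemptIds = new HashSet[Int]

  // Record a fetch-failed attempt and report whether the stage should now be aborted.
  def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
    fetchFailedAttemptIds.add(stageAttemptId)
    fetchFailedAttemptIds.size >= StageFailureTracker.MaxConsecutiveFetchFailures
  }

  // Called when an attempt succeeds, so only *consecutive* failures can reach the limit.
  def clearFailures(): Unit = fetchFailedAttemptIds.clear()
}

object StageFailureTracker {
  // Mirrors Stage.MAX_CONSECUTIVE_FETCH_FAILURES in the patch.
  val MaxConsecutiveFetchFailures = 4
}

object Demo extends App {
  val tracker = new StageFailureTracker
  // Three consecutive failed attempts: still below the threshold of 4.
  assert(!(0 to 2).map(tracker.failedOnFetchAndShouldAbort).exists(identity))
  tracker.clearFailures() // an intervening successful attempt resets the count
  assert(!tracker.failedOnFetchAndShouldAbort(3)) // so the stage is not aborted here
}
```

Tracking a set of attempt IDs rather than a plain counter is what keeps many fetch-failed tasks from the same stage attempt from being counted as multiple failures.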
I've added test cases to exercise the most obvious scenarios.
Author: Ilya Ganelin <ilya.ganelin@capitalone.com>
Closes #5636 from ilganeli/SPARK-5945.
Diffstat (limited to 'core/src')
3 files changed, 320 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index daf9b0f952..d673cb0946 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1101,7 +1101,6 @@ class DAGScheduler(
             s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
             s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
         } else {
-
           // It is likely that we receive multiple FetchFailed for a single stage (because we have
           // multiple tasks running concurrently on different executors). In that case, it is
           // possible the fetch failure has already been handled by the scheduler.
@@ -1117,6 +1116,11 @@ class DAGScheduler(
           if (disallowStageRetryForTest) {
             abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
               None)
+          } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
+            abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
+              s"has failed the maximum allowable number of " +
+              s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
+              s"Most recent failure reason: ${failureMessage}", None)
           } else if (failedStages.isEmpty) {
             // Don't schedule an event to resubmit failed stages if failed isn't empty, because
             // in that case the event will already have been scheduled.
@@ -1240,10 +1244,17 @@ class DAGScheduler(
     if (errorMessage.isEmpty) {
       logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
       stage.latestInfo.completionTime = Some(clock.getTimeMillis())
+
+      // Clear failure count for this stage, now that it's succeeded.
+      // We only limit consecutive failures of stage attempts, so that if a stage is
+      // re-used many times in a long-running job, unrelated failures don't eventually cause the
+      // stage to be aborted.
+      stage.clearFailures()
     } else {
       stage.latestInfo.stageFailed(errorMessage.get)
       logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
     }
+
     outputCommitCoordinator.stageEnd(stage.id)
     listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
     runningStages -= stage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 1cf06856ff..c086535782 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -46,7 +46,7 @@ import org.apache.spark.util.CallSite
  * be updated for each attempt.
  *
  */
-private[spark] abstract class Stage(
+private[scheduler] abstract class Stage(
     val id: Int,
     val rdd: RDD[_],
     val numTasks: Int,
@@ -92,6 +92,29 @@ private[spark] abstract class Stage(
    */
   private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)
 
+  /**
+   * Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these
+   * failures in order to avoid endless retries if a stage keeps failing with a FetchFailure.
+   * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
+   * multiple tasks from the same stage attempt fail (SPARK-5945).
+   */
+  private val fetchFailedAttemptIds = new HashSet[Int]
+
+  private[scheduler] def clearFailures() : Unit = {
+    fetchFailedAttemptIds.clear()
+  }
+
+  /**
+   * Check whether we should abort the failedStage due to multiple consecutive fetch failures.
+   *
+   * This method updates the running set of failed stage attempts and returns
+   * true if the number of failures exceeds the allowable number of failures.
+   */
+  private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
+    fetchFailedAttemptIds.add(stageAttemptId)
+    fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES
+  }
+
   /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
   def makeNewStageAttempt(
       numPartitionsToCompute: Int,
@@ -110,3 +133,8 @@ private[spark] abstract class Stage(
     case _ => false
   }
 }
+
+private[scheduler] object Stage {
+  // The number of consecutive failures allowed before a stage is aborted
+  val MAX_CONSECUTIVE_FETCH_FAILURES = 4
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2e8688cf41..62957c6697 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -26,11 +26,11 @@ import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
 import org.apache.spark.util.CallSite
-import org.apache.spark.executor.TaskMetrics
 
 class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
   extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -473,6 +473,282 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }
 
+
+  // Helper function to validate state when creating tests for task failures
+  private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
+    assert(stageAttempt.stageId === stageId)
+    assert(stageAttempt.stageAttemptId == attempt)
+  }
+
+
+  // Helper functions to extract commonly used code in Fetch Failure test cases
+  private def setupStageAbortTest(sc: SparkContext) {
+    sc.listenerBus.addListener(new EndListener())
+    ended = false
+    jobResult = null
+  }
+
+  // Create a new Listener to confirm that the listenerBus sees the JobEnd message
+  // when we abort the stage. This message will also be consumed by the EventLoggingListener
+  // so this will propagate up to the user.
+  var ended = false
+  var jobResult : JobResult = null
+
+  class EndListener extends SparkListener {
+    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+      jobResult = jobEnd.jobResult
+      ended = true
+    }
+  }
+
+  /**
+   * Common code to get the next stage attempt, confirm it's the one we expect, and complete it
+   * successfully.
+   *
+   * @param stageId - The current stageId
+   * @param attemptIdx - The current attempt count
+   * @param numShufflePartitions - The number of partitions in the next stage
+   */
+  private def completeShuffleMapStageSuccessfully(
+      stageId: Int,
+      attemptIdx: Int,
+      numShufflePartitions: Int): Unit = {
+    val stageAttempt = taskSets.last
+    checkStageId(stageId, attemptIdx, stageAttempt)
+    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map {
+      case (task, idx) =>
+        (Success, makeMapStatus("host" + ('A' + idx).toChar, numShufflePartitions))
+    }.toSeq)
+  }
+
+  /**
+   * Common code to get the next stage attempt, confirm it's the one we expect, and complete it
+   * with all FetchFailure.
+   *
+   * @param stageId - The current stageId
+   * @param attemptIdx - The current attempt count
+   * @param shuffleDep - The shuffle dependency of the stage with a fetch failure
+   */
+  private def completeNextStageWithFetchFailure(
+      stageId: Int,
+      attemptIdx: Int,
+      shuffleDep: ShuffleDependency[_, _, _]): Unit = {
+    val stageAttempt = taskSets.last
+    checkStageId(stageId, attemptIdx, stageAttempt)
+    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0, idx, "ignored"), null)
+    }.toSeq)
+  }
+
+  /**
+   * Common code to get the next result stage attempt, confirm it's the one we expect, and
+   * complete it with a success where we return 42.
+   *
+   * @param stageId - The current stageId
+   * @param attemptIdx - The current attempt count
+   */
+  private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = {
+    val stageAttempt = taskSets.last
+    checkStageId(stageId, attemptIdx, stageAttempt)
+    assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
+    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq)
+  }
+
+  /**
+   * In this test, we simulate a job where many tasks in the same stage fail. We want to show
+   * that many fetch failures inside a single stage attempt do not trigger an abort
+   * on their own, but only when there are enough failing stage attempts.
+   */
+  test("Single stage fetch failure should not abort the stage.") {
+    setupStageAbortTest(sc)
+
+    val parts = 8
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep))
+    submit(reduceRdd, (0 until parts).toArray)
+
+    completeShuffleMapStageSuccessfully(0, 0, numShufflePartitions = parts)
+
+    completeNextStageWithFetchFailure(1, 0, shuffleDep)
+
+    // Resubmit and confirm that now all is well
+    scheduler.resubmitFailedStages()
+
+    assert(scheduler.runningStages.nonEmpty)
+    assert(!ended)
+
+    // Complete stage 0 and then stage 1 with a "42"
+    completeShuffleMapStageSuccessfully(0, 1, numShufflePartitions = parts)
+    completeNextResultStageWithSuccess(1, 1)
+
+    // Confirm job finished successfully
+    sc.listenerBus.waitUntilEmpty(1000)
+    assert(ended === true)
+    assert(results === (0 until parts).map { idx => idx -> 42 }.toMap)
+    assertDataStructuresEmpty()
+  }
+
+  /**
+   * In this test we simulate a job failure where the first stage completes successfully and
+   * the second stage fails due to a fetch failure. Multiple successive fetch failures of a stage
+   * trigger an overall job abort to avoid endless retries.
+   */
+  test("Multiple consecutive stage fetch failures should lead to job being aborted.") {
+    setupStageAbortTest(sc)
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+
+    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
+      // Complete all the tasks for the current attempt of stage 0 successfully
+      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
+
+      // Now we should have a new taskSet, for a new attempt of stage 1.
+      // Fail all these tasks with FetchFailure
+      completeNextStageWithFetchFailure(1, attempt, shuffleDep)
+
+      // this will trigger a resubmission of stage 0, since we've lost some of its
+      // map output, for the next iteration through the loop
+      scheduler.resubmitFailedStages()
+
+      if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
+        assert(scheduler.runningStages.nonEmpty)
+        assert(!ended)
+      } else {
+        // Stage should have been aborted and removed from running stages
+        assertDataStructuresEmpty()
+        sc.listenerBus.waitUntilEmpty(1000)
+        assert(ended)
+        jobResult match {
+          case JobFailed(reason) =>
+            assert(reason.getMessage.contains("ResultStage 1 () has failed the maximum"))
+          case other => fail(s"expected JobFailed, not $other")
+        }
+      }
+    }
+  }
+
+  /**
+   * In this test, we create a job with two consecutive shuffles, and simulate 2 failures for each
+   * shuffle fetch. In total, the job has had four failures overall, but not four failures
+   * for a particular stage, and as such should not be aborted.
+   */
+  test("Failures in different stages should not trigger an overall abort") {
+    setupStageAbortTest(sc)
+
+    val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
+    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
+    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache()
+    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
+    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
+    submit(finalRdd, Array(0))
+
+    // In the first two iterations, stage 0 succeeds and stage 1 fails. In the next two iterations,
+    // stage 2 fails.
+    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
+      // Complete all the tasks for the current attempt of stage 0 successfully
+      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
+
+      if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2) {
+        // Now we should have a new taskSet, for a new attempt of stage 1.
+        // Fail all these tasks with FetchFailure
+        completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)
+      } else {
+        completeShuffleMapStageSuccessfully(1, attempt, numShufflePartitions = 1)
+
+        // Fail stage 2
+        completeNextStageWithFetchFailure(2, attempt - Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2,
+          shuffleDepTwo)
+      }
+
+      // this will trigger a resubmission of stage 0, since we've lost some of its
+      // map output, for the next iteration through the loop
+      scheduler.resubmitFailedStages()
+    }
+
+    completeShuffleMapStageSuccessfully(0, 4, numShufflePartitions = 2)
+    completeShuffleMapStageSuccessfully(1, 4, numShufflePartitions = 1)
+
+    // Succeed stage 2 with a "42"
+    completeNextResultStageWithSuccess(2, Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2)
+
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty()
+  }
+
+  /**
+   * In this test we demonstrate that only consecutive failures trigger a stage abort. A stage may
+   * fail multiple times, succeed, then fail a few more times (because it's run again by downstream
+   * dependencies). The total number of failed attempts for one stage will go over the limit,
+   * but that doesn't matter, since they have successes in the middle.
+   */
+  test("Non-consecutive stage failures don't trigger abort") {
+    setupStageAbortTest(sc)
+
+    val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
+    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
+    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache()
+    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
+    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
+    submit(finalRdd, Array(0))
+
+    // First, execute stages 0 and 1, failing stage 1 up to MAX-1 times.
+    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
+      // Make each task in stage 0 succeed
+      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
+
+      // Now we should have a new taskSet, for a new attempt of stage 1.
+      // Fail these tasks with FetchFailure
+      completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)
+
+      scheduler.resubmitFailedStages()
+
+      // Confirm we have not yet aborted
+      assert(scheduler.runningStages.nonEmpty)
+      assert(!ended)
+    }
+
+    // Rerun stage 0 and 1 to step through the task set
+    completeShuffleMapStageSuccessfully(0, 3, numShufflePartitions = 2)
+    completeShuffleMapStageSuccessfully(1, 3, numShufflePartitions = 1)
+
+    // Fail stage 2 so that stage 1 is resubmitted when we call scheduler.resubmitFailedStages()
+    completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
+
+    scheduler.resubmitFailedStages()
+
+    // Rerun stage 0 to step through the task set
+    completeShuffleMapStageSuccessfully(0, 4, numShufflePartitions = 2)
+
+    // Now again, fail stage 1 (up to MAX_FAILURES) but confirm that this doesn't trigger an abort
+    // since we succeeded in between.
+    completeNextStageWithFetchFailure(1, 4, shuffleDepOne)
+
+    scheduler.resubmitFailedStages()
+
+    // Confirm we have not yet aborted
+    assert(scheduler.runningStages.nonEmpty)
+    assert(!ended)
+
+    // Next, succeed all and confirm output
+    // Rerun stage 0 + 1
+    completeShuffleMapStageSuccessfully(0, 5, numShufflePartitions = 2)
+    completeShuffleMapStageSuccessfully(1, 5, numShufflePartitions = 1)
+
+    // Succeed stage 2 and verify results
+    completeNextResultStageWithSuccess(2, 1)
+
+    assertDataStructuresEmpty()
+    sc.listenerBus.waitUntilEmpty(1000)
+    assert(ended === true)
+    assert(results === Map(0 -> 42))
+  }
+
   test("trivial shuffle with multiple fetch failures") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
@@ -810,7 +1086,7 @@ class DAGSchedulerSuite
     submit(finalRdd, Array(0))
     cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
     cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))
-    // complete stage 2
+    // complete stage 0
     complete(taskSets(0), Seq(
       (Success, makeMapStatus("hostA", 2)),
       (Success, makeMapStatus("hostB", 2))))
@@ -818,7 +1094,7 @@ class DAGSchedulerSuite
     complete(taskSets(1), Seq(
       (Success, makeMapStatus("hostA", 1)),
       (Success, makeMapStatus("hostB", 1))))
-    // pretend stage 0 failed because hostA went down
+    // pretend stage 2 failed because hostA went down
     complete(taskSets(2), Seq(
       (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
     // TODO assert this:
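As a usage note, the abort surfaces to applications the same way the new tests observe it: through a job-end event carrying `JobFailed`. The sketch below mirrors the `EndListener` used in the tests from the driver side; the listener class and app are made up for illustration, while `SparkListener`, `SparkListenerJobEnd`, `JobFailed`, and `SparkContext.addSparkListener` are existing (developer) APIs:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerJobEnd}

// Illustrative listener: reports when a job ends with a failure, which is what a user
// sees once a stage exhausts its consecutive fetch-failure budget and the job is aborted.
class JobEndReporter extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = jobEnd.jobResult match {
    case JobFailed(exception) =>
      println(s"Job ${jobEnd.jobId} aborted: ${exception.getMessage}")
    case _ =>
      println(s"Job ${jobEnd.jobId} finished successfully")
  }
}

object JobEndReporterExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("job-end-reporter-example").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.addSparkListener(new JobEndReporter)
    // ... submit jobs here; a stage that keeps hitting FetchFailedException will
    // eventually surface as a failed job in onJobEnd rather than retrying forever.
    sc.stop()
  }
}
```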