authorIlya Ganelin <ilya.ganelin@capitalone.com>2015-09-02 22:07:50 -0700
committerAndrew Or <andrew@databricks.com>2015-09-02 22:08:24 -0700
commit4bd85d06e0334c49be18c4612b04d013b37f189c (patch)
treee5facb9c43c41639937542b531f5774eab77ec7c /core/src
parent44948a2e9dcad5cd8d1eb749f469e49c5750b5ba (diff)
[SPARK-5945] Spark should not retry a stage infinitely on a FetchFailedException
The ```Stage``` class now tracks whether there have been enough consecutive failures of that stage to trigger an abort. To avoid an infinite loop of stage retries, we abort the job completely after 4 consecutive failed attempts of a single stage. A stage may still fail more than 4 times in total if there are intervening successful attempts, so that in very long-lived applications, where a stage may be reused many times, failures that have already been recovered from don't abort the job. I've added test cases to exercise the most obvious scenarios. Author: Ilya Ganelin <ilya.ganelin@capitalone.com> Closes #5636 from ilganeli/SPARK-5945.
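As a reading aid for the diff below, here is a minimal, self-contained sketch of the bookkeeping this patch adds (the names `FetchFailureTracker` and `FetchFailureTrackerDemo` are invented for this illustration; Spark folds the equivalent logic into `Stage` and drives it from the `DAGScheduler`, as the hunks below show):

```scala
import scala.collection.mutable.HashSet

// Sketch only: mirrors the patch's bookkeeping, not Spark's actual classes.
class FetchFailureTracker(maxConsecutiveFailures: Int = 4) {
  // Attempt IDs that failed with a FetchFailure. Using a set means many failed
  // tasks within one attempt are recorded as a single failure of that attempt.
  private val fetchFailedAttemptIds = new HashSet[Int]

  /** Record a fetch failure for this attempt; return true if the stage should be aborted. */
  def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
    fetchFailedAttemptIds.add(stageAttemptId)
    fetchFailedAttemptIds.size >= maxConsecutiveFailures
  }

  /** Called when an attempt succeeds: only consecutive failures should count. */
  def clearFailures(): Unit = fetchFailedAttemptIds.clear()
}

object FetchFailureTrackerDemo extends App {
  val tracker = new FetchFailureTracker()
  // Three consecutive failed attempts (IDs 0-2): still below the limit of 4.
  for (attemptId <- 0 until 3) {
    assert(!tracker.failedOnFetchAndShouldAbort(attemptId))
  }
  tracker.clearFailures() // an intervening successful attempt resets the history
  // After the reset it again takes four consecutive failed attempts to abort.
  assert(!tracker.failedOnFetchAndShouldAbort(3))
  assert(!tracker.failedOnFetchAndShouldAbort(4))
  assert(!tracker.failedOnFetchAndShouldAbort(5))
  assert(tracker.failedOnFetchAndShouldAbort(6)) // fourth consecutive failure => abort
}
```

Deduplicating by attempt ID and clearing on success are what make the limit apply to consecutive stage attempts, rather than to individual failed tasks or to a stage's lifetime failure count.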
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Stage.scala30
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala282
3 files changed, 320 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index daf9b0f952..d673cb0946 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1101,7 +1101,6 @@ class DAGScheduler(
s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
} else {
-
// It is likely that we receive multiple FetchFailed for a single stage (because we have
// multiple tasks running concurrently on different executors). In that case, it is
// possible the fetch failure has already been handled by the scheduler.
@@ -1117,6 +1116,11 @@ class DAGScheduler(
if (disallowStageRetryForTest) {
abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
None)
+ } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
+ abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
+ s"has failed the maximum allowable number of " +
+ s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
+ s"Most recent failure reason: ${failureMessage}", None)
} else if (failedStages.isEmpty) {
// Don't schedule an event to resubmit failed stages if failed isn't empty, because
// in that case the event will already have been scheduled.
@@ -1240,10 +1244,17 @@ class DAGScheduler(
if (errorMessage.isEmpty) {
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
stage.latestInfo.completionTime = Some(clock.getTimeMillis())
+
+ // Clear failure count for this stage, now that it's succeeded.
+ // We only limit consecutive failures of stage attempts, so that if a stage is
+ // re-used many times in a long-running job, unrelated failures don't eventually cause the
+ // stage to be aborted.
+ stage.clearFailures()
} else {
stage.latestInfo.stageFailed(errorMessage.get)
logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
}
+
outputCommitCoordinator.stageEnd(stage.id)
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
runningStages -= stage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 1cf06856ff..c086535782 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -46,7 +46,7 @@ import org.apache.spark.util.CallSite
* be updated for each attempt.
*
*/
-private[spark] abstract class Stage(
+private[scheduler] abstract class Stage(
val id: Int,
val rdd: RDD[_],
val numTasks: Int,
@@ -92,6 +92,29 @@ private[spark] abstract class Stage(
*/
private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)
+ /**
+ * Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these
+ * failures in order to avoid endless retries if a stage keeps failing with a FetchFailure.
+ * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
+ * multiple tasks from the same stage attempt fail (SPARK-5945).
+ */
+ private val fetchFailedAttemptIds = new HashSet[Int]
+
+ private[scheduler] def clearFailures() : Unit = {
+ fetchFailedAttemptIds.clear()
+ }
+
+ /**
+ * Check whether we should abort the failedStage due to multiple consecutive fetch failures.
+ *
+ * This method updates the running set of failed stage attempts and returns
+ * true if the number of failures exceeds the allowable number of failures.
+ */
+ private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
+ fetchFailedAttemptIds.add(stageAttemptId)
+ fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES
+ }
+
/** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
def makeNewStageAttempt(
numPartitionsToCompute: Int,
@@ -110,3 +133,8 @@ private[spark] abstract class Stage(
case _ => false
}
}
+
+private[scheduler] object Stage {
+ // The number of consecutive failures allowed before a stage is aborted
+ val MAX_CONSECUTIVE_FETCH_FAILURES = 4
+}
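To make the interplay of the two new hooks concrete, here is a hypothetical, heavily simplified sketch of the call sites (it reuses the `FetchFailureTracker` class from the sketch above, which is assumed to be in scope; the real wiring is the `DAGScheduler` hunk earlier in this diff):

```scala
// Hypothetical call sites mirroring, in simplified form, the DAGScheduler changes above.
object SchedulerHooksSketch {
  // `tracker` stands in for the per-stage state the patch keeps on the Stage object itself.
  def handleFetchFailure(
      tracker: FetchFailureTracker,
      stageAttemptId: Int,
      failureMessage: String): Unit = {
    if (tracker.failedOnFetchAndShouldAbort(stageAttemptId)) {
      // In the patch this becomes abortStage(...); here we only report the decision.
      println(s"Aborting: consecutive fetch-failure limit reached. Last failure: $failureMessage")
    } else {
      println("Resubmitting the failed stage (and the map stage whose output was lost)")
    }
  }

  def handleStageSuccess(tracker: FetchFailureTracker): Unit = {
    // A successful attempt wipes the history, so only consecutive failures are counted.
    tracker.clearFailures()
  }
}
```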
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2e8688cf41..62957c6697 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -26,11 +26,11 @@ import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
import org.apache.spark.util.CallSite
-import org.apache.spark.executor.TaskMetrics
class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -473,6 +473,282 @@ class DAGSchedulerSuite
assertDataStructuresEmpty()
}
+
+ // Helper function to validate state when creating tests for task failures
+ private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
+ assert(stageAttempt.stageId === stageId)
+ assert(stageAttempt.stageAttemptId == attempt)
+ }
+
+
+ // Helper functions to extract commonly used code in Fetch Failure test cases
+ private def setupStageAbortTest(sc: SparkContext) {
+ sc.listenerBus.addListener(new EndListener())
+ ended = false
+ jobResult = null
+ }
+
+ // Create a new Listener to confirm that the listenerBus sees the JobEnd message
+ // when we abort the stage. This message will also be consumed by the EventLoggingListener
+ // so this will propagate up to the user.
+ var ended = false
+ var jobResult : JobResult = null
+
+ class EndListener extends SparkListener {
+ override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+ jobResult = jobEnd.jobResult
+ ended = true
+ }
+ }
+
+ /**
+ * Common code to get the next stage attempt, confirm it's the one we expect, and complete it
+ * successfully.
+ *
+ * @param stageId - The current stageId
+ * @param attemptIdx - The current attempt count
+ * @param numShufflePartitions - The number of partitions in the next stage
+ */
+ private def completeShuffleMapStageSuccessfully(
+ stageId: Int,
+ attemptIdx: Int,
+ numShufflePartitions: Int): Unit = {
+ val stageAttempt = taskSets.last
+ checkStageId(stageId, attemptIdx, stageAttempt)
+ complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map {
+ case (task, idx) =>
+ (Success, makeMapStatus("host" + ('A' + idx).toChar, numShufflePartitions))
+ }.toSeq)
+ }
+
+ /**
+ * Common code to get the next stage attempt, confirm it's the one we expect, and complete it
+ * by failing every task with a FetchFailure.
+ *
+ * @param stageId - The current stageId
+ * @param attemptIdx - The current attempt count
+ * @param shuffleDep - The shuffle dependency of the stage with a fetch failure
+ */
+ private def completeNextStageWithFetchFailure(
+ stageId: Int,
+ attemptIdx: Int,
+ shuffleDep: ShuffleDependency[_, _, _]): Unit = {
+ val stageAttempt = taskSets.last
+ checkStageId(stageId, attemptIdx, stageAttempt)
+ complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
+ (FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0, idx, "ignored"), null)
+ }.toSeq)
+ }
+
+ /**
+ * Common code to get the next result stage attempt, confirm it's the one we expect, and
+ * complete it with a success where we return 42.
+ *
+ * @param stageId - The current stageId
+ * @param attemptIdx - The current attempt count
+ */
+ private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = {
+ val stageAttempt = taskSets.last
+ checkStageId(stageId, attemptIdx, stageAttempt)
+ assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
+ complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq)
+ }
+
+ /**
+ * In this test, we simulate a job where many tasks in the same stage fail. We want to show
+ * that many fetch failures inside a single stage attempt do not trigger an abort
+ * on their own, but only when there are enough failing stage attempts.
+ */
+ test("Single stage fetch failure should not abort the stage.") {
+ setupStageAbortTest(sc)
+
+ val parts = 8
+ val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+ val shuffleId = shuffleDep.shuffleId
+ val reduceRdd = new MyRDD(sc, parts, List(shuffleDep))
+ submit(reduceRdd, (0 until parts).toArray)
+
+ completeShuffleMapStageSuccessfully(0, 0, numShufflePartitions = parts)
+
+ completeNextStageWithFetchFailure(1, 0, shuffleDep)
+
+ // Resubmit and confirm that now all is well
+ scheduler.resubmitFailedStages()
+
+ assert(scheduler.runningStages.nonEmpty)
+ assert(!ended)
+
+ // Complete stage 0 and then stage 1 with a "42"
+ completeShuffleMapStageSuccessfully(0, 1, numShufflePartitions = parts)
+ completeNextResultStageWithSuccess(1, 1)
+
+ // Confirm the job finished successfully
+ sc.listenerBus.waitUntilEmpty(1000)
+ assert(ended === true)
+ assert(results === (0 until parts).map { idx => idx -> 42 }.toMap)
+ assertDataStructuresEmpty()
+ }
+
+ /**
+ * In this test we simulate a job failure where the first stage completes successfully and
+ * the second stage fails due to a fetch failure. Multiple successive fetch failures of a stage
+ * trigger an overall job abort to avoid endless retries.
+ */
+ test("Multiple consecutive stage fetch failures should lead to job being aborted.") {
+ setupStageAbortTest(sc)
+
+ val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+ val shuffleId = shuffleDep.shuffleId
+ val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+ submit(reduceRdd, Array(0, 1))
+
+ for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
+ // Complete all the tasks for the current attempt of stage 0 successfully
+ completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
+
+ // Now we should have a new taskSet, for a new attempt of stage 1.
+ // Fail all these tasks with FetchFailure
+ completeNextStageWithFetchFailure(1, attempt, shuffleDep)
+
+ // this will trigger a resubmission of stage 0, since we've lost some of its
+ // map output, for the next iteration through the loop
+ scheduler.resubmitFailedStages()
+
+ if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
+ assert(scheduler.runningStages.nonEmpty)
+ assert(!ended)
+ } else {
+ // Stage should have been aborted and removed from running stages
+ assertDataStructuresEmpty()
+ sc.listenerBus.waitUntilEmpty(1000)
+ assert(ended)
+ jobResult match {
+ case JobFailed(reason) =>
+ assert(reason.getMessage.contains("ResultStage 1 () has failed the maximum"))
+ case other => fail(s"expected JobFailed, not $other")
+ }
+ }
+ }
+ }
+
+ /**
+ * In this test, we create a job with two consecutive shuffles, and simulate 2 failures for each
+ * shuffle fetch. In total, the job has failed four times overall, but not four times for any
+ * single stage, so it should not be aborted.
+ */
+ test("Failures in different stages should not trigger an overall abort") {
+ setupStageAbortTest(sc)
+
+ val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
+ val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
+ val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache()
+ val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
+ val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
+ submit(finalRdd, Array(0))
+
+ // In the first two iterations, stage 0 succeeds and stage 1 fails. In the next two iterations,
+ // stages 0 and 1 succeed and stage 2 fails.
+ for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
+ // Complete all the tasks for the current attempt of stage 0 successfully
+ completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
+
+ if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2) {
+ // Now we should have a new taskSet, for a new attempt of stage 1.
+ // Fail all these tasks with FetchFailure
+ completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)
+ } else {
+ completeShuffleMapStageSuccessfully(1, attempt, numShufflePartitions = 1)
+
+ // Fail stage 2
+ completeNextStageWithFetchFailure(2, attempt - Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2,
+ shuffleDepTwo)
+ }
+
+ // this will trigger a resubmission of stage 0, since we've lost some of its
+ // map output, for the next iteration through the loop
+ scheduler.resubmitFailedStages()
+ }
+
+ completeShuffleMapStageSuccessfully(0, 4, numShufflePartitions = 2)
+ completeShuffleMapStageSuccessfully(1, 4, numShufflePartitions = 1)
+
+ // Succeed stage 2 with a "42"
+ completeNextResultStageWithSuccess(2, Stage.MAX_CONSECUTIVE_FETCH_FAILURES/2)
+
+ assert(results === Map(0 -> 42))
+ assertDataStructuresEmpty()
+ }
+
+ /**
+ * In this test we demonstrate that only consecutive failures trigger a stage abort. A stage may
+ * fail multiple times, succeed, then fail a few more times (because it's run again by downstream
+ * dependencies). The total number of failed attempts for one stage can go over the limit,
+ * but that doesn't matter, since there are successes in between.
+ */
+ test("Non-consecutive stage failures don't trigger abort") {
+ setupStageAbortTest(sc)
+
+ val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
+ val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
+ val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache()
+ val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
+ val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
+ submit(finalRdd, Array(0))
+
+ // First, execute stages 0 and 1, failing stage 1 up to MAX-1 times.
+ for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
+ // Make each task in stage 0 succeed
+ completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
+
+ // Now we should have a new taskSet, for a new attempt of stage 1.
+ // Fail these tasks with FetchFailure
+ completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)
+
+ scheduler.resubmitFailedStages()
+
+ // Confirm we have not yet aborted
+ assert(scheduler.runningStages.nonEmpty)
+ assert(!ended)
+ }
+
+ // Rerun stage 0 and 1 to step through the task set
+ completeShuffleMapStageSuccessfully(0, 3, numShufflePartitions = 2)
+ completeShuffleMapStageSuccessfully(1, 3, numShufflePartitions = 1)
+
+ // Fail stage 2 so that stage 1 is resubmitted when we call scheduler.resubmitFailedStages()
+ completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
+
+ scheduler.resubmitFailedStages()
+
+ // Rerun stage 0 to step through the task set
+ completeShuffleMapStageSuccessfully(0, 4, numShufflePartitions = 2)
+
+ // Now again, fail stage 1 (its total failures now reach MAX_CONSECUTIVE_FETCH_FAILURES), but
+ // confirm that this doesn't trigger an abort since we succeeded in between.
+ completeNextStageWithFetchFailure(1, 4, shuffleDepOne)
+
+ scheduler.resubmitFailedStages()
+
+ // Confirm we have not yet aborted
+ assert(scheduler.runningStages.nonEmpty)
+ assert(!ended)
+
+ // Next, succeed all and confirm output
+ // Rerun stage 0 + 1
+ completeShuffleMapStageSuccessfully(0, 5, numShufflePartitions = 2)
+ completeShuffleMapStageSuccessfully(1, 5, numShufflePartitions = 1)
+
+ // Succeed stage 2 and verify results
+ completeNextResultStageWithSuccess(2, 1)
+
+ assertDataStructuresEmpty()
+ sc.listenerBus.waitUntilEmpty(1000)
+ assert(ended === true)
+ assert(results === Map(0 -> 42))
+ }
+
test("trivial shuffle with multiple fetch failures") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
@@ -810,7 +1086,7 @@ class DAGSchedulerSuite
submit(finalRdd, Array(0))
cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))
- // complete stage 2
+ // complete stage 0
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 2)),
(Success, makeMapStatus("hostB", 2))))
@@ -818,7 +1094,7 @@ class DAGSchedulerSuite
complete(taskSets(1), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
- // pretend stage 0 failed because hostA went down
+ // pretend stage 2 failed because hostA went down
complete(taskSets(2), Seq(
(FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
// TODO assert this: