-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala      | 22
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Stage.scala             |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 70
3 files changed, 91 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 69101acb3a..0b7d3716c1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1181,15 +1181,33 @@ class DAGScheduler(
case smt: ShuffleMapTask =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
- shuffleStage.pendingPartitions -= task.partitionId
updateAccumulators(event)
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
+ if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
+ // This task was for the currently running attempt of the stage. Since the task
+ // completed successfully from the perspective of the TaskSetManager, mark it as
+ // no longer pending. (The TaskSetManager may consider the task complete even when
+ // its output needs to be ignored because the task's epoch is too small, as checked
+ // below. In that case, pendingPartitions can become empty while output locations
+ // are still missing, which causes the DAGScheduler to resubmit the stage below.)
+ shuffleStage.pendingPartitions -= task.partitionId
+ }
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
+ // The epoch of the task is acceptable (i.e., the task was launched after the most
+ // recent failure we're aware of for the executor), so mark the task's output as
+ // available.
shuffleStage.addOutputLoc(smt.partitionId, status)
+ // Remove the task's partition from pending partitions. This may have already been
+ // done above, but will not have been done yet in cases where the task was from an
+ // earlier attempt of the stage (i.e., not the attempt that's currently running).
+ // This allows the DAGScheduler to mark the stage as complete when one copy of each
+ // task has finished successfully, even if the currently active stage attempt still
+ // has tasks running.
+ shuffleStage.pendingPartitions -= task.partitionId
}
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
@@ -1213,7 +1231,7 @@ class DAGScheduler(
clearCacheLocs()
if (!shuffleStage.isAvailable) {
- // Some tasks had failed; let's resubmit this shuffleStage
+ // Some tasks had failed; let's resubmit this shuffleStage.
// TODO: Lower-level scheduler should also deal with this
logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
") because some of its tasks had failed: " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index c6fc038129..32e5df6d75 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -74,7 +74,7 @@ private[scheduler] abstract class Stage(
val details: String = callSite.longForm
/**
- * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
+ * Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
* here, before any attempts have actually been created, because the DAGScheduler uses this
* StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
* have been created).
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4e5f267e23..c735220da2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2161,6 +2161,76 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
}
}
+ test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
+ " even with late completions from earlier stage attempts") {
+ // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
+ val rddA = new MyRDD(sc, 2, Nil)
+ val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+ val shuffleIdA = shuffleDepA.shuffleId
+
+ val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+ val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+ val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
+
+ submit(rddC, Array(0, 1))
+
+ // Complete both tasks in rddA.
+ assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+ complete(taskSets(0), Seq(
+ (Success, makeMapStatus("hostA", 2)),
+ (Success, makeMapStatus("hostA", 2))))
+
+ // Task(stageId=1, stageAttemptId=0, partitionId=0), running on hostA, fails with a fetch
+ // failure, while task(stageId=1, stageAttemptId=0, partitionId=1) is still running.
+ assert(taskSets(1).stageId === 1 && taskSets(1).stageAttemptId === 0)
+ runEvent(makeCompletionEvent(
+ taskSets(1).tasks(0),
+ FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+ "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
+ result = null))
+
+ // Both original tasks in rddA should be marked as failed, because they ran on the
+ // failed hostA, so both should be resubmitted. Complete them on hostB successfully.
+ scheduler.resubmitFailedStages()
+ assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1
+ && taskSets(2).tasks.size === 2)
+ complete(taskSets(2), Seq(
+ (Success, makeMapStatus("hostB", 2)),
+ (Success, makeMapStatus("hostB", 2))))
+
+ // Complete task(stageId=1, stageAttemptId=0, partitionId=1) running on failed hostA
+ // successfully. The success should be ignored because the task started before the
+ // executor failed, so the output may have been lost.
+ runEvent(makeCompletionEvent(
+ taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))
+
+ // Both tasks in rddB should be resubmitted, because neither of them has truly succeeded.
+ // Complete the task(stageId=1, stageAttemptId=1, partitionId=0) successfully.
+ // Task(stageId=1, stageAttemptId=1, partitionId=1) of this new active stage attempt
+ // is still running.
+ assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1
+ && taskSets(3).tasks.size === 2)
+ runEvent(makeCompletionEvent(
+ taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+ // There should be no new stage attempt submitted, because
+ // task(stageId=1, stageAttempt=1, partitionId=1) is still running in the current attempt
+ // (and hasn't completed successfully in any earlier attempt).
+ assert(taskSets.size === 4)
+
+ // Complete task(stageId=1, stageAttempt=1, partitionId=1) successfully.
+ runEvent(makeCompletionEvent(
+ taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))
+
+ // Now the ResultStage should be submitted, because all of the tasks of rddB have
+ // completed successfully on executors that are still alive.
+ assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
+ complete(taskSets(4), Seq(
+ (Success, 1),
+ (Success, 1)))
+ }
+
/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.
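
The test above leans on the executor-failure epoch: a success reported by hostA after the fetch failure is ignored because that task was launched before hostA's failure was recorded. A minimal, hypothetical sketch of that mechanism follows (names are illustrative, not Spark's real API).

    object EpochSketch {
      private var currentEpoch: Long = 0L
      private var failedEpoch: Map[String, Long] = Map.empty

      // An executor failure is recorded at the current epoch, and later launches see a
      // strictly newer epoch.
      def executorLost(execId: String): Unit = {
        failedEpoch += execId -> currentEpoch
        currentEpoch += 1
      }

      // Epoch attached to tasks launched "now".
      def launchEpoch(): Long = currentEpoch

      // Output reported by execId for a task launched at taskEpoch is only usable if the
      // task started after the most recent known failure of that executor.
      def outputUsable(execId: String, taskEpoch: Long): Boolean =
        failedEpoch.get(execId).forall(taskEpoch > _)

      def main(args: Array[String]): Unit = {
        val oldAttemptEpoch = launchEpoch()   // stage 1, attempt 0 launches its tasks
        executorLost("exec-on-hostA")         // fetch failure marks hostA's executor as lost
        // The late success from the old attempt ran before the failure, so it is ignored.
        assert(!outputUsable("exec-on-hostA", oldAttemptEpoch))
        // A task launched after the failure carries a newer epoch and is accepted.
        assert(outputUsable("exec-on-hostA", launchEpoch()))
      }
    }

With the fix, that ignored late success clears nothing: the attempt-id check fails (it is an old attempt) and the epoch check fails (the output is untrusted), so stage 1 is not considered finished until the running attempt's own task completes, and no duplicate task set is submitted.
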