diff options
3 files changed, 91 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 69101acb3a..0b7d3716c1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1181,15 +1181,33 @@ class DAGScheduler( case smt: ShuffleMapTask => val shuffleStage = stage.asInstanceOf[ShuffleMapStage] - shuffleStage.pendingPartitions -= task.partitionId updateAccumulators(event) val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) + if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) { + // This task was for the currently running attempt of the stage. Since the task + // completed successfully from the perspective of the TaskSetManager, mark it as + // no longer pending (the TaskSetManager may consider the task complete even + // when the output needs to be ignored because the task's epoch is too small below. + // In this case, when pending partitions is empty, there will still be missing + // output locations, which will cause the DAGScheduler to resubmit the stage below.) + shuffleStage.pendingPartitions -= task.partitionId + } if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") } else { + // The epoch of the task is acceptable (i.e., the task was launched after the most + // recent failure we're aware of for the executor), so mark the task's output as + // available. shuffleStage.addOutputLoc(smt.partitionId, status) + // Remove the task's partition from pending partitions. This may have already been + // done above, but will not have been done yet in cases where the task attempt was + // from an earlier attempt of the stage (i.e., not the attempt that's currently + // running). This allows the DAGScheduler to mark the stage as complete when one + // copy of each task has finished successfully, even if the currently active stage + // still has tasks running. + shuffleStage.pendingPartitions -= task.partitionId } if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) { @@ -1213,7 +1231,7 @@ class DAGScheduler( clearCacheLocs() if (!shuffleStage.isAvailable) { - // Some tasks had failed; let's resubmit this shuffleStage + // Some tasks had failed; let's resubmit this shuffleStage. // TODO: Lower-level scheduler should also deal with this logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name + ") because some of its tasks had failed: " + diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index c6fc038129..32e5df6d75 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -74,7 +74,7 @@ private[scheduler] abstract class Stage( val details: String = callSite.longForm /** - * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized + * Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized * here, before any attempts have actually been created, because the DAGScheduler uses this * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts * have been created). diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 4e5f267e23..c735220da2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -2161,6 +2161,76 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou } } + test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," + + " even with late completions from earlier stage attempts") { + // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC + val rddA = new MyRDD(sc, 2, Nil) + val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) + val shuffleIdA = shuffleDepA.shuffleId + + val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) + val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + + val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + + submit(rddC, Array(0, 1)) + + // Complete both tasks in rddA. + assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostA", 2)))) + + // Fetch failed for task(stageId=1, stageAttemptId=0, partitionId=0) running on hostA + // and task(stageId=1, stageAttemptId=0, partitionId=1) is still running. + assert(taskSets(1).stageId === 1 && taskSets(1).stageAttemptId === 0) + runEvent(makeCompletionEvent( + taskSets(1).tasks(0), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, + "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"), + result = null)) + + // Both original tasks in rddA should be marked as failed, because they ran on the + // failed hostA, so both should be resubmitted. Complete them on hostB successfully. + scheduler.resubmitFailedStages() + assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1 + && taskSets(2).tasks.size === 2) + complete(taskSets(2), Seq( + (Success, makeMapStatus("hostB", 2)), + (Success, makeMapStatus("hostB", 2)))) + + // Complete task(stageId=1, stageAttemptId=0, partitionId=1) running on failed hostA + // successfully. The success should be ignored because the task started before the + // executor failed, so the output may have been lost. + runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2))) + + // Both tasks in rddB should be resubmitted, because none of them has succeeded truely. + // Complete the task(stageId=1, stageAttemptId=1, partitionId=0) successfully. + // Task(stageId=1, stageAttemptId=1, partitionId=1) of this new active stage attempt + // is still running. + assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1 + && taskSets(3).tasks.size === 2) + runEvent(makeCompletionEvent( + taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2))) + + // There should be no new attempt of stage submitted, + // because task(stageId=1, stageAttempt=1, partitionId=1) is still running in + // the current attempt (and hasn't completed successfully in any earlier attempts). + assert(taskSets.size === 4) + + // Complete task(stageId=1, stageAttempt=1, partitionId=1) successfully. + runEvent(makeCompletionEvent( + taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2))) + + // Now the ResultStage should be submitted, because all of the tasks of rddB have + // completed successfully on alive executors. + assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]]) + complete(taskSets(4), Seq( + (Success, 1), + (Success, 1))) + } + /** * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations. * Note that this checks only the host and not the executor ID. |