aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org
diff options
context:
space:
mode:
authorjinxing <jinxing@meituan.com>2017-02-18 10:49:40 -0400
committerKay Ousterhout <kayousterhout@gmail.com>2017-02-18 10:55:18 -0400
commit729ce3703257aa34c00c5c8253e6971faf6a0c8d (patch)
tree4282d3c039e09a5e66e6ca9377dd4989fd79b928 /core/src/main/scala/org
parent21c7d3c31a7078f730a62e53a6252caa7fe7b338 (diff)
downloadspark-729ce3703257aa34c00c5c8253e6971faf6a0c8d.tar.gz
spark-729ce3703257aa34c00c5c8253e6971faf6a0c8d.tar.bz2
spark-729ce3703257aa34c00c5c8253e6971faf6a0c8d.zip
[SPARK-19263] DAGScheduler should avoid sending conflicting task set.
In current `DAGScheduler handleTaskCompletion` code, when event.reason is `Success`, it will first do `stage.pendingPartitions -= task.partitionId`, which maybe a bug when `FetchFailed` happens. **Think about below** 1. Stage 0 runs and generates shuffle output data. 2. Stage 1 reads the output from stage 0 and generates more shuffle data. It has two tasks: ShuffleMapTask1 and ShuffleMapTask2, and these tasks are launched on executorA. 3. ShuffleMapTask1 fails to fetch blocks locally and sends a FetchFailed to the driver. The driver marks executorA as lost and updates failedEpoch; 4. The driver resubmits stage 0 so the missing output can be re-generated, and then once it completes, resubmits stage 1 with ShuffleMapTask1x and ShuffleMapTask2x. 5. ShuffleMapTask2 (from the original attempt of stage 1) successfully finishes on executorA and sends Success back to driver. This causes DAGScheduler::handleTaskCompletion to remove partition 2 from stage.pendingPartitions (line 1149), but it does not add the partition to the set of output locations (line 1192), because the task’s epoch is less than the failure epoch for the executor (because of the earlier failure on executor A) 6. ShuffleMapTask1x successfully finishes on executorB, causing the driver to remove partition 1 from stage.pendingPartitions. Combined with the previous step, this means that there are no more pending partitions for the stage, so the DAGScheduler marks the stage as finished (line 1196). However, the shuffle stage is not available (line 1215) because the completion for ShuffleMapTask2 was ignored because of its epoch, so the DAGScheduler resubmits the stage. 7. ShuffleMapTask2x is still running, so when TaskSchedulerImpl::submitTasks is called for the re-submitted stage, it throws an error, because there’s an existing active task set **In this fix** If a task completion is from a previous stage attempt and the epoch is too low (i.e., it was from a failed executor), don't remove the corresponding partition from pendingPartitions. Author: jinxing <jinxing@meituan.com> Author: jinxing <jinxing6042@126.com> Closes #16620 from jinxing64/SPARK-19263.
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala22
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Stage.scala2
2 files changed, 21 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 69101acb3a..0b7d3716c1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1181,15 +1181,33 @@ class DAGScheduler(
case smt: ShuffleMapTask =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
- shuffleStage.pendingPartitions -= task.partitionId
updateAccumulators(event)
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
+ if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
+ // This task was for the currently running attempt of the stage. Since the task
+ // completed successfully from the perspective of the TaskSetManager, mark it as
+ // no longer pending (the TaskSetManager may consider the task complete even
+ // when the output needs to be ignored because the task's epoch is too small below.
+ // In this case, when pending partitions is empty, there will still be missing
+ // output locations, which will cause the DAGScheduler to resubmit the stage below.)
+ shuffleStage.pendingPartitions -= task.partitionId
+ }
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
+ // The epoch of the task is acceptable (i.e., the task was launched after the most
+ // recent failure we're aware of for the executor), so mark the task's output as
+ // available.
shuffleStage.addOutputLoc(smt.partitionId, status)
+ // Remove the task's partition from pending partitions. This may have already been
+ // done above, but will not have been done yet in cases where the task attempt was
+ // from an earlier attempt of the stage (i.e., not the attempt that's currently
+ // running). This allows the DAGScheduler to mark the stage as complete when one
+ // copy of each task has finished successfully, even if the currently active stage
+ // still has tasks running.
+ shuffleStage.pendingPartitions -= task.partitionId
}
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
@@ -1213,7 +1231,7 @@ class DAGScheduler(
clearCacheLocs()
if (!shuffleStage.isAvailable) {
- // Some tasks had failed; let's resubmit this shuffleStage
+ // Some tasks had failed; let's resubmit this shuffleStage.
// TODO: Lower-level scheduler should also deal with this
logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
") because some of its tasks had failed: " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index c6fc038129..32e5df6d75 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -74,7 +74,7 @@ private[scheduler] abstract class Stage(
val details: String = callSite.longForm
/**
- * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
+ * Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
* here, before any attempts have actually been created, because the DAGScheduler uses this
* StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
* have been created).