author      hushan[胡珊] <hushan@xiaomi.com>        2015-09-21 14:26:15 -0500
committer   Imran Rashid <irashid@cloudera.com>    2015-09-21 14:26:15 -0500
commit      b78c65b03ae87a3ba348c9d29ff4c296349eb49c (patch)
tree        a80bfbd17a7b7ba4a05496ac62be051df3aff81c /core
parent      331f0b10f78a37d96d3e573d211d74a0935265db (diff)
[SPARK-5259] [CORE] don't submit stage until its dependencies map outputs are registered
Track pending tasks by partition ID instead of Task objects. Before this change, failure and retry could result in a stage being submitted before the map outputs from its dependencies were registered. This was due to an error in the condition for registering map outputs.

Author: hushan[胡珊] <hushan@xiaomi.com>
Author: Imran Rashid <irashid@cloudera.com>

Closes #7699 from squito/SPARK-5259.
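
The bookkeeping change can be illustrated with a small, self-contained Scala sketch (FakeTask and PendingTrackingSketch below are hypothetical names used only for illustration, not Spark classes). A mutable set of Task objects only recognizes completions from the attempt whose objects populated it, whereas a set of partition IDs lets a successful task from any attempt of the stage retire its partition; that is the interleaved-attempt situation the new DAGSchedulerSuite test exercises.

import scala.collection.mutable.HashSet

// Hypothetical stand-in for a scheduler task: like Spark's Task, it does not
// override equals, so tasks from different attempts are distinct objects even
// when they compute the same partition.
class FakeTask(val partitionId: Int)

object PendingTrackingSketch {
  def main(args: Array[String]): Unit = {
    // Two attempts of the same stage compute the same partitions with fresh objects.
    val attempt0 = Seq(new FakeTask(0), new FakeTask(1), new FakeTask(2))
    val attempt1 = Seq(new FakeTask(0), new FakeTask(1), new FakeTask(2))

    // Old bookkeeping: the pending set holds the Task objects of the latest submission.
    val pendingTasks = new HashSet[FakeTask]
    pendingTasks ++= attempt1
    // A late completion from the earlier attempt matches no element, so it is
    // invisible to the "is this stage done?" check.
    pendingTasks -= attempt0(2)
    println(s"pendingTasks left: ${pendingTasks.size}")           // prints 3

    // New bookkeeping: the pending set holds partition IDs, so a completion from
    // any attempt retires that partition.
    val pendingPartitions = new HashSet[Int]
    pendingPartitions ++= attempt1.map(_.partitionId)
    pendingPartitions -= attempt0(2).partitionId
    println(s"pendingPartitions left: ${pendingPartitions.size}") // prints 2
  }
}

Keyed by partition, shuffleStage.pendingPartitions.isEmpty becomes true exactly when every partition of the stage has completed at least once, which is the condition the DAGScheduler checks in the diff below before registering map outputs and submitting child stages.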
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala      |  12
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Stage.scala             |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala    |   4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 197
4 files changed, 191 insertions, 24 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3c9a66e504..394228b272 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -944,7 +944,7 @@ class DAGScheduler(
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// Get our pending tasks and remember them in our pendingTasks entry
- stage.pendingTasks.clear()
+ stage.pendingPartitions.clear()
// First figure out the indexes of partition ids to compute.
val (allPartitions: Seq[Int], partitionsToCompute: Seq[Int]) = {
@@ -1060,8 +1060,8 @@ class DAGScheduler(
if (tasks.size > 0) {
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
- stage.pendingTasks ++= tasks
- logDebug("New pending tasks: " + stage.pendingTasks)
+ stage.pendingPartitions ++= tasks.map(_.partitionId)
+ logDebug("New pending partitions: " + stage.pendingPartitions)
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
@@ -1152,7 +1152,7 @@ class DAGScheduler(
case Success =>
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
event.reason, event.taskInfo, event.taskMetrics))
- stage.pendingTasks -= task
+ stage.pendingPartitions -= task.partitionId
task match {
case rt: ResultTask[_, _] =>
// Cast to ResultStage here because it's part of the ResultTask
@@ -1198,7 +1198,7 @@ class DAGScheduler(
shuffleStage.addOutputLoc(smt.partitionId, status)
}
- if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
+ if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
markStageAsFinished(shuffleStage)
logInfo("looking for newly runnable stages")
logInfo("running: " + runningStages)
@@ -1242,7 +1242,7 @@ class DAGScheduler(
case Resubmitted =>
logInfo("Resubmitted " + task + ", so marking it as still running")
- stage.pendingTasks += task
+ stage.pendingPartitions += task.partitionId
case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
val failedStage = stageIdToStage(task.stageId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index b37eccbd0f..a3829c319c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -66,7 +66,7 @@ private[scheduler] abstract class Stage(
/** Set of jobs that this stage belongs to. */
val jobIds = new HashSet[Int]
- var pendingTasks = new HashSet[Task[_]]
+ val pendingPartitions = new HashSet[Int]
/** The ID to use for the next new attempt for this stage. */
private var nextAttemptId: Int = 0
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 62af9031b9..c02597c436 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -487,8 +487,8 @@ private[spark] class TaskSetManager(
// a good proxy to task serialization time.
// val timeTaken = clock.getTime() - startTime
val taskName = s"task ${info.id} in stage ${taskSet.id}"
- logInfo("Starting %s (TID %d, %s, %s, %d bytes)".format(
- taskName, taskId, host, taskLocality, serializedTask.limit))
+ logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
+ s"$taskLocality, ${serializedTask.limit} bytes)")
sched.dagScheduler.taskStarted(task, info)
return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 1c55f90ad9..6b5bcf0574 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -479,8 +479,8 @@ class DAGSchedulerSuite
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
- (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
+ (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
+ (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
// the 2nd ResultTask failed
complete(taskSets(1), Seq(
(Success, 42),
@@ -490,7 +490,7 @@ class DAGSchedulerSuite
// ask the scheduler to try it again
scheduler.resubmitFailedStages()
// have the 2nd attempt pass
- complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
+ complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.length))))
// we can see both result blocks now
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
HashSet("hostA", "hostB"))
@@ -782,8 +782,8 @@ class DAGSchedulerSuite
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
- (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
+ (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
+ (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
// The MapOutputTracker should know about both map output locations.
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
HashSet("hostA", "hostB"))
@@ -1036,6 +1036,173 @@ class DAGSchedulerSuite
}
/**
+ * This test runs a three-stage job, with a fetch failure in stage 1, but during the retry, we
+ * have completions from both the first & second attempt of stage 1. So all the map output is
+ * available before we finish any task set for stage 1. We want to make sure that we don't
+ * submit stage 2 until the map output for stage 1 is registered.
+ */
+ test("don't submit stage until its dependencies map outputs are registered (SPARK-5259)") {
+ val firstRDD = new MyRDD(sc, 3, Nil)
+ val firstShuffleDep = new ShuffleDependency(firstRDD, null)
+ val firstShuffleId = firstShuffleDep.shuffleId
+ val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+ submit(reduceRdd, Array(0))
+
+ // things start out smoothly, stage 0 completes with no issues
+ complete(taskSets(0), Seq(
+ (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
+ (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
+ (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length))
+ ))
+
+ // then one executor dies, and a task fails in stage 1
+ runEvent(ExecutorLost("exec-hostA"))
+ runEvent(CompletionEvent(
+ taskSets(1).tasks(0),
+ FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"),
+ null,
+ null,
+ createFakeTaskInfo(),
+ null))
+
+ // so we resubmit stage 0, which completes happily
+ scheduler.resubmitFailedStages()
+ val stage0Resubmit = taskSets(2)
+ assert(stage0Resubmit.stageId == 0)
+ assert(stage0Resubmit.stageAttemptId === 1)
+ val task = stage0Resubmit.tasks(0)
+ assert(task.partitionId === 2)
+ runEvent(CompletionEvent(
+ task,
+ Success,
+ makeMapStatus("hostC", shuffleMapRdd.partitions.length),
+ null,
+ createFakeTaskInfo(),
+ null))
+
+ // now here is where things get tricky: we will now have a task set representing
+ // the second attempt for stage 1, but we *also* have some tasks from the first attempt of
+ // stage 1 still going
+ val stage1Resubmit = taskSets(3)
+ assert(stage1Resubmit.stageId == 1)
+ assert(stage1Resubmit.stageAttemptId === 1)
+ assert(stage1Resubmit.tasks.length === 3)
+
+ // we'll have some tasks finish from the first attempt, and some finish from the second attempt,
+ // so that we actually have all stage outputs, though no attempt has completed all its
+ // tasks
+ runEvent(CompletionEvent(
+ taskSets(3).tasks(0),
+ Success,
+ makeMapStatus("hostC", reduceRdd.partitions.length),
+ null,
+ createFakeTaskInfo(),
+ null))
+ runEvent(CompletionEvent(
+ taskSets(3).tasks(1),
+ Success,
+ makeMapStatus("hostC", reduceRdd.partitions.length),
+ null,
+ createFakeTaskInfo(),
+ null))
+ // late task finish from the first attempt
+ runEvent(CompletionEvent(
+ taskSets(1).tasks(2),
+ Success,
+ makeMapStatus("hostB", reduceRdd.partitions.length),
+ null,
+ createFakeTaskInfo(),
+ null))
+
+ // What should happen now is that we submit stage 2. However, we might not see an error
+ // b/c of DAGScheduler's error handling (it tends to swallow errors and just log them). But
+ // we can check some conditions.
+ // Note that the really important thing here is not so much that we submit stage 2 *immediately*
+ // but that we don't end up with some error from these interleaved completions. It would also
+ // be OK (though sub-optimal) if stage 2 simply waited until the resubmission of stage 1 had
+ // all its tasks complete
+
+ // check that we have all the map output for stage 0 (it should have been there even before
+ // the last round of completions from stage 1, but just to double check it hasn't been messed
+ // up) and also the newly available stage 1
+ val stageToReduceIdxs = Seq(
+ 0 -> (0 until 3),
+ 1 -> (0 until 1)
+ )
+ for {
+ (stage, reduceIdxs) <- stageToReduceIdxs
+ reduceIdx <- reduceIdxs
+ } {
+ // this would throw an exception if the map status hadn't been registered
+ val statuses = mapOutputTracker.getMapSizesByExecutorId(stage, reduceIdx)
+ // really we should have already thrown an exception rather than fail either of these
+ // asserts, but just to be extra defensive let's double check the statuses are OK
+ assert(statuses != null)
+ assert(statuses.nonEmpty)
+ }
+
+ // and check that stage 2 has been submitted
+ assert(taskSets.size == 5)
+ val stage2TaskSet = taskSets(4)
+ assert(stage2TaskSet.stageId == 2)
+ assert(stage2TaskSet.stageAttemptId == 0)
+ }
+
+ /**
+ * We lose an executor after completing some shuffle map tasks on it. Those tasks get
+ * resubmitted, and when they finish, the job completes normally.
+ */
+ test("register map outputs correctly after ExecutorLost and task Resubmitted") {
+ val firstRDD = new MyRDD(sc, 3, Nil)
+ val firstShuffleDep = new ShuffleDependency(firstRDD, null)
+ val reduceRdd = new MyRDD(sc, 5, List(firstShuffleDep))
+ submit(reduceRdd, Array(0))
+
+ // complete some of the tasks from the first stage, on one host
+ runEvent(CompletionEvent(
+ taskSets(0).tasks(0), Success,
+ makeMapStatus("hostA", reduceRdd.partitions.length), null, createFakeTaskInfo(), null))
+ runEvent(CompletionEvent(
+ taskSets(0).tasks(1), Success,
+ makeMapStatus("hostA", reduceRdd.partitions.length), null, createFakeTaskInfo(), null))
+
+ // now that host goes down
+ runEvent(ExecutorLost("exec-hostA"))
+
+ // so we resubmit those tasks
+ runEvent(CompletionEvent(
+ taskSets(0).tasks(0), Resubmitted, null, null, createFakeTaskInfo(), null))
+ runEvent(CompletionEvent(
+ taskSets(0).tasks(1), Resubmitted, null, null, createFakeTaskInfo(), null))
+
+ // now complete everything on a different host
+ complete(taskSets(0), Seq(
+ (Success, makeMapStatus("hostB", reduceRdd.partitions.length)),
+ (Success, makeMapStatus("hostB", reduceRdd.partitions.length)),
+ (Success, makeMapStatus("hostB", reduceRdd.partitions.length))
+ ))
+
+ // now we should submit stage 1, and the map output from stage 0 should be registered
+
+ // check that we have all the map output for stage 0
+ (0 until reduceRdd.partitions.length).foreach { reduceIdx =>
+ val statuses = mapOutputTracker.getMapSizesByExecutorId(0, reduceIdx)
+ // really we should have already thrown an exception rather than fail either of these
+ // asserts, but just to be extra defensive let's double check the statuses are OK
+ assert(statuses != null)
+ assert(statuses.nonEmpty)
+ }
+
+ // and check that stage 1 has been submitted
+ assert(taskSets.size == 2)
+ val stage1TaskSet = taskSets(1)
+ assert(stage1TaskSet.stageId == 1)
+ assert(stage1TaskSet.stageAttemptId == 0)
+ }
+
+ /**
* Makes sure that failures of stage used by multiple jobs are correctly handled.
*
* This test creates the following dependency graph:
@@ -1393,8 +1560,8 @@ class DAGSchedulerSuite
// Submit a map stage by itself
submitMapStage(shuffleDep)
complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
- (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
+ (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
+ (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
assert(results.size === 1)
results.clear()
assertDataStructuresEmpty()
@@ -1407,7 +1574,7 @@ class DAGSchedulerSuite
// Ask the scheduler to try it again; TaskSet 2 will rerun the map task that we couldn't fetch
// from, then TaskSet 3 will run the reduce stage
scheduler.resubmitFailedStages()
- complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
+ complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.length))))
complete(taskSets(3), Seq((Success, 43)))
assert(results === Map(0 -> 42, 1 -> 43))
results.clear()
@@ -1452,8 +1619,8 @@ class DAGSchedulerSuite
// Complete the first stage
assert(taskSets(0).stageId === 0)
complete(taskSets(0), Seq(
- (Success, makeMapStatus("hostA", rdd1.partitions.size)),
- (Success, makeMapStatus("hostB", rdd1.partitions.size))))
+ (Success, makeMapStatus("hostA", rdd1.partitions.length)),
+ (Success, makeMapStatus("hostB", rdd1.partitions.length))))
assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
assert(listener1.results.size === 1)
@@ -1461,7 +1628,7 @@ class DAGSchedulerSuite
// When attempting the second stage, show a fetch failure
assert(taskSets(1).stageId === 1)
complete(taskSets(1), Seq(
- (Success, makeMapStatus("hostA", rdd2.partitions.size)),
+ (Success, makeMapStatus("hostA", rdd2.partitions.length)),
(FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
scheduler.resubmitFailedStages()
assert(listener2.results.size === 0) // Second stage listener should not have a result yet
@@ -1469,7 +1636,7 @@ class DAGSchedulerSuite
// Stage 0 should now be running as task set 2; make its task succeed
assert(taskSets(2).stageId === 0)
complete(taskSets(2), Seq(
- (Success, makeMapStatus("hostC", rdd2.partitions.size))))
+ (Success, makeMapStatus("hostC", rdd2.partitions.length))))
assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
assert(listener2.results.size === 0) // Second stage listener should still not have a result
@@ -1477,8 +1644,8 @@ class DAGSchedulerSuite
// Stage 1 should now be running as task set 3; make its first task succeed
assert(taskSets(3).stageId === 1)
complete(taskSets(3), Seq(
- (Success, makeMapStatus("hostB", rdd2.partitions.size)),
- (Success, makeMapStatus("hostD", rdd2.partitions.size))))
+ (Success, makeMapStatus("hostB", rdd2.partitions.length)),
+ (Success, makeMapStatus("hostD", rdd2.partitions.length))))
assert(mapOutputTracker.getMapSizesByExecutorId(dep2.shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostD")))
assert(listener2.results.size === 1)
@@ -1494,7 +1661,7 @@ class DAGSchedulerSuite
// TaskSet 5 will rerun stage 1's lost task, then TaskSet 6 will rerun stage 2
assert(taskSets(5).stageId === 1)
complete(taskSets(5), Seq(
- (Success, makeMapStatus("hostE", rdd2.partitions.size))))
+ (Success, makeMapStatus("hostE", rdd2.partitions.length))))
complete(taskSets(6), Seq(
(Success, 53)))
assert(listener3.results === Map(0 -> 52, 1 -> 53))