Diffstat (limited to 'core/src')
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala      | 35 ++++++++++-------------------------
 core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala |  7 +++++++
 2 files changed, 17 insertions(+), 25 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 5291b66366..766e9797f9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -726,7 +726,6 @@ class DAGScheduler(
reason = "as part of cancellation of all jobs"))
activeJobs.clear() // These should already be empty by this point,
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
- submitWaitingStages()
}
/**
@@ -752,23 +751,21 @@ class DAGScheduler(
submitStage(stage)
}
}
- submitWaitingStages()
}
/**
* Check for waiting stages which are now eligible for resubmission.
- * Ordinarily run on every iteration of the event loop.
+ * Submits stages that depend on the given parent stage. Called when the parent stage completes
+ * successfully.
*/
- private def submitWaitingStages() {
- // TODO: We might want to run this less often, when we are sure that something has become
- // runnable that wasn't before.
- logTrace("Checking for newly runnable parent stages")
+ private def submitWaitingChildStages(parent: Stage) {
+ logTrace(s"Checking if any dependencies of $parent are now runnable")
logTrace("running: " + runningStages)
logTrace("waiting: " + waitingStages)
logTrace("failed: " + failedStages)
- val waitingStagesCopy = waitingStages.toArray
- waitingStages.clear()
- for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) {
+ val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
+ waitingStages --= childStages
+ for (stage <- childStages.sortBy(_.firstJobId)) {
submitStage(stage)
}
}
@@ -793,7 +790,6 @@ class DAGScheduler(
}
val jobIds = activeInGroup.map(_.jobId)
jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
- submitWaitingStages()
}
private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
@@ -801,7 +797,6 @@ class DAGScheduler(
// In that case, we wouldn't have the stage anymore in stageIdToStage.
val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
- submitWaitingStages()
}
private[scheduler] def handleTaskSetFailed(
@@ -809,7 +804,6 @@ class DAGScheduler(
reason: String,
exception: Option[Throwable]): Unit = {
stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, exception) }
- submitWaitingStages()
}
private[scheduler] def cleanUpAfterSchedulerStop() {
@@ -832,7 +826,6 @@ class DAGScheduler(
private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) {
listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
- submitWaitingStages()
}
private[scheduler] def handleJobSubmitted(jobId: Int,
@@ -871,8 +864,6 @@ class DAGScheduler(
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
-
- submitWaitingStages()
}
private[scheduler] def handleMapStageSubmitted(jobId: Int,
@@ -916,8 +907,6 @@ class DAGScheduler(
if (finalStage.isAvailable) {
markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency))
}
-
- submitWaitingStages()
}
/** Submits stage, but first recursively submits any missing parents. */
@@ -1073,6 +1062,8 @@ class DAGScheduler(
s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
}
logDebug(debugString)
+
+ submitWaitingChildStages(stage)
}
}
@@ -1238,9 +1229,8 @@ class DAGScheduler(
markMapStageJobAsFinished(job, stats)
}
}
+ submitWaitingChildStages(shuffleStage)
}
-
- // Note: newly runnable stages will be submitted below when we submit waiting stages
}
}
@@ -1315,7 +1305,6 @@ class DAGScheduler(
// Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
// will abort the job.
}
- submitWaitingStages()
}
/**
@@ -1357,7 +1346,6 @@ class DAGScheduler(
logDebug("Additional executor lost message for " + execId +
"(epoch " + currentEpoch + ")")
}
- submitWaitingStages()
}
private[scheduler] def handleExecutorAdded(execId: String, host: String) {
@@ -1366,7 +1354,6 @@ class DAGScheduler(
logInfo("Host added was in lost list earlier: " + host)
failedEpoch -= execId
}
- submitWaitingStages()
}
private[scheduler] def handleStageCancellation(stageId: Int) {
@@ -1379,7 +1366,6 @@ class DAGScheduler(
case None =>
logInfo("No active jobs to kill for Stage " + stageId)
}
- submitWaitingStages()
}
private[scheduler] def handleJobCancellation(jobId: Int, reason: String = "") {
@@ -1389,7 +1375,6 @@ class DAGScheduler(
failJobAndIndependentStages(
jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason))
}
- submitWaitingStages()
}
/**
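For context before the test changes: the removed submitWaitingStages() rescanned and resubmitted every waiting stage after every scheduler event, while the new submitWaitingChildStages(parent) runs only when a stage completes and considers only the direct children of that stage. Below is a self-contained sketch of that bookkeeping, with hypothetical standalone names; the real DAGScheduler tracks far more state, and its submitStage() re-checks missing parents and re-queues a stage that is not yet runnable.

    import scala.collection.mutable

    case class Stage(id: Int, firstJobId: Int, parents: Seq[Stage])

    object WaitingStagesDemo {
      val waitingStages = mutable.HashSet[Stage]()

      // Stand-in for DAGScheduler.submitStage(): the real method re-checks for
      // missing parents and puts the stage back into waitingStages if any remain.
      def submitStage(stage: Stage): Unit = println(s"submitting stage ${stage.id}")

      // New behavior: on parent completion, promote only its direct children,
      // in first-job order, instead of rescanning all waiting stages.
      def submitWaitingChildStages(parent: Stage): Unit = {
        val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
        waitingStages --= childStages
        for (stage <- childStages.sortBy(_.firstJobId)) {
          submitStage(stage)
        }
      }

      def main(args: Array[String]): Unit = {
        val a = Stage(0, firstJobId = 0, parents = Nil)
        val b = Stage(1, firstJobId = 0, parents = Seq(a))
        val c = Stage(2, firstJobId = 0, parents = Seq(a, b))
        waitingStages ++= Seq(b, c)
        submitWaitingChildStages(a) // selects both b and c (both list a as a parent)
      }
    }
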
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 088a476086..fc75c9e022 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -370,6 +370,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assert(mapStageB.parents === List(mapStageA))
assert(mapStageC.parents === List(mapStageA, mapStageB))
assert(finalStage.parents === List(mapStageC))
+
+ complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
+ complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
+ complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+ complete(taskSets(3), Seq((Success, 42)))
+ assert(results === Map(0 -> 42))
+ assertDataStructuresEmpty()
}
test("zero split job") {