aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorTakuya UESHIN <ueshin@happy-camper.st>2016-05-25 13:57:25 -0700
committerKay Ousterhout <kayousterhout@gmail.com>2016-05-25 13:57:25 -0700
commit698ef762f80cf4c84bc7b7cf083aa97d44b87170 (patch)
tree67e7d221728fec151ffe759ffa34eb64cfe31bc1 /core/src
parentc875d81a3de3f209b9eb03adf96b7c740b2c7b52 (diff)
downloadspark-698ef762f80cf4c84bc7b7cf083aa97d44b87170.tar.gz
spark-698ef762f80cf4c84bc7b7cf083aa97d44b87170.tar.bz2
spark-698ef762f80cf4c84bc7b7cf083aa97d44b87170.zip
[SPARK-14269][SCHEDULER] Eliminate unnecessary submitStage() call.
## What changes were proposed in this pull request? Currently a method `submitStage()` for waiting stages is called on every iteration of the event loop in `DAGScheduler` to submit all waiting stages, but most of them are not necessary because they are not related to Stage status. The case we should try to submit waiting stages is only when their parent stages are successfully completed. This elimination can improve `DAGScheduler` performance. ## How was this patch tested? Added some checks and other existing tests, and our projects. We have a project bottle-necked by `DAGScheduler`, having about 2000 stages. Before this patch the almost all execution time in `Driver` process was spent to process `submitStage()` of `dag-scheduler-event-loop` thread but after this patch the performance was improved as follows: | | total execution time | `dag-scheduler-event-loop` thread time | `submitStage()` | |--------|---------------------:|---------------------------------------:|----------------:| | Before | 760 sec | 710 sec | 667 sec | | After | 440 sec | 14 sec | 10 sec | Author: Takuya UESHIN <ueshin@happy-camper.st> Closes #12060 from ueshin/issues/SPARK-14269.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala35
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala7
2 files changed, 17 insertions, 25 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 5291b66366..766e9797f9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -726,7 +726,6 @@ class DAGScheduler(
reason = "as part of cancellation of all jobs"))
activeJobs.clear() // These should already be empty by this point,
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
- submitWaitingStages()
}
/**
@@ -752,23 +751,21 @@ class DAGScheduler(
submitStage(stage)
}
}
- submitWaitingStages()
}
/**
* Check for waiting stages which are now eligible for resubmission.
- * Ordinarily run on every iteration of the event loop.
+ * Submits stages that depend on the given parent stage. Called when the parent stage completes
+ * successfully.
*/
- private def submitWaitingStages() {
- // TODO: We might want to run this less often, when we are sure that something has become
- // runnable that wasn't before.
- logTrace("Checking for newly runnable parent stages")
+ private def submitWaitingChildStages(parent: Stage) {
+ logTrace(s"Checking if any dependencies of $parent are now runnable")
logTrace("running: " + runningStages)
logTrace("waiting: " + waitingStages)
logTrace("failed: " + failedStages)
- val waitingStagesCopy = waitingStages.toArray
- waitingStages.clear()
- for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) {
+ val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
+ waitingStages --= childStages
+ for (stage <- childStages.sortBy(_.firstJobId)) {
submitStage(stage)
}
}
@@ -793,7 +790,6 @@ class DAGScheduler(
}
val jobIds = activeInGroup.map(_.jobId)
jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
- submitWaitingStages()
}
private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
@@ -801,7 +797,6 @@ class DAGScheduler(
// In that case, we wouldn't have the stage anymore in stageIdToStage.
val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
- submitWaitingStages()
}
private[scheduler] def handleTaskSetFailed(
@@ -809,7 +804,6 @@ class DAGScheduler(
reason: String,
exception: Option[Throwable]): Unit = {
stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, exception) }
- submitWaitingStages()
}
private[scheduler] def cleanUpAfterSchedulerStop() {
@@ -832,7 +826,6 @@ class DAGScheduler(
private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) {
listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
- submitWaitingStages()
}
private[scheduler] def handleJobSubmitted(jobId: Int,
@@ -871,8 +864,6 @@ class DAGScheduler(
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
-
- submitWaitingStages()
}
private[scheduler] def handleMapStageSubmitted(jobId: Int,
@@ -916,8 +907,6 @@ class DAGScheduler(
if (finalStage.isAvailable) {
markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency))
}
-
- submitWaitingStages()
}
/** Submits stage, but first recursively submits any missing parents. */
@@ -1073,6 +1062,8 @@ class DAGScheduler(
s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
}
logDebug(debugString)
+
+ submitWaitingChildStages(stage)
}
}
@@ -1238,9 +1229,8 @@ class DAGScheduler(
markMapStageJobAsFinished(job, stats)
}
}
+ submitWaitingChildStages(shuffleStage)
}
-
- // Note: newly runnable stages will be submitted below when we submit waiting stages
}
}
@@ -1315,7 +1305,6 @@ class DAGScheduler(
// Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
// will abort the job.
}
- submitWaitingStages()
}
/**
@@ -1357,7 +1346,6 @@ class DAGScheduler(
logDebug("Additional executor lost message for " + execId +
"(epoch " + currentEpoch + ")")
}
- submitWaitingStages()
}
private[scheduler] def handleExecutorAdded(execId: String, host: String) {
@@ -1366,7 +1354,6 @@ class DAGScheduler(
logInfo("Host added was in lost list earlier: " + host)
failedEpoch -= execId
}
- submitWaitingStages()
}
private[scheduler] def handleStageCancellation(stageId: Int) {
@@ -1379,7 +1366,6 @@ class DAGScheduler(
case None =>
logInfo("No active jobs to kill for Stage " + stageId)
}
- submitWaitingStages()
}
private[scheduler] def handleJobCancellation(jobId: Int, reason: String = "") {
@@ -1389,7 +1375,6 @@ class DAGScheduler(
failJobAndIndependentStages(
jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason))
}
- submitWaitingStages()
}
/**
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 088a476086..fc75c9e022 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -370,6 +370,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assert(mapStageB.parents === List(mapStageA))
assert(mapStageC.parents === List(mapStageA, mapStageB))
assert(finalStage.parents === List(mapStageC))
+
+ complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
+ complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
+ complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+ complete(taskSets(3), Seq((Success, 42)))
+ assert(results === Map(0 -> 42))
+ assertDataStructuresEmpty()
}
test("zero split job") {