-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala    | 58
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala     |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Stage.scala           |  4
4 files changed, 33 insertions(+), 37 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 5d812918a1..a083be2448 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -208,19 +208,17 @@ class DAGScheduler(
/**
* Get or create a shuffle map stage for the given shuffle dependency's map side.
- * The jobId value passed in will be used if the stage doesn't already exist with
- * a lower jobId (jobId always increases across jobs.)
*/
private def getShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
- jobId: Int): ShuffleMapStage = {
+ firstJobId: Int): ShuffleMapStage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage
case None =>
// We are going to register ancestor shuffle dependencies
- registerShuffleDependencies(shuffleDep, jobId)
+ registerShuffleDependencies(shuffleDep, firstJobId)
// Then register current shuffleDep
- val stage = newOrUsedShuffleStage(shuffleDep, jobId)
+ val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage
stage
@@ -230,15 +228,15 @@ class DAGScheduler(
/**
* Helper function to eliminate some code duplication when creating new stages.
*/
- private def getParentStagesAndId(rdd: RDD[_], jobId: Int): (List[Stage], Int) = {
- val parentStages = getParentStages(rdd, jobId)
+ private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
+ val parentStages = getParentStages(rdd, firstJobId)
val id = nextStageId.getAndIncrement()
(parentStages, id)
}
/**
* Create a ShuffleMapStage as part of the (re)-creation of a shuffle map stage in
- * newOrUsedShuffleStage. The stage will be associated with the provided jobId.
+ * newOrUsedShuffleStage. The stage will be associated with the provided firstJobId.
* Production of shuffle map stages should always use newOrUsedShuffleStage, not
* newShuffleMapStage directly.
*/
@@ -246,21 +244,19 @@ class DAGScheduler(
rdd: RDD[_],
numTasks: Int,
shuffleDep: ShuffleDependency[_, _, _],
- jobId: Int,
+ firstJobId: Int,
callSite: CallSite): ShuffleMapStage = {
- val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
+ val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId)
val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
- jobId, callSite, shuffleDep)
+ firstJobId, callSite, shuffleDep)
stageIdToStage(id) = stage
- updateJobIdStageIdMaps(jobId, stage)
+ updateJobIdStageIdMaps(firstJobId, stage)
stage
}
/**
- * Create a ResultStage -- either directly for use as a result stage, or as part of the
- * (re)-creation of a shuffle map stage in newOrUsedShuffleStage. The stage will be associated
- * with the provided jobId.
+ * Create a ResultStage associated with the provided jobId.
*/
private def newResultStage(
rdd: RDD[_],
@@ -277,16 +273,16 @@ class DAGScheduler(
/**
* Create a shuffle map Stage for the given RDD. The stage will also be associated with the
- * provided jobId. If a stage for the shuffleId existed previously so that the shuffleId is
+ * provided firstJobId. If a stage for the shuffleId existed previously so that the shuffleId is
* present in the MapOutputTracker, then the number and location of available outputs are
* recovered from the MapOutputTracker.
*/
private def newOrUsedShuffleStage(
shuffleDep: ShuffleDependency[_, _, _],
- jobId: Int): ShuffleMapStage = {
+ firstJobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
val numTasks = rdd.partitions.size
- val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, jobId, rdd.creationSite)
+ val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
@@ -304,10 +300,10 @@ class DAGScheduler(
}
/**
- * Get or create the list of parent stages for a given RDD. The stages will be assigned the
- * provided jobId if they haven't already been created with a lower jobId.
+ * Get or create the list of parent stages for a given RDD. The new Stages will be created with
+ * the provided firstJobId.
*/
- private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
+ private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
@@ -321,7 +317,7 @@ class DAGScheduler(
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
- parents += getShuffleMapStage(shufDep, jobId)
+ parents += getShuffleMapStage(shufDep, firstJobId)
case _ =>
waitingForVisit.push(dep.rdd)
}
@@ -336,11 +332,11 @@ class DAGScheduler(
}
/** Find missing ancestor shuffle dependencies and register them in shuffleToMapStage */
- private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], jobId: Int) {
+ private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) {
val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
while (parentsWithNoMapStage.nonEmpty) {
val currentShufDep = parentsWithNoMapStage.pop()
- val stage = newOrUsedShuffleStage(currentShufDep, jobId)
+ val stage = newOrUsedShuffleStage(currentShufDep, firstJobId)
shuffleToMapStage(currentShufDep.shuffleId) = stage
}
}
@@ -390,7 +386,7 @@ class DAGScheduler(
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
- val mapStage = getShuffleMapStage(shufDep, stage.jobId)
+ val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
@@ -577,7 +573,7 @@ class DAGScheduler(
private[scheduler] def doCancelAllJobs() {
// Cancel all running jobs.
- runningStages.map(_.jobId).foreach(handleJobCancellation(_,
+ runningStages.map(_.firstJobId).foreach(handleJobCancellation(_,
reason = "as part of cancellation of all jobs"))
activeJobs.clear() // These should already be empty by this point,
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
@@ -603,7 +599,7 @@ class DAGScheduler(
clearCacheLocs()
val failedStagesCopy = failedStages.toArray
failedStages.clear()
- for (stage <- failedStagesCopy.sortBy(_.jobId)) {
+ for (stage <- failedStagesCopy.sortBy(_.firstJobId)) {
submitStage(stage)
}
}
@@ -623,7 +619,7 @@ class DAGScheduler(
logTrace("failed: " + failedStages)
val waitingStagesCopy = waitingStages.toArray
waitingStages.clear()
- for (stage <- waitingStagesCopy.sortBy(_.jobId)) {
+ for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) {
submitStage(stage)
}
}
@@ -843,7 +839,7 @@ class DAGScheduler(
}
}
- val properties = jobIdToActiveJob.get(stage.jobId).map(_.properties).orNull
+ val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
@@ -909,7 +905,7 @@ class DAGScheduler(
stage.pendingTasks ++= tasks
logDebug("New pending tasks: " + stage.pendingTasks)
taskScheduler.submitTasks(
- new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
+ new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.firstJobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
@@ -1323,7 +1319,7 @@ class DAGScheduler(
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
- val mapStage = getShuffleMapStage(shufDep, stage.jobId)
+ val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
waitingForVisit.push(mapStage.rdd)
} // Otherwise there's no need to follow the dependency back
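The DAGScheduler hunks above all make the same point: a shuffle map stage is created once, registered in shuffleToMapStage under its shuffleId, and then reused by any later job that depends on the same shuffle. The field therefore records only the job that first submitted the stage, which is what the rename to firstJobId makes explicit. Below is a minimal, runnable sketch of that get-or-create pattern; ShuffleStageSketch, registry, and getOrCreateStage are hypothetical stand-ins, not Spark APIs.

import scala.collection.mutable

object GetOrCreateSketch {
  // Stand-in for ShuffleMapStage: remembers only the first submitting job.
  final case class ShuffleStageSketch(shuffleId: Int, firstJobId: Int)

  // Stand-in for shuffleToMapStage: one stage per shuffleId.
  private val registry = mutable.Map.empty[Int, ShuffleStageSketch]

  def getOrCreateStage(shuffleId: Int, firstJobId: Int): ShuffleStageSketch =
    registry.getOrElseUpdate(shuffleId, ShuffleStageSketch(shuffleId, firstJobId))

  def main(args: Array[String]): Unit = {
    val a = getOrCreateStage(shuffleId = 0, firstJobId = 1) // job 1 creates the stage
    val b = getOrCreateStage(shuffleId = 0, firstJobId = 2) // job 2 reuses it as-is
    assert(a == b && b.firstJobId == 1) // the stage still records its first submitter
    println(s"firstJobId of the shared stage: ${b.firstJobId}")
  }
}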
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
index c0f3d5a13d..bf81b9aca4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
@@ -28,9 +28,9 @@ private[spark] class ResultStage(
rdd: RDD[_],
numTasks: Int,
parents: List[Stage],
- jobId: Int,
+ firstJobId: Int,
callSite: CallSite)
- extends Stage(id, rdd, numTasks, parents, jobId, callSite) {
+ extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {
// The active job for this result stage. Will be empty if the job has already finished
// (e.g., because the job was cancelled).
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
index d022107434..66c75f325f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -30,10 +30,10 @@ private[spark] class ShuffleMapStage(
rdd: RDD[_],
numTasks: Int,
parents: List[Stage],
- jobId: Int,
+ firstJobId: Int,
callSite: CallSite,
val shuffleDep: ShuffleDependency[_, _, _])
- extends Stage(id, rdd, numTasks, parents, jobId, callSite) {
+ extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {
override def toString: String = "ShuffleMapStage " + id
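The ResultStage and ShuffleMapStage hunks are pure parameter renames: both concrete stage types simply forward firstJobId to the shared Stage base class. A hedged sketch of that constructor threading, with simplified stand-in names rather than the real Spark classes:

abstract class StageSketch(val id: Int, val firstJobId: Int)

class ShuffleMapStageSketch(id: Int, firstJobId: Int, val shuffleId: Int)
  extends StageSketch(id, firstJobId) {
  override def toString: String = s"ShuffleMapStage $id"
}

class ResultStageSketch(id: Int, firstJobId: Int)
  extends StageSketch(id, firstJobId) {
  override def toString: String = s"ResultStage $id"
}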
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 5d0ddb8377..c59d6e4f5b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.CallSite
* initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
* that each output partition is on.
*
- * Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
+ * Each Stage also has a firstJobId, identifying the job that first submitted the stage. When FIFO
* scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
* faster on failure.
*
@@ -51,7 +51,7 @@ private[spark] abstract class Stage(
val rdd: RDD[_],
val numTasks: Int,
val parents: List[Stage],
- val jobId: Int,
+ val firstJobId: Int,
val callSite: CallSite)
extends Logging {
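The Stage docstring change above is the heart of the rename: firstJobId identifies the job that first submitted the stage, and under FIFO scheduling it serves as a sort key so that stages from earlier jobs are computed or recovered first (see the sortBy(_.firstJobId) hunks in DAGScheduler above). A small runnable sketch of that ordering follows; StageSketch is a hypothetical stand-in, not the Spark class.

object FifoResubmitSketch {
  final case class StageSketch(id: Int, firstJobId: Int)

  def main(args: Array[String]): Unit = {
    // Two failed stages: stage 7 first submitted by job 3, stage 2 by job 1.
    val failed = Seq(StageSketch(id = 7, firstJobId = 3), StageSketch(id = 2, firstJobId = 1))
    // Resubmit in FIFO order of first-submitting job, as the scheduler does.
    for (stage <- failed.sortBy(_.firstJobId)) {
      println(s"resubmitting stage ${stage.id} (firstJobId = ${stage.firstJobId})")
    }
    // Prints stage 2 (firstJobId = 1) before stage 7 (firstJobId = 3).
  }
}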