-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala          2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala     297
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala       40
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala   84
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Stage.scala             65
-rw-r--r--  project/MimaExcludes.scala                                              6
6 files changed, 298 insertions, 196 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
index b755d8fb15..50a6937941 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
@@ -27,7 +27,7 @@ import org.apache.spark.util.CallSite
*/
private[spark] class ActiveJob(
val jobId: Int,
- val finalStage: Stage,
+ val finalStage: ResultStage,
val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int],
val callSite: CallSite,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b405bd3338..d35b4f9dba 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -83,7 +83,7 @@ class DAGScheduler(
private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
- private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
+ private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
// Stages we need to run whose parents aren't done
@@ -150,7 +150,7 @@ class DAGScheduler(
result: Any,
accumUpdates: Map[Long, Any],
taskInfo: TaskInfo,
- taskMetrics: TaskMetrics) {
+ taskMetrics: TaskMetrics): Unit = {
eventProcessLoop.post(
CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
}
@@ -173,18 +173,18 @@ class DAGScheduler(
}
// Called by TaskScheduler when an executor fails.
- def executorLost(execId: String) {
+ def executorLost(execId: String): Unit = {
eventProcessLoop.post(ExecutorLost(execId))
}
// Called by TaskScheduler when a host is added
- def executorAdded(execId: String, host: String) {
+ def executorAdded(execId: String, host: String): Unit = {
eventProcessLoop.post(ExecutorAdded(execId, host))
}
// Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or
// cancellation of the job itself.
- def taskSetFailed(taskSet: TaskSet, reason: String) {
+ def taskSetFailed(taskSet: TaskSet, reason: String): Unit = {
eventProcessLoop.post(TaskSetFailed(taskSet, reason))
}
@@ -210,40 +210,65 @@ class DAGScheduler(
* The jobId value passed in will be used if the stage doesn't already exist with
* a lower jobId (jobId always increases across jobs.)
*/
- private def getShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): Stage = {
+ private def getShuffleMapStage(
+ shuffleDep: ShuffleDependency[_, _, _],
+ jobId: Int): ShuffleMapStage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage
case None =>
// We are going to register ancestor shuffle dependencies
registerShuffleDependencies(shuffleDep, jobId)
// Then register current shuffleDep
- val stage =
- newOrUsedStage(
- shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId,
- shuffleDep.rdd.creationSite)
+ val stage = newOrUsedShuffleStage(shuffleDep, jobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage
-
+
stage
}
}
/**
- * Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation
- * of a shuffle map stage in newOrUsedStage. The stage will be associated with the provided
- * jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage
- * directly.
+ * Helper function to eliminate some code re-use when creating new stages.
*/
- private def newStage(
+ private def getParentStagesAndId(rdd: RDD[_], jobId: Int): (List[Stage], Int) = {
+ val parentStages = getParentStages(rdd, jobId)
+ val id = nextStageId.getAndIncrement()
+ (parentStages, id)
+ }
+
+ /**
+ * Create a ShuffleMapStage as part of the (re)-creation of a shuffle map stage in
+ * newOrUsedShuffleStage. The stage will be associated with the provided jobId.
+ * Production of shuffle map stages should always use newOrUsedShuffleStage, not
+ * newShuffleMapStage directly.
+ */
+ private def newShuffleMapStage(
rdd: RDD[_],
numTasks: Int,
- shuffleDep: Option[ShuffleDependency[_, _, _]],
+ shuffleDep: ShuffleDependency[_, _, _],
jobId: Int,
- callSite: CallSite)
- : Stage =
- {
- val parentStages = getParentStages(rdd, jobId)
- val id = nextStageId.getAndIncrement()
- val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)
+ callSite: CallSite): ShuffleMapStage = {
+ val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
+ val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
+ jobId, callSite, shuffleDep)
+
+ stageIdToStage(id) = stage
+ updateJobIdStageIdMaps(jobId, stage)
+ stage
+ }
+
+ /**
+ * Create a ResultStage -- either directly for use as a result stage, or as part of the
+ * (re)-creation of a shuffle map stage in newOrUsedShuffleStage. The stage will be associated
+ * with the provided jobId.
+ */
+ private def newResultStage(
+ rdd: RDD[_],
+ numTasks: Int,
+ jobId: Int,
+ callSite: CallSite): ResultStage = {
+ val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
+ val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite)
+
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
@@ -255,20 +280,17 @@ class DAGScheduler(
* present in the MapOutputTracker, then the number and location of available outputs are
* recovered from the MapOutputTracker
*/
- private def newOrUsedStage(
- rdd: RDD[_],
- numTasks: Int,
+ private def newOrUsedShuffleStage(
shuffleDep: ShuffleDependency[_, _, _],
- jobId: Int,
- callSite: CallSite)
- : Stage =
- {
- val stage = newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)
+ jobId: Int): ShuffleMapStage = {
+ val rdd = shuffleDep.rdd
+ val numTasks = rdd.partitions.size
+ val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, jobId, rdd.creationSite)
if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
for (i <- 0 until locs.size) {
- stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing
+ stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing
}
stage.numAvailableOutputs = locs.count(_ != null)
} else {
@@ -306,26 +328,23 @@ class DAGScheduler(
}
}
waitingForVisit.push(rdd)
- while (!waitingForVisit.isEmpty) {
+ while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
}
parents.toList
}
- // Find ancestor missing shuffle dependencies and register into shuffleToMapStage
- private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], jobId: Int) = {
+ /** Find ancestor missing shuffle dependencies and register into shuffleToMapStage */
+ private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], jobId: Int) {
val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
- while (!parentsWithNoMapStage.isEmpty) {
+ while (parentsWithNoMapStage.nonEmpty) {
val currentShufDep = parentsWithNoMapStage.pop()
- val stage =
- newOrUsedStage(
- currentShufDep.rdd, currentShufDep.rdd.partitions.size, currentShufDep, jobId,
- currentShufDep.rdd.creationSite)
+ val stage = newOrUsedShuffleStage(currentShufDep, jobId)
shuffleToMapStage(currentShufDep.shuffleId) = stage
}
}
- // Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet
+ /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
val parents = new Stack[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
@@ -351,7 +370,7 @@ class DAGScheduler(
}
waitingForVisit.push(rdd)
- while (!waitingForVisit.isEmpty) {
+ while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
}
parents
@@ -382,7 +401,7 @@ class DAGScheduler(
}
}
waitingForVisit.push(stage.rdd)
- while (!waitingForVisit.isEmpty) {
+ while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
}
missing.toList
@@ -392,7 +411,7 @@ class DAGScheduler(
* Registers the given jobId among the jobs that need the given stage and
* all of that stage's ancestors.
*/
- private def updateJobIdStageIdMaps(jobId: Int, stage: Stage) {
+ private def updateJobIdStageIdMaps(jobId: Int, stage: Stage): Unit = {
def updateJobIdStageIdMapsList(stages: List[Stage]) {
if (stages.nonEmpty) {
val s = stages.head
@@ -412,7 +431,7 @@ class DAGScheduler(
*
* @param job The job whose state to cleanup.
*/
- private def cleanupStateForJobAndIndependentStages(job: ActiveJob) {
+ private def cleanupStateForJobAndIndependentStages(job: ActiveJob): Unit = {
val registeredStages = jobIdToStageIds.get(job.jobId)
if (registeredStages.isEmpty || registeredStages.get.isEmpty) {
logError("No stages registered for job " + job.jobId)
@@ -474,8 +493,7 @@ class DAGScheduler(
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
- properties: Properties = null): JobWaiter[U] =
- {
+ properties: Properties = null): JobWaiter[U] = {
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
@@ -504,15 +522,13 @@ class DAGScheduler(
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
- properties: Properties = null)
- {
+ properties: Properties = null): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
waiter.awaitResult() match {
- case JobSucceeded => {
+ case JobSucceeded =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
- }
case JobFailed(exception: Exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
@@ -526,9 +542,7 @@ class DAGScheduler(
evaluator: ApproximateEvaluator[U, R],
callSite: CallSite,
timeout: Long,
- properties: Properties = null)
- : PartialResult[R] =
- {
+ properties: Properties = null): PartialResult[R] = {
val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.partitions.size).toArray
@@ -541,12 +555,12 @@ class DAGScheduler(
/**
* Cancel a job that is running or waiting in the queue.
*/
- def cancelJob(jobId: Int) {
+ def cancelJob(jobId: Int): Unit = {
logInfo("Asked to cancel job " + jobId)
eventProcessLoop.post(JobCancelled(jobId))
}
- def cancelJobGroup(groupId: String) {
+ def cancelJobGroup(groupId: String): Unit = {
logInfo("Asked to cancel job group " + groupId)
eventProcessLoop.post(JobGroupCancelled(groupId))
}
@@ -554,7 +568,7 @@ class DAGScheduler(
/**
* Cancel all jobs that are running or waiting in the queue.
*/
- def cancelAllJobs() {
+ def cancelAllJobs(): Unit = {
eventProcessLoop.post(AllJobsCancelled)
}
@@ -722,13 +736,12 @@ class DAGScheduler(
allowLocal: Boolean,
callSite: CallSite,
listener: JobListener,
- properties: Properties = null)
- {
- var finalStage: Stage = null
+ properties: Properties = null) {
+ var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
- finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
+ finalStage = newResultStage(finalRDD, partitions.size, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
@@ -773,7 +786,7 @@ class DAGScheduler(
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
- if (missing == Nil) {
+ if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
@@ -794,13 +807,15 @@ class DAGScheduler(
// Get our pending tasks and remember them in our pendingTasks entry
stage.pendingTasks.clear()
+
// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = {
- if (stage.isShuffleMap) {
- (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)
- } else {
- val job = stage.resultOfJob.get
- (0 until job.numPartitions).filter(id => !job.finished(id))
+ stage match {
+ case stage: ShuffleMapStage =>
+ (0 until stage.numPartitions).filter(id => stage.outputLocs(id).isEmpty)
+ case stage: ResultStage =>
+ val job = stage.resultOfJob.get
+ (0 until job.numPartitions).filter(id => !job.finished(id))
}
}
@@ -830,18 +845,21 @@ class DAGScheduler(
try {
// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
// For ResultTask, serialize and broadcast (rdd, func).
- val taskBinaryBytes: Array[Byte] =
- if (stage.isShuffleMap) {
- closureSerializer.serialize((stage.rdd, stage.shuffleDep.get) : AnyRef).array()
- } else {
- closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func) : AnyRef).array()
- }
+ val taskBinaryBytes: Array[Byte] = stage match {
+ case stage: ShuffleMapStage =>
+ closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
+ case stage: ResultStage =>
+ closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
+ }
+
taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
// In the case of a failure during serialization, abort the stage.
case e: NotSerializableException =>
abortStage(stage, "Task not serializable: " + e.toString)
runningStages -= stage
+
+ // Abort execution
return
case NonFatal(e) =>
abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
@@ -849,20 +867,22 @@ class DAGScheduler(
return
}
- val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
- partitionsToCompute.map { id =>
- val locs = getPreferredLocs(stage.rdd, id)
- val part = stage.rdd.partitions(id)
- new ShuffleMapTask(stage.id, taskBinary, part, locs)
- }
- } else {
- val job = stage.resultOfJob.get
- partitionsToCompute.map { id =>
- val p: Int = job.partitions(id)
- val part = stage.rdd.partitions(p)
- val locs = getPreferredLocs(stage.rdd, p)
- new ResultTask(stage.id, taskBinary, part, locs, id)
- }
+ val tasks: Seq[Task[_]] = stage match {
+ case stage: ShuffleMapStage =>
+ partitionsToCompute.map { id =>
+ val locs = getPreferredLocs(stage.rdd, id)
+ val part = stage.rdd.partitions(id)
+ new ShuffleMapTask(stage.id, taskBinary, part, locs)
+ }
+
+ case stage: ResultStage =>
+ val job = stage.resultOfJob.get
+ partitionsToCompute.map { id =>
+ val p: Int = job.partitions(id)
+ val part = stage.rdd.partitions(p)
+ val locs = getPreferredLocs(stage.rdd, p)
+ new ResultTask(stage.id, taskBinary, part, locs, id)
+ }
}
if (tasks.size > 0) {
@@ -877,8 +897,17 @@ class DAGScheduler(
// SparkListenerStageCompleted here in case there are no tasks to run.
outputCommitCoordinator.stageEnd(stage.id)
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
- logDebug("Stage " + stage + " is actually done; %b %d %d".format(
- stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
+
+ val debugString = stage match {
+ case stage: ShuffleMapStage =>
+ s"Stage ${stage} is actually done; " +
+ s"(available: ${stage.isAvailable}," +
+ s"available outputs: ${stage.numAvailableOutputs}," +
+ s"partitions: ${stage.numPartitions})"
+ case stage : ResultStage =>
+ s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
+ }
+ logDebug(debugString)
runningStages -= stage
}
}
@@ -968,7 +997,10 @@ class DAGScheduler(
stage.pendingTasks -= task
task match {
case rt: ResultTask[_, _] =>
- stage.resultOfJob match {
+ // Cast to ResultStage here because it's part of the ResultTask
+ // TODO Refactor this out to a function that accepts a ResultStage
+ val resultStage = stage.asInstanceOf[ResultStage]
+ resultStage.resultOfJob match {
case Some(job) =>
if (!job.finished(rt.outputId)) {
updateAccumulators(event)
@@ -976,7 +1008,7 @@ class DAGScheduler(
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
- markStageAsFinished(stage)
+ markStageAsFinished(resultStage)
cleanupStateForJobAndIndependentStages(job)
listenerBus.post(
SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
@@ -988,7 +1020,7 @@ class DAGScheduler(
job.listener.taskSucceeded(rt.outputId, event.result)
} catch {
case e: Exception =>
- // TODO: Perhaps we want to mark the stage as failed?
+ // TODO: Perhaps we want to mark the resultStage as failed?
job.listener.jobFailed(new SparkDriverExecutionException(e))
}
}
@@ -997,6 +1029,7 @@ class DAGScheduler(
}
case smt: ShuffleMapTask =>
+ val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
updateAccumulators(event)
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
@@ -1004,50 +1037,54 @@ class DAGScheduler(
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
} else {
- stage.addOutputLoc(smt.partitionId, status)
+ shuffleStage.addOutputLoc(smt.partitionId, status)
}
- if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) {
- markStageAsFinished(stage)
+ if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
+ markStageAsFinished(shuffleStage)
logInfo("looking for newly runnable stages")
logInfo("running: " + runningStages)
logInfo("waiting: " + waitingStages)
logInfo("failed: " + failedStages)
- if (stage.shuffleDep.isDefined) {
- // We supply true to increment the epoch number here in case this is a
- // recomputation of the map outputs. In that case, some nodes may have cached
- // locations with holes (from when we detected the error) and will need the
- // epoch incremented to refetch them.
- // TODO: Only increment the epoch number if this is not the first time
- // we registered these map outputs.
- mapOutputTracker.registerMapOutputs(
- stage.shuffleDep.get.shuffleId,
- stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
- changeEpoch = true)
- }
+
+ // We supply true to increment the epoch number here in case this is a
+ // recomputation of the map outputs. In that case, some nodes may have cached
+ // locations with holes (from when we detected the error) and will need the
+ // epoch incremented to refetch them.
+ // TODO: Only increment the epoch number if this is not the first time
+ // we registered these map outputs.
+ mapOutputTracker.registerMapOutputs(
+ shuffleStage.shuffleDep.shuffleId,
+ shuffleStage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
+ changeEpoch = true)
+
clearCacheLocs()
- if (stage.outputLocs.exists(_ == Nil)) {
- // Some tasks had failed; let's resubmit this stage
+ if (shuffleStage.outputLocs.contains(Nil)) {
+ // Some tasks had failed; let's resubmit this shuffleStage
// TODO: Lower-level scheduler should also deal with this
- logInfo("Resubmitting " + stage + " (" + stage.name +
+ logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
") because some of its tasks had failed: " +
- stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", "))
- submitStage(stage)
+ shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty)
+ .map(_._2).mkString(", "))
+ submitStage(shuffleStage)
} else {
val newlyRunnable = new ArrayBuffer[Stage]
- for (stage <- waitingStages) {
- logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
+ for (shuffleStage <- waitingStages) {
+ logInfo("Missing parents for " + shuffleStage + ": " +
+ getMissingParentStages(shuffleStage))
}
- for (stage <- waitingStages if getMissingParentStages(stage) == Nil) {
- newlyRunnable += stage
+ for (shuffleStage <- waitingStages if getMissingParentStages(shuffleStage).isEmpty)
+ {
+ newlyRunnable += shuffleStage
}
waitingStages --= newlyRunnable
runningStages ++= newlyRunnable
for {
- stage <- newlyRunnable.sortBy(_.id)
- jobId <- activeJobForStage(stage)
+ shuffleStage <- newlyRunnable.sortBy(_.id)
+ jobId <- activeJobForStage(shuffleStage)
} {
- logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
- submitMissingTasks(stage, jobId)
+ logInfo("Submitting " + shuffleStage + " (" +
+ shuffleStage.rdd + "), which is now runnable")
+ submitMissingTasks(shuffleStage, jobId)
}
}
}
@@ -1204,9 +1241,7 @@ class DAGScheduler(
}
}
- /**
- * Fails a job and all stages that are only used by that job, and cleans up relevant state.
- */
+ /** Fails a job and all stages that are only used by that job, and cleans up relevant state. */
private def failJobAndIndependentStages(job: ActiveJob, failureReason: String) {
val error = new SparkException(failureReason)
var ableToCancelStages = true
@@ -1254,9 +1289,7 @@ class DAGScheduler(
}
}
- /**
- * Return true if one of stage's ancestors is target.
- */
+ /** Return true if one of stage's ancestors is target. */
private def stageDependsOn(stage: Stage, target: Stage): Boolean = {
if (stage == target) {
return true
@@ -1282,7 +1315,7 @@ class DAGScheduler(
}
}
waitingForVisit.push(stage.rdd)
- while (!waitingForVisit.isEmpty) {
+ while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
}
visitedRdds.contains(target.rdd)
@@ -1312,9 +1345,7 @@ class DAGScheduler(
private def getPreferredLocsInternal(
rdd: RDD[_],
partition: Int,
- visited: HashSet[(RDD[_],Int)])
- : Seq[TaskLocation] =
- {
+ visited: HashSet[(RDD[_],Int)]): Seq[TaskLocation] = {
// If the partition has already been visited, no need to re-visit.
// This avoids exponential path exploration. SPARK-695
if (!visited.add((rdd,partition))) {
@@ -1323,12 +1354,12 @@ class DAGScheduler(
}
// If the partition is cached, return the cache locations
val cached = getCacheLocs(rdd)(partition)
- if (!cached.isEmpty) {
+ if (cached.nonEmpty) {
return cached
}
// If the RDD has some placement preferences (as is the case for input RDDs), get those
val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
- if (!rddPrefs.isEmpty) {
+ if (rddPrefs.nonEmpty) {
return rddPrefs.map(TaskLocation(_))
}
// If the RDD has narrow dependencies, pick the first partition of the first narrow dep
@@ -1412,7 +1443,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
dagScheduler.sc.stop()
}
- override def onStop() {
+ override def onStop(): Unit = {
// Cancel any active jobs in postStop hook
dagScheduler.cleanUpAfterSchedulerStop()
}
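
The bulk of the DAGScheduler change above is mechanical: every branch on the old stage.isShuffleMap flag becomes a pattern match on the concrete stage type, and the Option-typed shuffleDep disappears because only ShuffleMapStage carries a shuffle dependency. The following is a minimal standalone sketch of that dispatch, using simplified stand-in classes rather than the real Spark types (the field names and String locations are illustrative only):

    // Sketch: dispatch on the concrete Stage subclass instead of a boolean flag.
    object StageDispatchSketch {

      sealed abstract class Stage(val id: Int, val numPartitions: Int)

      final class ShuffleMapStage(
          id: Int,
          numPartitions: Int,
          val outputLocs: Array[List[String]]) extends Stage(id, numPartitions)

      final class ResultStage(
          id: Int,
          numPartitions: Int,
          val finished: Array[Boolean]) extends Stage(id, numPartitions)

      /** Which partitions still need computing, mirroring submitMissingTasks above. */
      def partitionsToCompute(stage: Stage): Seq[Int] = stage match {
        case s: ShuffleMapStage =>
          (0 until s.numPartitions).filter(id => s.outputLocs(id).isEmpty)
        case s: ResultStage =>
          (0 until s.numPartitions).filter(id => !s.finished(id))
      }

      def main(args: Array[String]): Unit = {
        val shuffle = new ShuffleMapStage(0, 3, Array(List("exec-1"), Nil, Nil))
        val result  = new ResultStage(1, 2, Array(true, false))
        println(partitionsToCompute(shuffle)) // partitions 1 and 2 have no map output yet
        println(partitionsToCompute(result))  // partition 1 has not finished
      }
    }

Matching on the concrete type lets the compiler ensure that ResultStage code never touches shuffle-only state such as outputLocs, which the old isShuffleMap flag could not guarantee. The sealed modifier here is only for the sketch; the real Stage in this diff is abstract but not sealed.
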
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
new file mode 100644
index 0000000000..c0f3d5a13d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.CallSite
+
+/**
+ * The ResultStage represents the final stage in a job.
+ */
+private[spark] class ResultStage(
+ id: Int,
+ rdd: RDD[_],
+ numTasks: Int,
+ parents: List[Stage],
+ jobId: Int,
+ callSite: CallSite)
+ extends Stage(id, rdd, numTasks, parents, jobId, callSite) {
+
+ // The active job for this result stage. Will be empty if the job has already finished
+ // (e.g., because the job was cancelled).
+ var resultOfJob: Option[ActiveJob] = None
+
+ override def toString: String = "ResultStage " + id
+}
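
ResultStage keeps the resultOfJob link that used to live on the base Stage class, and ActiveJob in turn now requires a ResultStage as its finalStage. A simplified sketch of that linkage and of the Some/None guard used when a result task completes (stand-in classes with illustrative fields, not the real Spark ones):

    object ResultStageLinkSketch {

      class ResultStage(val id: Int, val numPartitions: Int) {
        // Set when a job is submitted; empty once the job finishes or is cancelled.
        var resultOfJob: Option[ActiveJob] = None
      }

      class ActiveJob(val jobId: Int, val finalStage: ResultStage) {
        val finished: Array[Boolean] = Array.fill(finalStage.numPartitions)(false)
        var numFinished: Int = 0
      }

      def main(args: Array[String]): Unit = {
        val finalStage = new ResultStage(0, 2)
        val job = new ActiveJob(42, finalStage)
        finalStage.resultOfJob = Some(job)

        // A completion handler only acts if the stage still points at an active job,
        // mirroring the match in handleTaskCompletion above.
        finalStage.resultOfJob match {
          case Some(j) => println(s"recording result for job ${j.jobId}")
          case None    => println("ignoring result for a finished or cancelled job")
        }
      }
    }
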
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
new file mode 100644
index 0000000000..d022107434
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.ShuffleDependency
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.CallSite
+
+/**
+ * The ShuffleMapStage represents the intermediate stages in a job.
+ */
+private[spark] class ShuffleMapStage(
+ id: Int,
+ rdd: RDD[_],
+ numTasks: Int,
+ parents: List[Stage],
+ jobId: Int,
+ callSite: CallSite,
+ val shuffleDep: ShuffleDependency[_, _, _])
+ extends Stage(id, rdd, numTasks, parents, jobId, callSite) {
+
+ override def toString: String = "ShuffleMapStage " + id
+
+ var numAvailableOutputs: Long = 0
+
+ def isAvailable: Boolean = numAvailableOutputs == numPartitions
+
+ val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
+
+ def addOutputLoc(partition: Int, status: MapStatus): Unit = {
+ val prevList = outputLocs(partition)
+ outputLocs(partition) = status :: prevList
+ if (prevList == Nil) {
+ numAvailableOutputs += 1
+ }
+ }
+
+ def removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit = {
+ val prevList = outputLocs(partition)
+ val newList = prevList.filterNot(_.location == bmAddress)
+ outputLocs(partition) = newList
+ if (prevList != Nil && newList == Nil) {
+ numAvailableOutputs -= 1
+ }
+ }
+
+ /**
+ * Removes all shuffle outputs associated with this executor. Note that this will also remove
+ * outputs which are served by an external shuffle server (if one exists), as they are still
+ * registered with this execId.
+ */
+ def removeOutputsOnExecutor(execId: String): Unit = {
+ var becameUnavailable = false
+ for (partition <- 0 until numPartitions) {
+ val prevList = outputLocs(partition)
+ val newList = prevList.filterNot(_.location.executorId == execId)
+ outputLocs(partition) = newList
+ if (prevList != Nil && newList == Nil) {
+ becameUnavailable = true
+ numAvailableOutputs -= 1
+ }
+ }
+ if (becameUnavailable) {
+ logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
+ this, execId, numAvailableOutputs, numPartitions, isAvailable))
+ }
+ }
+}
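
ShuffleMapStage now owns the map-output bookkeeping that was previously on Stage: outputLocs holds the registered statuses per partition, numAvailableOutputs counts partitions with at least one entry, and isAvailable means every partition has one. The self-contained sketch below reproduces that invariant with plain strings standing in for MapStatus and executor addresses:

    object OutputLocsSketch {

      /** Simplified stand-in for ShuffleMapStage's output bookkeeping. */
      class MapOutputs(numPartitions: Int) {
        val outputLocs: Array[List[String]] = Array.fill(numPartitions)(Nil)
        var numAvailableOutputs: Int = 0

        def isAvailable: Boolean = numAvailableOutputs == numPartitions

        def addOutputLoc(partition: Int, location: String): Unit = {
          val prevList = outputLocs(partition)
          outputLocs(partition) = location :: prevList
          if (prevList.isEmpty) numAvailableOutputs += 1  // first output for this partition
        }

        def removeOutputLoc(partition: Int, location: String): Unit = {
          val prevList = outputLocs(partition)
          val newList = prevList.filterNot(_ == location)
          outputLocs(partition) = newList
          if (prevList.nonEmpty && newList.isEmpty) numAvailableOutputs -= 1
        }
      }

      def main(args: Array[String]): Unit = {
        val outputs = new MapOutputs(numPartitions = 2)
        outputs.addOutputLoc(0, "exec-1")
        outputs.addOutputLoc(1, "exec-2")
        println(outputs.isAvailable)   // true: every partition has a registered output
        outputs.removeOutputLoc(1, "exec-2")
        println(outputs.isAvailable)   // false: partition 1 must be recomputed
      }
    }
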
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 4cbc6e84a6..5d0ddb8377 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -21,7 +21,6 @@ import scala.collection.mutable.HashSet
import org.apache.spark._
import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.CallSite
/**
@@ -47,29 +46,23 @@ import org.apache.spark.util.CallSite
* be updated for each attempt.
*
*/
-private[spark] class Stage(
+private[spark] abstract class Stage(
val id: Int,
val rdd: RDD[_],
val numTasks: Int,
- val shuffleDep: Option[ShuffleDependency[_, _, _]], // Output shuffle if stage is a map stage
val parents: List[Stage],
val jobId: Int,
val callSite: CallSite)
extends Logging {
- val isShuffleMap = shuffleDep.isDefined
val numPartitions = rdd.partitions.size
- val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
- var numAvailableOutputs = 0
/** Set of jobs that this stage belongs to. */
val jobIds = new HashSet[Int]
- /** For stages that are the final (consists of only ResultTasks), link to the ActiveJob. */
- var resultOfJob: Option[ActiveJob] = None
var pendingTasks = new HashSet[Task[_]]
- private var nextAttemptId = 0
+ private var nextAttemptId: Int = 0
val name = callSite.shortForm
val details = callSite.longForm
@@ -77,53 +70,6 @@ private[spark] class Stage(
/** Pointer to the latest [StageInfo] object, set by DAGScheduler. */
var latestInfo: StageInfo = StageInfo.fromStage(this)
- def isAvailable: Boolean = {
- if (!isShuffleMap) {
- true
- } else {
- numAvailableOutputs == numPartitions
- }
- }
-
- def addOutputLoc(partition: Int, status: MapStatus) {
- val prevList = outputLocs(partition)
- outputLocs(partition) = status :: prevList
- if (prevList == Nil) {
- numAvailableOutputs += 1
- }
- }
-
- def removeOutputLoc(partition: Int, bmAddress: BlockManagerId) {
- val prevList = outputLocs(partition)
- val newList = prevList.filterNot(_.location == bmAddress)
- outputLocs(partition) = newList
- if (prevList != Nil && newList == Nil) {
- numAvailableOutputs -= 1
- }
- }
-
- /**
- * Removes all shuffle outputs associated with this executor. Note that this will also remove
- * outputs which are served by an external shuffle server (if one exists), as they are still
- * registered with this execId.
- */
- def removeOutputsOnExecutor(execId: String) {
- var becameUnavailable = false
- for (partition <- 0 until numPartitions) {
- val prevList = outputLocs(partition)
- val newList = prevList.filterNot(_.location.executorId == execId)
- outputLocs(partition) = newList
- if (prevList != Nil && newList == Nil) {
- becameUnavailable = true
- numAvailableOutputs -= 1
- }
- }
- if (becameUnavailable) {
- logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
- this, execId, numAvailableOutputs, numPartitions, isAvailable))
- }
- }
-
/** Return a new attempt id, starting with 0. */
def newAttemptId(): Int = {
val id = nextAttemptId
@@ -133,11 +79,8 @@ private[spark] class Stage(
def attemptId: Int = nextAttemptId
- override def toString: String = "Stage " + id
-
- override def hashCode(): Int = id
-
- override def equals(other: Any): Boolean = other match {
+ override final def hashCode(): Int = id
+ override final def equals(other: Any): Boolean = other match {
case stage: Stage => stage != null && stage.id == id
case _ => false
}
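
With Stage now abstract, toString moves to the subclasses ("ShuffleMapStage 3" vs "ResultStage 3") while equality and hashing stay keyed on the stage id alone and are marked final so neither subclass can change them; that matters because the scheduler keeps stages in hash-based sets such as runningStages and waitingStages. A small sketch of that id-based identity, again with simplified stand-ins:

    import scala.collection.mutable

    object StageIdentitySketch {
      abstract class Stage(val id: Int) {
        override final def hashCode(): Int = id
        override final def equals(other: Any): Boolean = other match {
          case stage: Stage => stage != null && stage.id == id
          case _ => false
        }
      }
      class ShuffleMapStage(id: Int) extends Stage(id)
      class ResultStage(id: Int) extends Stage(id)

      def main(args: Array[String]): Unit = {
        val runningStages = mutable.HashSet[Stage](new ShuffleMapStage(7))
        // Identity is by id, so a fresh instance with the same id is "the same" stage.
        println(runningStages.contains(new ShuffleMapStage(7))) // true
        println(runningStages.contains(new ResultStage(8)))     // false
      }
    }
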
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index efd59a7e54..54500f7c27 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -54,7 +54,11 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor")
) ++ Seq(
- // SPARK-6510 Add a Graph#minus method acting as Set#difference
+ // SPARK-4655 - Making Stage an Abstract class broke binary compatibility even though
+ // the stage class is defined as private[spark]
+ ProblemFilters.exclude[AbstractClassProblem]("org.apache.spark.scheduler.Stage")
+ ) ++ Seq(
+ // SPARK-6510 Add a Graph#minus method acting as Set#difference
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.minus")
)