diff options
Diffstat (limited to 'core/src/main/scala/spark/scheduler/DAGScheduler.scala')
-rw-r--r-- | core/src/main/scala/spark/scheduler/DAGScheduler.scala | 209 |
1 files changed, 115 insertions, 94 deletions
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 9b45fc2938..9402f18a0f 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -32,10 +32,22 @@ import spark.storage.{BlockManager, BlockManagerMaster} import spark.util.{MetadataCleaner, TimeStampedHashMap} /** - * A Scheduler subclass that implements stage-oriented scheduling. It computes a DAG of stages for - * each job, keeps track of which RDDs and stage outputs are materialized, and computes a minimal - * schedule to run the job. Subclasses only need to implement the code to send a task to the cluster - * and to report fetch failures (the submitTasks method, and code to add CompletionEvents). + * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of + * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a + * minimal schedule to run the job. It then submits stages as TaskSets to an underlying + * TaskScheduler implementation that runs them on the cluster. + * + * In addition to coming up with a DAG of stages, this class also determines the preferred + * locations to run each task on, based on the current cache status, and passes these to the + * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being + * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are + * not caused by shuffie file loss are handled by the TaskScheduler, which will retry each task + * a small number of times before cancelling the whole stage. + * + * THREADING: This class runs all its logic in a single thread executing the run() method, to which + * events are submitted using a synchonized queue (eventQueue). The public API methods, such as + * runJob, taskEnded and executorLost, post events asynchronously to this queue. All other methods + * should be private. */ private[spark] class DAGScheduler( @@ -72,8 +84,8 @@ class DAGScheduler( } // Called by TaskScheduler when a host is added - override def executorGained(execId: String, hostPort: String) { - eventQueue.put(ExecutorGained(execId, hostPort)) + override def executorGained(execId: String, host: String) { + eventQueue.put(ExecutorGained(execId, host)) } // Called by TaskScheduler to cancel an entire TaskSet due to repeated failures. @@ -92,27 +104,28 @@ class DAGScheduler( private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent] - val nextRunId = new AtomicInteger(0) + val nextJobId = new AtomicInteger(0) val nextStageId = new AtomicInteger(0) - val idToStage = new TimeStampedHashMap[Int, Stage] + val stageIdToStage = new TimeStampedHashMap[Int, Stage] val shuffleToMapStage = new TimeStampedHashMap[Int, Stage] private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo] - private[spark] val sparkListeners = ArrayBuffer[SparkListener]() + private val listenerBus = new SparkListenerBus() - var cacheLocs = new HashMap[Int, Array[List[String]]] + // Contains the locations that each RDD's partitions are cached on + private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]] - // For tracking failed nodes, we use the MapOutputTracker's generation number, which is - // sent with every task. When we detect a node failing, we note the current generation number - // and failed executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask - // results. - // TODO: Garbage collect information about failure generations when we know there are no more + // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with + // every task. When we detect a node failing, we note the current epoch number and failed + // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results. + // + // TODO: Garbage collect information about failure epochs when we know there are no more // stray messages to detect. - val failedGeneration = new HashMap[String, Long] + val failedEpoch = new HashMap[String, Long] val idToActiveJob = new HashMap[Int, ActiveJob] @@ -137,11 +150,17 @@ class DAGScheduler( }.start() } - private def getCacheLocs(rdd: RDD[_]): Array[List[String]] = { + def addSparkListener(listener: SparkListener) { + listenerBus.addListener(listener) + } + + private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = { if (!cacheLocs.contains(rdd.id)) { val blockIds = rdd.partitions.indices.map(index=> "rdd_%d_%d".format(rdd.id, index)).toArray - val locs = BlockManager.blockIdsToExecutorLocations(blockIds, env, blockManagerMaster) - cacheLocs(rdd.id) = blockIds.map(locs.getOrElse(_, Nil)) + val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster) + cacheLocs(rdd.id) = blockIds.map { id => + locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId)) + } } cacheLocs(rdd.id) } @@ -152,14 +171,14 @@ class DAGScheduler( /** * Get or create a shuffle map stage for the given shuffle dependency's map side. - * The priority value passed in will be used if the stage doesn't already exist with - * a lower priority (we assume that priorities always increase across jobs for now). + * The jobId value passed in will be used if the stage doesn't already exist with + * a lower jobId (jobId always increases across jobs.) */ - private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], priority: Int): Stage = { + private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], jobId: Int): Stage = { shuffleToMapStage.get(shuffleDep.shuffleId) match { case Some(stage) => stage case None => - val stage = newStage(shuffleDep.rdd, Some(shuffleDep), priority) + val stage = newStage(shuffleDep.rdd, Some(shuffleDep), jobId) shuffleToMapStage(shuffleDep.shuffleId) = stage stage } @@ -167,13 +186,13 @@ class DAGScheduler( /** * Create a Stage for the given RDD, either as a shuffle map stage (for a ShuffleDependency) or - * as a result stage for the final RDD used directly in an action. The stage will also be given - * the provided priority. + * as a result stage for the final RDD used directly in an action. The stage will also be + * associated with the provided jobId. */ private def newStage( rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_]], - priority: Int, + jobId: Int, callSite: Option[String] = None) : Stage = { @@ -184,17 +203,17 @@ class DAGScheduler( mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size) } val id = nextStageId.getAndIncrement() - val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority), priority, callSite) - idToStage(id) = stage + val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, jobId), jobId, callSite) + stageIdToStage(id) = stage stageToInfos(stage) = StageInfo(stage) stage } /** * Get or create the list of parent stages for a given RDD. The stages will be assigned the - * provided priority if they haven't already been created with a lower priority. + * provided jobId if they haven't already been created with a lower jobId. */ - private def getParentStages(rdd: RDD[_], priority: Int): List[Stage] = { + private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = { val parents = new HashSet[Stage] val visited = new HashSet[RDD[_]] def visit(r: RDD[_]) { @@ -205,7 +224,7 @@ class DAGScheduler( for (dep <- r.dependencies) { dep match { case shufDep: ShuffleDependency[_,_] => - parents += getShuffleMapStage(shufDep, priority) + parents += getShuffleMapStage(shufDep, jobId) case _ => visit(dep.rdd) } @@ -226,7 +245,7 @@ class DAGScheduler( for (dep <- rdd.dependencies) { dep match { case shufDep: ShuffleDependency[_,_] => - val mapStage = getShuffleMapStage(shufDep, stage.priority) + val mapStage = getShuffleMapStage(shufDep, stage.jobId) if (!mapStage.isAvailable) { missing += mapStage } @@ -263,7 +282,7 @@ class DAGScheduler( val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val toSubmit = JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties) - return (toSubmit, waiter) + (toSubmit, waiter) } def runJob[T, U: ClassManifest]( @@ -310,8 +329,8 @@ class DAGScheduler( val listener = new ApproximateActionListener(rdd, func, evaluator, timeout) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val partitions = (0 until rdd.partitions.size).toArray - eventQueue.put(JobSubmitted(rdd, func2, partitions, false, callSite, listener, properties)) - return listener.awaitResult() // Will throw an exception if the job fails + eventQueue.put(JobSubmitted(rdd, func2, partitions, allowLocal = false, callSite, listener, properties)) + listener.awaitResult() // Will throw an exception if the job fails } /** @@ -321,11 +340,11 @@ class DAGScheduler( private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = { event match { case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener, properties) => - val runId = nextRunId.getAndIncrement() - val finalStage = newStage(finalRDD, None, runId, Some(callSite)) - val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener, properties) + val jobId = nextJobId.getAndIncrement() + val finalStage = newStage(finalRDD, None, jobId, Some(callSite)) + val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) clearCacheLocs() - logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length + + logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length + " output partitions (allowLocal=" + allowLocal + ")") logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) @@ -334,40 +353,40 @@ class DAGScheduler( // Compute very short actions like first() or take() with no parent stages locally. runLocally(job) } else { - sparkListeners.foreach(_.onJobStart(SparkListenerJobStart(job, properties))) - idToActiveJob(runId) = job + listenerBus.post(SparkListenerJobStart(job, properties)) + idToActiveJob(jobId) = job activeJobs += job resultStageToJob(finalStage) = job submitStage(finalStage) } - case ExecutorGained(execId, hostPort) => - handleExecutorGained(execId, hostPort) + case ExecutorGained(execId, host) => + handleExecutorGained(execId, host) case ExecutorLost(execId) => handleExecutorLost(execId) case begin: BeginEvent => - sparkListeners.foreach(_.onTaskStart(SparkListenerTaskStart(begin.task, begin.taskInfo))) + listenerBus.post(SparkListenerTaskStart(begin.task, begin.taskInfo)) case completion: CompletionEvent => - sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task, - completion.reason, completion.taskInfo, completion.taskMetrics))) + listenerBus.post(SparkListenerTaskEnd( + completion.task, completion.reason, completion.taskInfo, completion.taskMetrics)) handleTaskCompletion(completion) case TaskSetFailed(taskSet, reason) => - abortStage(idToStage(taskSet.stageId), reason) + abortStage(stageIdToStage(taskSet.stageId), reason) case StopDAGScheduler => // Cancel any active jobs for (job <- activeJobs) { val error = new SparkException("Job cancelled because SparkContext was shut down") job.listener.jobFailed(error) - sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error, None)))) + listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, None))) } return true } - return false + false } /** @@ -379,7 +398,7 @@ class DAGScheduler( clearCacheLocs() val failed2 = failed.toArray failed.clear() - for (stage <- failed2.sortBy(_.priority)) { + for (stage <- failed2.sortBy(_.jobId)) { submitStage(stage) } } @@ -397,7 +416,7 @@ class DAGScheduler( logTrace("failed: " + failed) val waiting2 = waiting.toArray waiting.clear() - for (stage <- waiting2.sortBy(_.priority)) { + for (stage <- waiting2.sortBy(_.jobId)) { submitStage(stage) } } @@ -444,7 +463,7 @@ class DAGScheduler( */ protected def runLocally(job: ActiveJob) { logInfo("Computing the requested partition locally") - new Thread("Local computation of job " + job.runId) { + new Thread("Local computation of job " + job.jobId) { override def run() { runLocallyWithinThread(job) } @@ -504,12 +523,17 @@ class DAGScheduler( } else { // This is a final stage; figure out its job's missing partitions val job = resultStageToJob(stage) - for (id <- 0 until job.numPartitions if (!job.finished(id))) { + for (id <- 0 until job.numPartitions if !job.finished(id)) { val partition = job.partitions(id) val locs = getPreferredLocs(stage.rdd, partition) tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id) } } + // must be run listener before possible NotSerializableException + // should be "StageSubmitted" first and then "JobEnded" + val properties = idToActiveJob(stage.jobId).properties + listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties)) + if (tasks.size > 0) { // Preemptively serialize a task to make sure it can be serialized. We are catching this // exception here because it would be fairly hard to catch the non-serializable exception @@ -524,13 +548,11 @@ class DAGScheduler( return } - sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size))) logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")") myPending ++= tasks logDebug("New pending tasks: " + myPending) - val properties = idToActiveJob(stage.priority).properties taskSched.submitTasks( - new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, properties)) + new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)) if (!stage.submissionTime.isDefined) { stage.submissionTime = Some(System.currentTimeMillis()) } @@ -547,7 +569,7 @@ class DAGScheduler( */ private def handleTaskCompletion(event: CompletionEvent) { val task = event.task - val stage = idToStage(task.stageId) + val stage = stageIdToStage(task.stageId) def markStageAsFinished(stage: Stage) = { val serviceTime = stage.submissionTime match { @@ -556,8 +578,7 @@ class DAGScheduler( } logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) stage.completionTime = Some(System.currentTimeMillis) - val stageComp = StageCompleted(stageToInfos(stage)) - sparkListeners.foreach{_.onStageCompleted(stageComp)} + listenerBus.post(StageCompleted(stageToInfos(stage))) running -= stage } event.reason match { @@ -577,11 +598,11 @@ class DAGScheduler( job.numFinished += 1 // If the whole job has finished, remove it if (job.numFinished == job.numPartitions) { - idToActiveJob -= stage.priority + idToActiveJob -= stage.jobId activeJobs -= job resultStageToJob -= stage markStageAsFinished(stage) - sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobSucceeded))) + listenerBus.post(SparkListenerJobEnd(job, JobSucceeded)) } job.listener.taskSucceeded(rt.outputId, event.result) } @@ -593,7 +614,7 @@ class DAGScheduler( val status = event.result.asInstanceOf[MapStatus] val execId = status.location.executorId logDebug("ShuffleMapTask finished on " + execId) - if (failedGeneration.contains(execId) && smt.generation <= failedGeneration(execId)) { + if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId) } else { stage.addOutputLoc(smt.partition, status) @@ -605,16 +626,16 @@ class DAGScheduler( logInfo("waiting: " + waiting) logInfo("failed: " + failed) if (stage.shuffleDep != None) { - // We supply true to increment the generation number here in case this is a + // We supply true to increment the epoch number here in case this is a // recomputation of the map outputs. In that case, some nodes may have cached // locations with holes (from when we detected the error) and will need the - // generation incremented to refetch them. - // TODO: Only increment the generation number if this is not the first time + // epoch incremented to refetch them. + // TODO: Only increment the epoch number if this is not the first time // we registered these map outputs. mapOutputTracker.registerMapOutputs( stage.shuffleDep.get.shuffleId, stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray, - true) + changeEpoch = true) } clearCacheLocs() if (stage.outputLocs.count(_ == Nil) != 0) { @@ -648,7 +669,7 @@ class DAGScheduler( case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => // Mark the stage that the reducer was in as unrunnable - val failedStage = idToStage(task.stageId) + val failedStage = stageIdToStage(task.stageId) running -= failedStage failed += failedStage // TODO: Cancel running tasks in the stage @@ -668,7 +689,7 @@ class DAGScheduler( lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock // TODO: mark the executor as failed only if there were lots of fetch failures on it if (bmAddress != null) { - handleExecutorLost(bmAddress.executorId, Some(task.generation)) + handleExecutorLost(bmAddress.executorId, Some(task.epoch)) } case ExceptionFailure(className, description, stackTrace, metrics) => @@ -676,7 +697,7 @@ class DAGScheduler( case other => // Unrecognized failure - abort all jobs depending on this stage - abortStage(idToStage(task.stageId), task + " failed: " + other) + abortStage(stageIdToStage(task.stageId), task + " failed: " + other) } } @@ -684,36 +705,36 @@ class DAGScheduler( * Responds to an executor being lost. This is called inside the event loop, so it assumes it can * modify the scheduler's internal state. Use executorLost() to post a loss event from outside. * - * Optionally the generation during which the failure was caught can be passed to avoid allowing + * Optionally the epoch during which the failure was caught can be passed to avoid allowing * stray fetch failures from possibly retriggering the detection of a node as lost. */ - private def handleExecutorLost(execId: String, maybeGeneration: Option[Long] = None) { - val currentGeneration = maybeGeneration.getOrElse(mapOutputTracker.getGeneration) - if (!failedGeneration.contains(execId) || failedGeneration(execId) < currentGeneration) { - failedGeneration(execId) = currentGeneration - logInfo("Executor lost: %s (generation %d)".format(execId, currentGeneration)) + private def handleExecutorLost(execId: String, maybeEpoch: Option[Long] = None) { + val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch) + if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) { + failedEpoch(execId) = currentEpoch + logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch)) blockManagerMaster.removeExecutor(execId) // TODO: This will be really slow if we keep accumulating shuffle map stages for ((shuffleId, stage) <- shuffleToMapStage) { stage.removeOutputsOnExecutor(execId) val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray - mapOutputTracker.registerMapOutputs(shuffleId, locs, true) + mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true) } if (shuffleToMapStage.isEmpty) { - mapOutputTracker.incrementGeneration() + mapOutputTracker.incrementEpoch() } clearCacheLocs() } else { logDebug("Additional executor lost message for " + execId + - "(generation " + currentGeneration + ")") + "(epoch " + currentEpoch + ")") } } - private def handleExecutorGained(execId: String, hostPort: String) { - // remove from failedGeneration(execId) ? - if (failedGeneration.contains(execId)) { - logInfo("Host gained which was in lost list earlier: " + hostPort) - failedGeneration -= execId + private def handleExecutorGained(execId: String, host: String) { + // remove from failedEpoch(execId) ? + if (failedEpoch.contains(execId)) { + logInfo("Host gained which was in lost list earlier: " + host) + failedEpoch -= execId } } @@ -728,8 +749,8 @@ class DAGScheduler( val job = resultStageToJob(resultStage) val error = new SparkException("Job failed: " + reason) job.listener.jobFailed(error) - sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))) - idToActiveJob -= resultStage.priority + listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage)))) + idToActiveJob -= resultStage.jobId activeJobs -= job resultStageToJob -= resultStage } @@ -753,7 +774,7 @@ class DAGScheduler( for (dep <- rdd.dependencies) { dep match { case shufDep: ShuffleDependency[_,_] => - val mapStage = getShuffleMapStage(shufDep, stage.priority) + val mapStage = getShuffleMapStage(shufDep, stage.jobId) if (!mapStage.isAvailable) { visitedStages += mapStage visit(mapStage.rdd) @@ -768,16 +789,16 @@ class DAGScheduler( visitedRdds.contains(target.rdd) } - private def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = { + private def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = { // If the partition is cached, return the cache locations val cached = getCacheLocs(rdd)(partition) - if (cached != Nil) { + if (!cached.isEmpty) { return cached } // If the RDD has some placement preferences (as is the case for input RDDs), get those val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList - if (rddPrefs != Nil) { - return rddPrefs + if (!rddPrefs.isEmpty) { + return rddPrefs.map(host => TaskLocation(host)) } // If the RDD has narrow dependencies, pick the first partition of the first narrow dep // that has any placement preferences. Ideally we would choose based on transfer sizes, @@ -791,13 +812,13 @@ class DAGScheduler( } case _ => }) - return Nil + Nil } private def cleanup(cleanupTime: Long) { - var sizeBefore = idToStage.size - idToStage.clearOldValues(cleanupTime) - logInfo("idToStage " + sizeBefore + " --> " + idToStage.size) + var sizeBefore = stageIdToStage.size + stageIdToStage.clearOldValues(cleanupTime) + logInfo("stageIdToStage " + sizeBefore + " --> " + stageIdToStage.size) sizeBefore = shuffleToMapStage.size shuffleToMapStage.clearOldValues(cleanupTime) |