diff options
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/spark/scheduler/DAGScheduler.scala | 22 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/Stage.scala | 5 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/IndexPage.scala | 2 |
3 files changed, 18 insertions, 11 deletions
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 8173ef709d..64ed91f5a0 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -150,7 +150,13 @@ class DAGScheduler( * as a result stage for the final RDD used directly in an action. The stage will also be given * the provided priority. */ - private def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_]], priority: Int): Stage = { + private def newStage( + rdd: RDD[_], + shuffleDep: Option[ShuffleDependency[_,_]], + priority: Int, + callSite: Option[String] = None) + : Stage = + { if (shuffleDep != None) { // Kind of ugly: need to register RDDs with the cache and map output tracker here // since we can't do it in the RDD constructor because # of partitions is unknown @@ -158,7 +164,7 @@ class DAGScheduler( mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size) } val id = nextStageId.getAndIncrement() - val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority), priority) + val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority), priority, callSite) idToStage(id) = stage stageToInfos(stage) = StageInfo(stage) stage @@ -295,12 +301,12 @@ class DAGScheduler( event match { case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener, properties) => val runId = nextRunId.getAndIncrement() - val finalStage = newStage(finalRDD, None, runId) + val finalStage = newStage(finalRDD, None, runId, Some(callSite)) val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener, properties) clearCacheLocs() logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length + " output partitions (allowLocal=" + allowLocal + ")") - logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")") + logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { @@ -511,7 +517,7 @@ class DAGScheduler( case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0) case _ => "Unkown" } - logInfo("%s (%s) finished in %s s".format(stage, stage.origin, serviceTime)) + logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) stage.completionTime = Some(System.currentTimeMillis) val stageComp = StageCompleted(stageToInfos(stage)) sparkListeners.foreach{_.onStageCompleted(stageComp)} @@ -577,7 +583,7 @@ class DAGScheduler( if (stage.outputLocs.count(_ == Nil) != 0) { // Some tasks had failed; let's resubmit this stage // TODO: Lower-level scheduler should also deal with this - logInfo("Resubmitting " + stage + " (" + stage.origin + + logInfo("Resubmitting " + stage + " (" + stage.name + ") because some of its tasks had failed: " + stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", ")) submitStage(stage) @@ -609,7 +615,7 @@ class DAGScheduler( running -= failedStage failed += failedStage // TODO: Cancel running tasks in the stage - logInfo("Marking " + failedStage + " (" + failedStage.origin + + logInfo("Marking " + failedStage + " (" + failedStage.name + ") for resubmision due to a fetch failure") // Mark the map whose fetch failed as broken in the map stage val mapStage = shuffleToMapStage(shuffleId) @@ -617,7 +623,7 @@ class DAGScheduler( mapStage.removeOutputLoc(mapId, bmAddress) mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) } - logInfo("The failed fetch was from " + mapStage + " (" + mapStage.origin + + logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name + "); marking it for resubmission") failed += mapStage // Remember that a fetch failed now; this is used to resubmit the broken diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala index 539cf8233b..4937eb3b88 100644 --- a/core/src/main/scala/spark/scheduler/Stage.scala +++ b/core/src/main/scala/spark/scheduler/Stage.scala @@ -24,7 +24,8 @@ private[spark] class Stage( val rdd: RDD[_], val shuffleDep: Option[ShuffleDependency[_,_]], // Output shuffle if stage is a map stage val parents: List[Stage], - val priority: Int) + val priority: Int, + callSite: Option[String]) extends Logging { val isShuffleMap = shuffleDep != None @@ -85,7 +86,7 @@ private[spark] class Stage( return id } - def origin: String = rdd.origin + val name = callSite.getOrElse(rdd.origin) override def toString = "Stage " + id diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala index 7907ab3bc7..2df5f0192b 100644 --- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala @@ -89,7 +89,7 @@ private[spark] class IndexPage(parent: JobProgressUI) { <tr> <td>{s.id}</td> - <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.origin}</a></td> + <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a></td> <td>{submissionTime}</td> <td>{getElapsedTime(s.submissionTime, s.completionTime.getOrElse(System.currentTimeMillis()))}</td> |