From 851b6a9bbadfcd4d258054586a9a157c41ce71c7 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Mon, 19 Jan 2015 01:28:42 -0800 Subject: SPARK-5217 Spark UI should report pending stages during job execution on AllStagesPage. ![screenshot from 2015-01-16 13 43 25](https://cloud.githubusercontent.com/assets/992952/5773256/d61df300-9d85-11e4-9b5a-6730058839fa.png) This is a first step towards having time remaining estimates for queued and running jobs. See SPARK-5216 Author: Prashant Sharma Closes #4043 from ScrapCodes/SPARK-5216/5217-show-waiting-stages and squashes the following commits: 3b11803 [Prashant Sharma] Review feedback. 0992842 [Prashant Sharma] Switched to Linked hashmap, changed the order to active->pending->completed->failed. And changed pending stages to not reverse sort. c19d82a [Prashant Sharma] SPARK-5217 Spark UI should report pending stages during job execution on AllStagesPage. --- .../main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala | 11 +++++++++++ .../scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 5 ++++- 2 files changed, 15 insertions(+), 1 deletion(-) (limited to 'core/src') diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index b0f8ca2ab0..1da7a98820 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -33,6 +33,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val activeStages = listener.activeStages.values.toSeq + val pendingStages = listener.pendingStages.values.toSeq val completedStages = listener.completedStages.reverse.toSeq val numCompletedStages = listener.numCompletedStages val failedStages = listener.failedStages.reverse.toSeq @@ -43,6 +44,10 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = parent.killEnabled) + val pendingStagesTable = + new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse, + parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, + killEnabled = false) val completedStagesTable = new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false) @@ -72,6 +77,10 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { Active Stages: {activeStages.size} +
  • + Pending Stages: + {pendingStages.size} +
  • Completed Stages: {numCompletedStages} @@ -91,6 +100,8 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { }} ++

    Active Stages ({activeStages.size})

    ++ activeStagesTable.toNodeSeq ++ +

    Pending Stages ({pendingStages.size})

    ++ + pendingStagesTable.toNodeSeq ++

    Completed Stages ({numCompletedStages})

    ++ completedStagesTable.toNodeSeq ++

    Failed Stages ({numFailedStages})

    ++ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index b0d3bed130..4d200eeda8 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -56,6 +56,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val jobIdToData = new HashMap[JobId, JobUIData] // Stages: + val pendingStages = new HashMap[StageId, StageInfo] val activeStages = new HashMap[StageId, StageInfo] val completedStages = ListBuffer[StageInfo]() val skippedStages = ListBuffer[StageInfo]() @@ -157,6 +158,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageIds = jobStart.stageIds, jobGroup = jobGroup, status = JobExecutionStatus.RUNNING) + jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x) // Compute (a potential underestimate of) the number of tasks that will be run by this job. // This may be an underestimate because the job start event references all of the result // stages' transitive stage dependencies, but some of these stages might be skipped if their @@ -187,6 +189,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { } jobData.completionTime = Option(jobEnd.time).filter(_ >= 0) + jobData.stageIds.foreach(pendingStages.remove) jobEnd.jobResult match { case JobSucceeded => completedJobs += jobData @@ -257,7 +260,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized { val stage = stageSubmitted.stageInfo activeStages(stage.stageId) = stage - + pendingStages.remove(stage.stageId) val poolName = Option(stageSubmitted.properties).map { p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME) }.getOrElse(DEFAULT_POOL_NAME) -- cgit v1.2.3