diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2015-01-19 01:28:42 -0800 |
---|---|---|
committer | Patrick Wendell <patrick@databricks.com> | 2015-01-19 01:28:42 -0800 |
commit | 851b6a9bbadfcd4d258054586a9a157c41ce71c7 (patch) | |
tree | c70f49efd3a502385267678ff9bd58c30a642dcd /core/src | |
parent | 7dbf1fdb811cfeb04bd2d1e684271e2f53247a02 (diff) | |
download | spark-851b6a9bbadfcd4d258054586a9a157c41ce71c7.tar.gz spark-851b6a9bbadfcd4d258054586a9a157c41ce71c7.tar.bz2 spark-851b6a9bbadfcd4d258054586a9a157c41ce71c7.zip |
SPARK-5217 Spark UI should report pending stages during job execution on AllStagesPage.
![screenshot from 2015-01-16 13 43 25](https://cloud.githubusercontent.com/assets/992952/5773256/d61df300-9d85-11e4-9b5a-6730058839fa.png)
This is a first step towards having time-remaining estimates for queued and running jobs. See SPARK-5216.
Author: Prashant Sharma <prashant.s@imaginea.com>
Closes #4043 from ScrapCodes/SPARK-5216/5217-show-waiting-stages and squashes the following commits:
3b11803 [Prashant Sharma] Review feedback.
0992842 [Prashant Sharma] Switched to LinkedHashMap, changed the order to active->pending->completed->failed, and changed pending stages to not be reverse-sorted.
c19d82a [Prashant Sharma] SPARK-5217 Spark UI should report pending stages during job execution on AllStagesPage.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala | 11 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 5 |
2 files changed, 15 insertions, 1 deletion
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index b0f8ca2ab0..1da7a98820 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -33,6 +33,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val activeStages = listener.activeStages.values.toSeq + val pendingStages = listener.pendingStages.values.toSeq val completedStages = listener.completedStages.reverse.toSeq val numCompletedStages = listener.numCompletedStages val failedStages = listener.failedStages.reverse.toSeq @@ -43,6 +44,10 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = parent.killEnabled) + val pendingStagesTable = + new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse, + parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, + killEnabled = false) val completedStagesTable = new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false) @@ -73,6 +78,10 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { {activeStages.size} </li> <li> + <a href="#pending"><strong>Pending Stages:</strong></a> + {pendingStages.size} + </li> + <li> <a href="#completed"><strong>Completed Stages:</strong></a> {numCompletedStages} </li> @@ -91,6 +100,8 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { }} ++ <h4 id="active">Active Stages ({activeStages.size})</h4> ++ activeStagesTable.toNodeSeq ++ + <h4 id="pending">Pending Stages ({pendingStages.size})</h4> ++ 
+ pendingStagesTable.toNodeSeq ++ <h4 id="completed">Completed Stages ({numCompletedStages})</h4> ++ completedStagesTable.toNodeSeq ++ <h4 id ="failed">Failed Stages ({numFailedStages})</h4> ++ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index b0d3bed130..4d200eeda8 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -56,6 +56,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val jobIdToData = new HashMap[JobId, JobUIData] // Stages: + val pendingStages = new HashMap[StageId, StageInfo] val activeStages = new HashMap[StageId, StageInfo] val completedStages = ListBuffer[StageInfo]() val skippedStages = ListBuffer[StageInfo]() @@ -157,6 +158,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageIds = jobStart.stageIds, jobGroup = jobGroup, status = JobExecutionStatus.RUNNING) + jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x) // Compute (a potential underestimate of) the number of tasks that will be run by this job. 
// This may be an underestimate because the job start event references all of the result // stages' transitive stage dependencies, but some of these stages might be skipped if their @@ -187,6 +189,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { } jobData.completionTime = Option(jobEnd.time).filter(_ >= 0) + jobData.stageIds.foreach(pendingStages.remove) jobEnd.jobResult match { case JobSucceeded => completedJobs += jobData @@ -257,7 +260,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized { val stage = stageSubmitted.stageInfo activeStages(stage.stageId) = stage - + pendingStages.remove(stage.stageId) val poolName = Option(stageSubmitted.properties).map { p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME) }.getOrElse(DEFAULT_POOL_NAME) |