aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author: Prashant Sharma <prashant.s@imaginea.com> 2015-01-19 01:28:42 -0800
committer: Patrick Wendell <patrick@databricks.com> 2015-01-19 01:28:42 -0800
commit: 851b6a9bbadfcd4d258054586a9a157c41ce71c7 (patch)
tree: c70f49efd3a502385267678ff9bd58c30a642dcd /core
parent: 7dbf1fdb811cfeb04bd2d1e684271e2f53247a02 (diff)
download: spark-851b6a9bbadfcd4d258054586a9a157c41ce71c7.tar.gz
spark-851b6a9bbadfcd4d258054586a9a157c41ce71c7.tar.bz2
spark-851b6a9bbadfcd4d258054586a9a157c41ce71c7.zip
SPARK-5217 Spark UI should report pending stages during job execution on AllStagesPage.
![screenshot from 2015-01-16 13 43 25](https://cloud.githubusercontent.com/assets/992952/5773256/d61df300-9d85-11e4-9b5a-6730058839fa.png)

This is a first step towards having time remaining estimates for queued and running jobs. See SPARK-5216

Author: Prashant Sharma <prashant.s@imaginea.com>

Closes #4043 from ScrapCodes/SPARK-5216/5217-show-waiting-stages and squashes the following commits:

3b11803 [Prashant Sharma] Review feedback.
0992842 [Prashant Sharma] Switched to Linked hashmap, changed the order to active->pending->completed->failed. And changed pending stages to not reverse sort.
c19d82a [Prashant Sharma] SPARK-5217 Spark UI should report pending stages during job execution on AllStagesPage.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala | 11
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 5
2 files changed, 15 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
index b0f8ca2ab0..1da7a98820 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
@@ -33,6 +33,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val activeStages = listener.activeStages.values.toSeq
+ val pendingStages = listener.pendingStages.values.toSeq
val completedStages = listener.completedStages.reverse.toSeq
val numCompletedStages = listener.numCompletedStages
val failedStages = listener.failedStages.reverse.toSeq
@@ -43,6 +44,10 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
killEnabled = parent.killEnabled)
+ val pendingStagesTable =
+ new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse,
+ parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
+ killEnabled = false)
val completedStagesTable =
new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath,
parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false)
@@ -73,6 +78,10 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
{activeStages.size}
</li>
<li>
+ <a href="#pending"><strong>Pending Stages:</strong></a>
+ {pendingStages.size}
+ </li>
+ <li>
<a href="#completed"><strong>Completed Stages:</strong></a>
{numCompletedStages}
</li>
@@ -91,6 +100,8 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
}} ++
<h4 id="active">Active Stages ({activeStages.size})</h4> ++
activeStagesTable.toNodeSeq ++
+ <h4 id="pending">Pending Stages ({pendingStages.size})</h4> ++
+ pendingStagesTable.toNodeSeq ++
<h4 id="completed">Completed Stages ({numCompletedStages})</h4> ++
completedStagesTable.toNodeSeq ++
<h4 id ="failed">Failed Stages ({numFailedStages})</h4> ++
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index b0d3bed130..4d200eeda8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -56,6 +56,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val jobIdToData = new HashMap[JobId, JobUIData]
// Stages:
+ val pendingStages = new HashMap[StageId, StageInfo]
val activeStages = new HashMap[StageId, StageInfo]
val completedStages = ListBuffer[StageInfo]()
val skippedStages = ListBuffer[StageInfo]()
@@ -157,6 +158,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
stageIds = jobStart.stageIds,
jobGroup = jobGroup,
status = JobExecutionStatus.RUNNING)
+ jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
// Compute (a potential underestimate of) the number of tasks that will be run by this job.
// This may be an underestimate because the job start event references all of the result
// stages' transitive stage dependencies, but some of these stages might be skipped if their
@@ -187,6 +189,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
}
jobData.completionTime = Option(jobEnd.time).filter(_ >= 0)
+ jobData.stageIds.foreach(pendingStages.remove)
jobEnd.jobResult match {
case JobSucceeded =>
completedJobs += jobData
@@ -257,7 +260,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized {
val stage = stageSubmitted.stageInfo
activeStages(stage.stageId) = stage
-
+ pendingStages.remove(stage.stageId)
val poolName = Option(stageSubmitted.properties).map {
p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
}.getOrElse(DEFAULT_POOL_NAME)