/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.spark.ui.jobs import java.util.Date import javax.servlet.http.HttpServletRequest import scala.collection.mutable.{Buffer, HashMap, ListBuffer} import scala.xml.{Node, NodeSeq, Unparsed, Utility} import org.apache.spark.JobExecutionStatus import org.apache.spark.scheduler.StageInfo import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} import org.apache.spark.ui.jobs.UIData.ExecutorUIData /** Page showing statistics for a single job, selected by the "id" request parameter: an event timeline of its stages and of executors, the job's DAG visualization, and one stage table per stage status (active, pending, completed, skipped, failed). */ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { /* Legend for the "stages" timeline group, rendered to a string with newlines removed so it can be embedded inside a single-quoted 'content' value of the group array built in makeTimeline. */ private val STAGES_LEGEND =
Completed Failed Active
.toString.filter(_ != '\n') /* Legend for the "executors" timeline group; newlines removed for the same reason as STAGES_LEGEND. */ private val EXECUTORS_LEGEND =
Added Removed
.toString.filter(_ != '\n') /** Builds one timeline item (a JavaScript object-literal string) per stage, in group 'stages', spanning the stage's submission time to its completion time; a stage that has not completed is drawn up to the current time. Every stage passed in must have a submissionTime, since it is read with .get. */ private def makeStageEvent(stageInfos: Seq[StageInfo]): Seq[String] = { stageInfos.map { stage => val stageId = stage.stageId val attemptId = stage.attemptId val name = stage.name val status = stage.getStatusString /* .get is safe for the stages render passes in (active, completed and failed stages all have a submissionTime; unsubmitted stages are kept out of the timeline). */ val submissionTime = stage.submissionTime.get val completionTime = stage.completionTime.getOrElse(System.currentTimeMillis()) // The timeline library treats contents as HTML, so we have to escape them; for the // data-title attribute string we have to escape them twice since that's in a string. val escapedName = Utility.escape(name) s""" |{ | 'className': 'stage job-timeline-object ${status}', | 'group': 'stages', | 'start': new Date(${submissionTime}), | 'end': new Date(${completionTime}), | 'content': '
Completed: ${UIUtils.formatDate(new Date(completionTime))}""" } else { "" } }">' + | '${escapedName} (Stage ${stageId}.${attemptId})
', |} """.stripMargin } } /** Builds timeline items (JavaScript object-literal strings) in group 'executors': an "added" item at each executor's startTime and, for executors whose finishTime is defined, an additional "removed" item at that time. */ def makeExecutorEvent(executorUIDatas: HashMap[String, ExecutorUIData]): Seq[String] = { val events = ListBuffer[String]() executorUIDatas.foreach { case (executorId, event) => val addedEvent = s""" |{ | 'className': 'executor added', | 'group': 'executors', | 'start': new Date(${event.startTime}), | 'content': '
Executor ${executorId} added
' |} """.stripMargin events += addedEvent /* Only executors that have finished get a "removed" item. */ if (event.finishTime.isDefined) { val removedEvent = s""" |{ | 'className': 'executor removed', | 'group': 'executors', | 'start': new Date(${event.finishTime.get}), | 'content': '
Reason: ${event.finishReason.get.replace("\n", " ")}""" } else { "" } }"' + | 'data-html="true">Executor ${executorId} removed
' |} """.stripMargin events += removedEvent } } events.toSeq } /** Renders the "Event Timeline" section: a group array with an 'executors' and a 'stages' group (each carrying its legend) and a single item array holding all stage items followed by all executor items. NOTE(review): appStartTime and the two serialized arrays are presumably consumed by markup not visible in this copy; confirm. */ private def makeTimeline( stages: Seq[StageInfo], executors: HashMap[String, ExecutorUIData], appStartTime: Long): Seq[Node] = { val stageEventJsonAsStrSeq = makeStageEvent(stages) val executorsJsonAsStrSeq = makeExecutorEvent(executors) val groupJsonArrayAsStr = s""" |[ | { | 'id': 'executors', | 'content': '
Executors
${EXECUTORS_LEGEND}', | }, | { | 'id': 'stages', | 'content': '
Stages
${STAGES_LEGEND}', | } |] """.stripMargin /* All timeline items as one array literal: "[item1,item2,...]". */ val eventArrayAsStr = (stageEventJsonAsStrSeq ++ executorsJsonAsStrSeq).mkString("[", ",", "]") Event Timeline ++ ++ } /** Renders the details page for the job named by the "id" request parameter. All listener state is read while holding the listener's lock. Throws IllegalArgumentException (via require) when "id" is missing or empty; renders a short "No information" page when the listener has no data for the job. */ def render(request: HttpServletRequest): Seq[Node] = { val listener = parent.jobProgresslistener listener.synchronized { val parameterId = request.getParameter("id") require(parameterId != null && parameterId.nonEmpty, "Missing id parameter") /* NOTE(review): a non-numeric id throws NumberFormatException here instead of producing an error page. */ val jobId = parameterId.toInt val jobDataOption = listener.jobIdToData.get(jobId) if (jobDataOption.isEmpty) { val content =

No information to display for job {jobId}

return UIUtils.headerSparkPage( s"Details for Job $jobId", content, parent) } val jobData = jobDataOption.get val isComplete = jobData.status != JobExecutionStatus.RUNNING val stages = jobData.stageIds.map { stageId => // This could be empty if the JobProgressListener hasn't received information about the // stage or if the stage information has been garbage collected listener.stageIdToInfo.getOrElse(stageId, new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown", Seq.empty)) } val activeStages = Buffer[StageInfo]() val completedStages = Buffer[StageInfo]() // If the job is completed, then any pending stages are displayed as "skipped": val pendingOrSkippedStages = Buffer[StageInfo]() val failedStages = Buffer[StageInfo]() for (stage <- stages) { if (stage.submissionTime.isEmpty) { pendingOrSkippedStages += stage } else if (stage.completionTime.isDefined) { if (stage.failureReason.isDefined) { failedStages += stage } else { completedStages += stage } } else { activeStages += stage } } val activeStagesTable = new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, parent.basePath, parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler, killEnabled = parent.killEnabled) val pendingOrSkippedStagesTable = new StageTableBase(pendingOrSkippedStages.sortBy(_.stageId).reverse, parent.basePath, parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler, killEnabled = false) val completedStagesTable = new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath, parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler, killEnabled = false) val failedStagesTable = new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath, parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler) val shouldShowActiveStages = activeStages.nonEmpty val shouldShowPendingStages = !isComplete && pendingOrSkippedStages.nonEmpty val shouldShowCompletedStages = 
completedStages.nonEmpty val shouldShowSkippedStages = isComplete && pendingOrSkippedStages.nonEmpty val shouldShowFailedStages = failedStages.nonEmpty val summary: NodeSeq =
var content = summary val appStartTime = listener.startTime val executorListener = parent.executorListener val operationGraphListener = parent.operationGraphListener /* Pending/skipped stages are left out of the timeline because makeStageEvent requires a submissionTime. */ content ++= makeTimeline(activeStages ++ completedStages ++ failedStages, executorListener.executorIdToData, appStartTime) content ++= UIUtils.showDagVizForJob( jobId, operationGraphListener.getOperationGraphForJob(jobId)) if (shouldShowActiveStages) { content ++=

Active Stages ({activeStages.size})

++ activeStagesTable.toNodeSeq } if (shouldShowPendingStages) { content ++=

Pending Stages ({pendingOrSkippedStages.size})

++ pendingOrSkippedStagesTable.toNodeSeq } if (shouldShowCompletedStages) { content ++=

Completed Stages ({completedStages.size})

++ completedStagesTable.toNodeSeq } if (shouldShowSkippedStages) { content ++=

Skipped Stages ({pendingOrSkippedStages.size})

++ pendingOrSkippedStagesTable.toNodeSeq } if (shouldShowFailedStages) { content ++=

Failed Stages ({failedStages.size})

++ failedStagesTable.toNodeSeq } UIUtils.headerSparkPage(s"Details for Job $jobId", content, parent, showVisualization = true) } } }