/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ui.jobs
import java.util.Date
import javax.servlet.http.HttpServletRequest
import scala.collection.mutable.{Buffer, HashMap, ListBuffer}
import scala.xml.{Node, NodeSeq, Unparsed, Utility}
import org.apache.spark.JobExecutionStatus
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.ui.jobs.UIData.ExecutorUIData
/** Page showing statistics and stage list for a given job */
private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
// Legend for the stages, converted to a String with every newline character removed.
// NOTE(review): presumably kept on one line so it can be embedded in generated script
// text -- confirm at the use site.
private val STAGES_LEGEND =
.toString.filter(_ != '\n')
// Legend for the executors, converted to a String with every newline character removed.
// NOTE(review): presumably kept on one line so it can be embedded in generated script
// text -- confirm at the use site.
private val EXECUTORS_LEGEND =
.toString.filter(_ != '\n')
/**
 * Builds one timeline event description per stage, formatted as a JavaScript
 * object-literal string.
 *
 * Each entry carries the stage's status string in its CSS class name, is assigned to the
 * 'stages' group, and spans from the stage's submission time to its completion time.
 *
 * @param stageInfos stages to convert. Every stage must have `submissionTime` defined,
 *                   because `.get` is called on it without a fallback.
 * @return one event string per input stage, in the same order as `stageInfos`
 */
private def makeStageEvent(stageInfos: Seq[StageInfo]): Seq[String] = {
stageInfos.map { stage =>
val stageId = stage.stageId
val attemptId = stage.attemptId
val name = stage.name
val status = stage.getStatusString
// Unchecked Option access: callers are expected to pass only stages that were submitted.
val submissionTime = stage.submissionTime.get
// A stage that has not completed yet uses the current time as its end.
val completionTime = stage.completionTime.getOrElse(System.currentTimeMillis())
// The timeline library treats contents as HTML, so we have to escape them; for the
// data-title attribute string we have to escape them twice since that's in a string.
val escapedName = Utility.escape(name)
s"""
|{
| 'className': 'stage job-timeline-object ${status}',
| 'group': 'stages',
| 'start': new Date(${submissionTime}),
| 'end': new Date(${completionTime}),
| 'content': '
++
}
def render(request: HttpServletRequest): Seq[Node] = {
val listener = parent.jobProgresslistener
listener.synchronized {
val parameterId = request.getParameter("id")
require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
val jobId = parameterId.toInt
val jobDataOption = listener.jobIdToData.get(jobId)
if (jobDataOption.isEmpty) {
val content =
No information to display for job {jobId}
return UIUtils.headerSparkPage(
s"Details for Job $jobId", content, parent)
}
val jobData = jobDataOption.get
val isComplete = jobData.status != JobExecutionStatus.RUNNING
val stages = jobData.stageIds.map { stageId =>
// This could be empty if the JobProgressListener hasn't received information about the
// stage or if the stage information has been garbage collected
listener.stageIdToInfo.getOrElse(stageId,
new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown", Seq.empty))
}
val activeStages = Buffer[StageInfo]()
val completedStages = Buffer[StageInfo]()
// If the job is completed, then any pending stages are displayed as "skipped":
val pendingOrSkippedStages = Buffer[StageInfo]()
val failedStages = Buffer[StageInfo]()
for (stage <- stages) {
if (stage.submissionTime.isEmpty) {
pendingOrSkippedStages += stage
} else if (stage.completionTime.isDefined) {
if (stage.failureReason.isDefined) {
failedStages += stage
} else {
completedStages += stage
}
} else {
activeStages += stage
}
}
val activeStagesTable =
new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
parent.basePath, parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler,
killEnabled = parent.killEnabled)
val pendingOrSkippedStagesTable =
new StageTableBase(pendingOrSkippedStages.sortBy(_.stageId).reverse,
parent.basePath, parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler,
killEnabled = false)
val completedStagesTable =
new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath,
parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler, killEnabled = false)
val failedStagesTable =
new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath,
parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler)
val shouldShowActiveStages = activeStages.nonEmpty
val shouldShowPendingStages = !isComplete && pendingOrSkippedStages.nonEmpty
val shouldShowCompletedStages = completedStages.nonEmpty
val shouldShowSkippedStages = isComplete && pendingOrSkippedStages.nonEmpty
val shouldShowFailedStages = failedStages.nonEmpty
val summary: NodeSeq =