aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorTao Lin <nblintao@gmail.com>2016-07-06 10:28:05 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-07-06 10:28:05 -0700
commit478b71d028107d42fbb6d1bd300b86efbe0dcf7d (patch)
tree50472dc53ca9944718c7f1fb7f99cbc94fca65c1 /core
parent21eadd1d8cbf029197e73ffca1cba54d5a890c01 (diff)
downloadspark-478b71d028107d42fbb6d1bd300b86efbe0dcf7d.tar.gz
spark-478b71d028107d42fbb6d1bd300b86efbe0dcf7d.tar.bz2
spark-478b71d028107d42fbb6d1bd300b86efbe0dcf7d.zip
[SPARK-15591][WEBUI] Paginate Stage Table in Stages tab
## What changes were proposed in this pull request? This patch adds pagination support for the Stage Tables in the Stage tab. Pagination is provided for all of the four Job Tables (active, pending, completed, and failed). Besides, the paged stage tables are also used in JobPage (the detail page for one job) and PoolPage. Interactions (jumping, sorting, and setting page size) for paged tables are also included. ## How was this patch tested? Tested manually by using checking the Web UI after completing and failing hundreds of jobs. Same as the testings for [Paginate Job Table in Jobs tab](https://github.com/apache/spark/pull/13620). This shows the pagination for completed stages: ![paged stage table](https://cloud.githubusercontent.com/assets/5558370/16125696/5804e35e-3427-11e6-8923-5c5948982648.png) Author: Tao Lin <nblintao@gmail.com> Closes #13708 from nblintao/stageTable.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ui/PagedTable.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala25
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala24
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala517
5 files changed, 441 insertions, 141 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
index 9b6ed8cbbe..2a7c16b04b 100644
--- a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
@@ -179,6 +179,7 @@ private[ui] trait PagedTable[T] {
Splitter
.on('&')
.trimResults()
+ .omitEmptyStrings()
.withKeyValueSeparator("=")
.split(querystring)
.asScala
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
index e75f1c57a6..cba8f82dd7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
@@ -38,22 +38,24 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
val numCompletedStages = listener.numCompletedStages
val failedStages = listener.failedStages.reverse.toSeq
val numFailedStages = listener.numFailedStages
- val now = System.currentTimeMillis
+ val subPath = "stages"
val activeStagesTable =
- new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
- parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler,
- killEnabled = parent.killEnabled)
+ new StageTableBase(request, activeStages, "activeStage", parent.basePath, subPath,
+ parent.progressListener, parent.isFairScheduler,
+ killEnabled = parent.killEnabled, isFailedStage = false)
val pendingStagesTable =
- new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse,
- parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler,
- killEnabled = false)
+ new StageTableBase(request, pendingStages, "pendingStage", parent.basePath, subPath,
+ parent.progressListener, parent.isFairScheduler,
+ killEnabled = false, isFailedStage = false)
val completedStagesTable =
- new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath,
- parent.progressListener, isFairScheduler = parent.isFairScheduler, killEnabled = false)
+ new StageTableBase(request, completedStages, "completedStage", parent.basePath, subPath,
+ parent.progressListener, parent.isFairScheduler,
+ killEnabled = false, isFailedStage = false)
val failedStagesTable =
- new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath,
- parent.progressListener, isFairScheduler = parent.isFairScheduler)
+ new StageTableBase(request, failedStages, "failedStage", parent.basePath, subPath,
+ parent.progressListener, parent.isFairScheduler,
+ killEnabled = false, isFailedStage = true)
// For now, pool information is only accessible in live UIs
val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable])
@@ -136,3 +138,4 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
}
}
}
+
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 99f2bd8bc1..0ec42d68d3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -229,20 +229,24 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
}
}
+ val basePath = "jobs/job"
+
val activeStagesTable =
- new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
- parent.basePath, parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler,
- killEnabled = parent.killEnabled)
+ new StageTableBase(request, activeStages, "activeStage", parent.basePath,
+ basePath, parent.jobProgresslistener, parent.isFairScheduler,
+ killEnabled = parent.killEnabled, isFailedStage = false)
val pendingOrSkippedStagesTable =
- new StageTableBase(pendingOrSkippedStages.sortBy(_.stageId).reverse,
- parent.basePath, parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler,
- killEnabled = false)
+ new StageTableBase(request, pendingOrSkippedStages, "pendingStage", parent.basePath,
+ basePath, parent.jobProgresslistener, parent.isFairScheduler,
+ killEnabled = false, isFailedStage = false)
val completedStagesTable =
- new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath,
- parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler, killEnabled = false)
+ new StageTableBase(request, completedStages, "completedStage", parent.basePath,
+ basePath, parent.jobProgresslistener, parent.isFairScheduler,
+ killEnabled = false, isFailedStage = false)
val failedStagesTable =
- new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath,
- parent.jobProgresslistener, isFairScheduler = parent.isFairScheduler)
+ new StageTableBase(request, failedStages, "failedStage", parent.basePath,
+ basePath, parent.jobProgresslistener, parent.isFairScheduler,
+ killEnabled = false, isFailedStage = true)
val shouldShowActiveStages = activeStages.nonEmpty
val shouldShowPendingStages = !isComplete && pendingOrSkippedStages.nonEmpty
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index 6cd25919ca..f9cb717918 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -42,9 +42,11 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
case Some(s) => s.values.toSeq
case None => Seq[StageInfo]()
}
- val activeStagesTable = new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
- parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler,
- killEnabled = parent.killEnabled)
+ val shouldShowActiveStages = activeStages.nonEmpty
+ val activeStagesTable =
+ new StageTableBase(request, activeStages, "activeStage", parent.basePath, "stages/pool",
+ parent.progressListener, parent.isFairScheduler, parent.killEnabled,
+ isFailedStage = false)
// For now, pool information is only accessible in live UIs
val pools = sc.map(_.getPoolForName(poolName).getOrElse {
@@ -52,9 +54,10 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
}).toSeq
val poolTable = new PoolTable(pools, parent)
- val content =
- <h4>Summary </h4> ++ poolTable.toNodeSeq ++
- <h4>{activeStages.size} Active Stages</h4> ++ activeStagesTable.toNodeSeq
+ var content = <h4>Summary </h4> ++ poolTable.toNodeSeq
+ if (shouldShowActiveStages) {
+ content ++= <h4>{activeStages.size} Active Stages</h4> ++ activeStagesTable.toNodeSeq
+ }
UIUtils.headerSparkPage("Fair Scheduler Pool: " + poolName, content, parent)
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 0e020155a6..2a04e8fc7d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -17,61 +17,326 @@
package org.apache.spark.ui.jobs
+import java.net.URLEncoder
import java.util.Date
+import javax.servlet.http.HttpServletRequest
-import scala.xml.{Node, Text}
+import scala.collection.JavaConverters._
+import scala.xml._
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.ui.{ToolTips, UIUtils}
+import org.apache.spark.ui._
+import org.apache.spark.ui.jobs.UIData.StageUIData
import org.apache.spark.util.Utils
-/** Page showing list of all ongoing and recently finished stages */
private[ui] class StageTableBase(
+ request: HttpServletRequest,
+ stages: Seq[StageInfo],
+ stageTag: String,
+ basePath: String,
+ subPath: String,
+ progressListener: JobProgressListener,
+ isFairScheduler: Boolean,
+ killEnabled: Boolean,
+ isFailedStage: Boolean) {
+ val allParameters = request.getParameterMap().asScala.toMap
+ val parameterOtherTable = allParameters.filterNot(_._1.startsWith(stageTag))
+ .map(para => para._1 + "=" + para._2(0))
+
+ val parameterStagePage = request.getParameter(stageTag + ".page")
+ val parameterStageSortColumn = request.getParameter(stageTag + ".sort")
+ val parameterStageSortDesc = request.getParameter(stageTag + ".desc")
+ val parameterStagePageSize = request.getParameter(stageTag + ".pageSize")
+ val parameterStagePrevPageSize = request.getParameter(stageTag + ".prevPageSize")
+
+ val stagePage = Option(parameterStagePage).map(_.toInt).getOrElse(1)
+ val stageSortColumn = Option(parameterStageSortColumn).map { sortColumn =>
+ UIUtils.decodeURLParameter(sortColumn)
+ }.getOrElse("Stage Id")
+ val stageSortDesc = Option(parameterStageSortDesc).map(_.toBoolean).getOrElse(
+ // New stages should be shown above old jobs by default.
+ if (stageSortColumn == "Stage Id") true else false
+ )
+ val stagePageSize = Option(parameterStagePageSize).map(_.toInt).getOrElse(100)
+ val stagePrevPageSize = Option(parameterStagePrevPageSize).map(_.toInt)
+ .getOrElse(stagePageSize)
+
+ val page: Int = {
+ // If the user has changed to a larger page size, then go to page 1 in order to avoid
+ // IndexOutOfBoundsException.
+ if (stagePageSize <= stagePrevPageSize) {
+ stagePage
+ } else {
+ 1
+ }
+ }
+ val currentTime = System.currentTimeMillis()
+
+ val toNodeSeq = try {
+ new StagePagedTable(
+ stages,
+ stageTag,
+ basePath,
+ subPath,
+ progressListener,
+ isFairScheduler,
+ killEnabled,
+ currentTime,
+ stagePageSize,
+ stageSortColumn,
+ stageSortDesc,
+ isFailedStage,
+ parameterOtherTable
+ ).table(page)
+ } catch {
+ case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) =>
+ <div class="alert alert-error">
+ <p>Error while rendering stage table:</p>
+ <pre>
+ {Utils.exceptionString(e)}
+ </pre>
+ </div>
+ }
+}
+
+private[ui] class StageTableRowData(
+ val stageInfo: StageInfo,
+ val stageData: Option[StageUIData],
+ val stageId: Int,
+ val attemptId: Int,
+ val schedulingPool: String,
+ val description: String,
+ val descriptionOption: Option[String],
+ val submissionTime: Long,
+ val formattedSubmissionTime: String,
+ val duration: Long,
+ val formattedDuration: String,
+ val inputRead: Long,
+ val inputReadWithUnit: String,
+ val outputWrite: Long,
+ val outputWriteWithUnit: String,
+ val shuffleRead: Long,
+ val shuffleReadWithUnit: String,
+ val shuffleWrite: Long,
+ val shuffleWriteWithUnit: String)
+
+private[ui] class MissingStageTableRowData(
+ stageInfo: StageInfo,
+ stageId: Int,
+ attemptId: Int) extends StageTableRowData(
+ stageInfo, None, stageId, attemptId, "", "", None, 0, "", -1, "", 0, "", 0, "", 0, "", 0, "")
+
+/** Page showing list of all ongoing and recently finished stages */
+private[ui] class StagePagedTable(
stages: Seq[StageInfo],
+ stageTag: String,
basePath: String,
+ subPath: String,
listener: JobProgressListener,
isFairScheduler: Boolean,
- killEnabled: Boolean) {
-
- protected def columns: Seq[Node] = {
- <th>Stage Id</th> ++
- {if (isFairScheduler) {<th>Pool Name</th>} else Seq.empty} ++
- <th>Description</th>
- <th>Submitted</th>
- <th>Duration</th>
- <th>Tasks: Succeeded/Total</th>
- <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
- <th><span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output</span></th>
- <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
- <th>
- <!-- Place the shuffle write tooltip on the left (rather than the default position
- of on top) because the shuffle write column is the last column on the right side and
- the tooltip is wider than the column, so it doesn't fit on top. -->
- <span data-toggle="tooltip" data-placement="left" title={ToolTips.SHUFFLE_WRITE}>
- Shuffle Write
- </span>
- </th>
+ killEnabled: Boolean,
+ currentTime: Long,
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean,
+ isFailedStage: Boolean,
+ parameterOtherTable: Iterable[String]) extends PagedTable[StageTableRowData] {
+
+ override def tableId: String = stageTag + "-table"
+
+ override def tableCssClass: String =
+ "table table-bordered table-condensed table-striped table-head-clickable"
+
+ override def pageSizeFormField: String = stageTag + ".pageSize"
+
+ override def prevPageSizeFormField: String = stageTag + ".prevPageSize"
+
+ override def pageNumberFormField: String = stageTag + ".page"
+
+ val parameterPath = UIUtils.prependBaseUri(basePath) + s"/$subPath/?" +
+ parameterOtherTable.mkString("&")
+
+ override val dataSource = new StageDataSource(
+ stages,
+ listener,
+ currentTime,
+ pageSize,
+ sortColumn,
+ desc
+ )
+
+ override def pageLink(page: Int): String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+ parameterPath +
+ s"&$pageNumberFormField=$page" +
+ s"&$stageTag.sort=$encodedSortColumn" +
+ s"&$stageTag.desc=$desc" +
+ s"&$pageSizeFormField=$pageSize"
+ }
+
+ override def goButtonFormPath: String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+ s"$parameterPath&$stageTag.sort=$encodedSortColumn&$stageTag.desc=$desc"
}
- def toNodeSeq: Seq[Node] = {
- listener.synchronized {
- stageTable(renderStageRow, stages)
+ override def headers: Seq[Node] = {
+ // stageHeadersAndCssClasses has three parts: header title, tooltip information, and sortable.
+ // The tooltip information could be None, which indicates it does not have a tooltip.
+ // Otherwise, it has two parts: tooltip text, and position (true for left, false for default).
+ val stageHeadersAndCssClasses: Seq[(String, Option[(String, Boolean)], Boolean)] =
+ Seq(("Stage Id", None, true)) ++
+ {if (isFairScheduler) {Seq(("Pool Name", None, true))} else Seq.empty} ++
+ Seq(
+ ("Description", None, true), ("Submitted", None, true), ("Duration", None, true),
+ ("Tasks: Succeeded/Total", None, false),
+ ("Input", Some((ToolTips.INPUT, false)), true),
+ ("Output", Some((ToolTips.OUTPUT, false)), true),
+ ("Shuffle Read", Some((ToolTips.SHUFFLE_READ, false)), true),
+ ("Shuffle Write", Some((ToolTips.SHUFFLE_WRITE, true)), true)
+ ) ++
+ {if (isFailedStage) {Seq(("Failure Reason", None, false))} else Seq.empty}
+
+ if (!stageHeadersAndCssClasses.filter(_._3).map(_._1).contains(sortColumn)) {
+ throw new IllegalArgumentException(s"Unknown column: $sortColumn")
}
+
+ val headerRow: Seq[Node] = {
+ stageHeadersAndCssClasses.map { case (header, tooltip, sortable) =>
+ val headerSpan = tooltip.map { case (title, left) =>
+ if (left) {
+ /* Place the shuffle write tooltip on the left (rather than the default position
+ of on top) because the shuffle write column is the last column on the right side and
+ the tooltip is wider than the column, so it doesn't fit on top. */
+ <span data-toggle="tooltip" data-placement="left" title={title}>
+ {header}
+ </span>
+ } else {
+ <span data-toggle="tooltip" title={title}>
+ {header}
+ </span>
+ }
+ }.getOrElse(
+ {header}
+ )
+
+ if (header == sortColumn) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+ s"&$stageTag.desc=${!desc}" +
+ s"&$stageTag.pageSize=$pageSize")
+ val arrow = if (desc) "&#x25BE;" else "&#x25B4;" // UP or DOWN
+
+ <th>
+ <a href={headerLink}>
+ {headerSpan}<span>
+ &nbsp;{Unparsed(arrow)}
+ </span>
+ </a>
+ </th>
+ } else {
+ if (sortable) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+ s"&$stageTag.pageSize=$pageSize")
+
+ <th>
+ <a href={headerLink}>
+ {headerSpan}
+ </a>
+ </th>
+ } else {
+ <th>
+ {headerSpan}
+ </th>
+ }
+ }
+ }
+ }
+ <thead>{headerRow}</thead>
+ }
+
+ override def row(data: StageTableRowData): Seq[Node] = {
+ <tr id={"stage-" + data.stageId + "-" + data.attemptId}>
+ {rowContent(data)}
+ </tr>
}
- /** Special table that merges two header cells. */
- protected def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
- <table class="table table-bordered table-striped table-condensed sortable">
- <thead>{columns}</thead>
- <tbody>
- {rows.map(r => makeRow(r))}
- </tbody>
- </table>
+ private def rowContent(data: StageTableRowData): Seq[Node] = {
+ data.stageData match {
+ case None => missingStageRow(data.stageId)
+ case Some(stageData) =>
+ val info = data.stageInfo
+
+ {if (data.attemptId > 0) {
+ <td>{data.stageId} (retry {data.attemptId})</td>
+ } else {
+ <td>{data.stageId}</td>
+ }} ++
+ {if (isFairScheduler) {
+ <td>
+ <a href={"%s/stages/pool?poolname=%s"
+ .format(UIUtils.prependBaseUri(basePath), data.schedulingPool)}>
+ {data.schedulingPool}
+ </a>
+ </td>
+ } else {
+ Seq.empty
+ }} ++
+ <td>{makeDescription(info, data.descriptionOption)}</td>
+ <td valign="middle">
+ {data.formattedSubmissionTime}
+ </td>
+ <td>{data.formattedDuration}</td>
+ <td class="progress-cell">
+ {UIUtils.makeProgressBar(started = stageData.numActiveTasks,
+ completed = stageData.completedIndices.size, failed = stageData.numFailedTasks,
+ skipped = 0, killed = stageData.numKilledTasks, total = info.numTasks)}
+ </td>
+ <td>{data.inputReadWithUnit}</td>
+ <td>{data.outputWriteWithUnit}</td>
+ <td>{data.shuffleReadWithUnit}</td>
+ <td>{data.shuffleWriteWithUnit}</td> ++
+ {
+ if (isFailedStage) {
+ failureReasonHtml(info)
+ } else {
+ Seq.empty
+ }
+ }
+ }
}
- private def makeDescription(s: StageInfo): Seq[Node] = {
+ private def failureReasonHtml(s: StageInfo): Seq[Node] = {
+ val failureReason = s.failureReason.getOrElse("")
+ val isMultiline = failureReason.indexOf('\n') >= 0
+ // Display the first line by default
+ val failureReasonSummary = StringEscapeUtils.escapeHtml4(
+ if (isMultiline) {
+ failureReason.substring(0, failureReason.indexOf('\n'))
+ } else {
+ failureReason
+ })
+ val details = if (isMultiline) {
+ // scalastyle:off
+ <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+ class="expand-details">
+ +details
+ </span> ++
+ <div class="stacktrace-details collapsed">
+ <pre>{failureReason}</pre>
+ </div>
+ // scalastyle:on
+ } else {
+ ""
+ }
+ <td valign="middle">{failureReasonSummary}{details}</td>
+ }
+
+ private def makeDescription(s: StageInfo, descriptionOption: Option[String]): Seq[Node] = {
val basePathUri = UIUtils.prependBaseUri(basePath)
val killLink = if (killEnabled) {
@@ -111,12 +376,7 @@ private[ui] class StageTableBase(
</div>
}
- val stageDesc = for {
- stageData <- listener.stageIdToData.get((s.stageId, s.attemptId))
- desc <- stageData.description
- } yield {
- UIUtils.makeDescription(desc, basePathUri)
- }
+ val stageDesc = descriptionOption.map(UIUtils.makeDescription(_, basePathUri))
<div>{stageDesc.getOrElse("")} {killLink} {nameLink} {details}</div>
}
@@ -132,19 +392,44 @@ private[ui] class StageTableBase(
<td></td> ++ // Shuffle Read
<td></td> // Shuffle Write
}
+}
+
+private[ui] class StageDataSource(
+ stages: Seq[StageInfo],
+ listener: JobProgressListener,
+ currentTime: Long,
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean) extends PagedDataSource[StageTableRowData](pageSize) {
+ // Convert StageInfo to StageTableRowData which contains the final contents to show in the table
+ // so that we can avoid creating duplicate contents during sorting the data
+ private val data = stages.map(stageRow).sorted(ordering(sortColumn, desc))
+
+ private var _slicedStageIds: Set[Int] = null
- protected def stageRow(s: StageInfo): Seq[Node] = {
+ override def dataSize: Int = data.size
+
+ override def sliceData(from: Int, to: Int): Seq[StageTableRowData] = {
+ val r = data.slice(from, to)
+ _slicedStageIds = r.map(_.stageId).toSet
+ r
+ }
+
+ private def stageRow(s: StageInfo): StageTableRowData = {
val stageDataOption = listener.stageIdToData.get((s.stageId, s.attemptId))
+
if (stageDataOption.isEmpty) {
- return missingStageRow(s.stageId)
+ return new MissingStageTableRowData(s, s.stageId, s.attemptId)
}
-
val stageData = stageDataOption.get
- val submissionTime = s.submissionTime match {
+
+ val description = stageData.description
+
+ val formattedSubmissionTime = s.submissionTime match {
case Some(t) => UIUtils.formatDate(new Date(t))
case None => "Unknown"
}
- val finishTime = s.completionTime.getOrElse(System.currentTimeMillis)
+ val finishTime = s.completionTime.getOrElse(currentTime)
// The submission time for a stage is misleading because it counts the time
// the stage waits to be launched. (SPARK-10930)
@@ -156,7 +441,7 @@ private[ui] class StageTableBase(
if (finishTime > startTime) {
Some(finishTime - startTime)
} else {
- Some(System.currentTimeMillis() - startTime)
+ Some(currentTime - startTime)
}
} else {
None
@@ -172,76 +457,80 @@ private[ui] class StageTableBase(
val shuffleWrite = stageData.shuffleWriteBytes
val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""
- {if (s.attemptId > 0) {
- <td>{s.stageId} (retry {s.attemptId})</td>
- } else {
- <td>{s.stageId}</td>
- }} ++
- {if (isFairScheduler) {
- <td>
- <a href={"%s/stages/pool?poolname=%s"
- .format(UIUtils.prependBaseUri(basePath), stageData.schedulingPool)}>
- {stageData.schedulingPool}
- </a>
- </td>
- } else {
- Seq.empty
- }} ++
- <td>{makeDescription(s)}</td>
- <td sorttable_customkey={s.submissionTime.getOrElse(0).toString} valign="middle">
- {submissionTime}
- </td>
- <td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
- <td class="progress-cell">
- {UIUtils.makeProgressBar(started = stageData.numActiveTasks,
- completed = stageData.completedIndices.size, failed = stageData.numFailedTasks,
- skipped = 0, killed = stageData.numKilledTasks, total = s.numTasks)}
- </td>
- <td sorttable_customkey={inputRead.toString}>{inputReadWithUnit}</td>
- <td sorttable_customkey={outputWrite.toString}>{outputWriteWithUnit}</td>
- <td sorttable_customkey={shuffleRead.toString}>{shuffleReadWithUnit}</td>
- <td sorttable_customkey={shuffleWrite.toString}>{shuffleWriteWithUnit}</td>
- }
- /** Render an HTML row that represents a stage */
- private def renderStageRow(s: StageInfo): Seq[Node] =
- <tr id={"stage-" + s.stageId + "-" + s.attemptId}>{stageRow(s)}</tr>
-}
-
-private[ui] class FailedStageTable(
- stages: Seq[StageInfo],
- basePath: String,
- listener: JobProgressListener,
- isFairScheduler: Boolean)
- extends StageTableBase(stages, basePath, listener, isFairScheduler, killEnabled = false) {
-
- override protected def columns: Seq[Node] = super.columns ++ <th>Failure Reason</th>
+ new StageTableRowData(
+ s,
+ stageDataOption,
+ s.stageId,
+ s.attemptId,
+ stageData.schedulingPool,
+ description.getOrElse(""),
+ description,
+ s.submissionTime.getOrElse(0),
+ formattedSubmissionTime,
+ duration.getOrElse(-1),
+ formattedDuration,
+ inputRead,
+ inputReadWithUnit,
+ outputWrite,
+ outputWriteWithUnit,
+ shuffleRead,
+ shuffleReadWithUnit,
+ shuffleWrite,
+ shuffleWriteWithUnit
+ )
+ }
- override protected def stageRow(s: StageInfo): Seq[Node] = {
- val basicColumns = super.stageRow(s)
- val failureReason = s.failureReason.getOrElse("")
- val isMultiline = failureReason.indexOf('\n') >= 0
- // Display the first line by default
- val failureReasonSummary = StringEscapeUtils.escapeHtml4(
- if (isMultiline) {
- failureReason.substring(0, failureReason.indexOf('\n'))
- } else {
- failureReason
- })
- val details = if (isMultiline) {
- // scalastyle:off
- <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
- class="expand-details">
- +details
- </span> ++
- <div class="stacktrace-details collapsed">
- <pre>{failureReason}</pre>
- </div>
- // scalastyle:on
+ /**
+ * Return Ordering according to sortColumn and desc
+ */
+ private def ordering(sortColumn: String, desc: Boolean): Ordering[StageTableRowData] = {
+ val ordering = sortColumn match {
+ case "Stage Id" => new Ordering[StageTableRowData] {
+ override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+ Ordering.Int.compare(x.stageId, y.stageId)
+ }
+ case "Pool Name" => new Ordering[StageTableRowData] {
+ override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+ Ordering.String.compare(x.schedulingPool, y.schedulingPool)
+ }
+ case "Description" => new Ordering[StageTableRowData] {
+ override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+ Ordering.String.compare(x.description, y.description)
+ }
+ case "Submitted" => new Ordering[StageTableRowData] {
+ override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+ Ordering.Long.compare(x.submissionTime, y.submissionTime)
+ }
+ case "Duration" => new Ordering[StageTableRowData] {
+ override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+ Ordering.Long.compare(x.duration, y.duration)
+ }
+ case "Input" => new Ordering[StageTableRowData] {
+ override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+ Ordering.Long.compare(x.inputRead, y.inputRead)
+ }
+ case "Output" => new Ordering[StageTableRowData] {
+ override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+ Ordering.Long.compare(x.outputWrite, y.outputWrite)
+ }
+ case "Shuffle Read" => new Ordering[StageTableRowData] {
+ override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+ Ordering.Long.compare(x.shuffleRead, y.shuffleRead)
+ }
+ case "Shuffle Write" => new Ordering[StageTableRowData] {
+ override def compare(x: StageTableRowData, y: StageTableRowData): Int =
+ Ordering.Long.compare(x.shuffleWrite, y.shuffleWrite)
+ }
+ case "Tasks: Succeeded/Total" =>
+ throw new IllegalArgumentException(s"Unsortable column: $sortColumn")
+ case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
+ }
+ if (desc) {
+ ordering.reverse
} else {
- ""
+ ordering
}
- val failureReasonHtml = <td valign="middle">{failureReasonSummary}{details}</td>
- basicColumns ++ failureReasonHtml
}
}
+