aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js34
-rw-r--r--core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js27
-rw-r--r--core/src/main/resources/org/apache/spark/ui/static/timeline-view.js39
-rw-r--r--core/src/main/scala/org/apache/spark/ui/PagedTable.scala246
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala879
-rw-r--r--core/src/test/scala/org/apache/spark/ui/PagedTableSuite.scala99
6 files changed, 1102 insertions, 222 deletions
diff --git a/core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js b/core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js
index 0b450dc76b..3c8ddddf07 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js
@@ -19,6 +19,9 @@
* to be registered after the page loads. */
$(function() {
$("span.expand-additional-metrics").click(function(){
+ var status = window.localStorage.getItem("expand-additional-metrics") == "true";
+ status = !status;
+
// Expand the list of additional metrics.
var additionalMetricsDiv = $(this).parent().find('.additional-metrics');
$(additionalMetricsDiv).toggleClass('collapsed');
@@ -26,17 +29,31 @@ $(function() {
// Switch the class of the arrow from open to closed.
$(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-open');
$(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-closed');
+
+ window.localStorage.setItem("expand-additional-metrics", "" + status);
});
+ if (window.localStorage.getItem("expand-additional-metrics") == "true") {
+ // Set it to false so that the click function can revert it
+ window.localStorage.setItem("expand-additional-metrics", "false");
+ $("span.expand-additional-metrics").trigger("click");
+ }
+
stripeSummaryTable();
$('input[type="checkbox"]').click(function() {
- var column = "table ." + $(this).attr("name");
+ var name = $(this).attr("name")
+ var column = "table ." + name;
+ var status = window.localStorage.getItem(name) == "true";
+ status = !status;
$(column).toggle();
stripeSummaryTable();
+ window.localStorage.setItem(name, "" + status);
});
$("#select-all-metrics").click(function() {
+ var status = window.localStorage.getItem("select-all-metrics") == "true";
+ status = !status;
if (this.checked) {
// Toggle all un-checked options.
$('input[type="checkbox"]:not(:checked)').trigger('click');
@@ -44,6 +61,21 @@ $(function() {
// Toggle all checked options.
$('input[type="checkbox"]:checked').trigger('click');
}
+ window.localStorage.setItem("select-all-metrics", "" + status);
+ });
+
+ if (window.localStorage.getItem("select-all-metrics") == "true") {
+ $("#select-all-metrics").attr('checked', status);
+ }
+
+ $("span.additional-metric-title").parent().find('input[type="checkbox"]').each(function() {
+ var name = $(this).attr("name")
+ // If name is undefined, then skip it because it's the "select-all-metrics" checkbox
+ if (name && window.localStorage.getItem(name) == "true") {
+ // Set it to false so that the click function can revert it
+ window.localStorage.setItem(name, "false");
+ $(this).trigger("click")
+ }
});
// Trigger a click on the checkbox if a user clicks the label next to it.
diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
index 9fa53baaf4..4a893bc018 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -73,12 +73,23 @@ var StagePageVizConstants = {
};
/*
+ * Return "expand-dag-viz-arrow-job" if forJob is true.
+ * Otherwise, return "expand-dag-viz-arrow-stage".
+ */
+function expandDagVizArrowKey(forJob) {
+ return forJob ? "expand-dag-viz-arrow-job" : "expand-dag-viz-arrow-stage";
+}
+
+/*
* Show or hide the RDD DAG visualization.
*
* The graph is only rendered the first time this is called.
* This is the narrow interface called from the Scala UI code.
*/
function toggleDagViz(forJob) {
+ var status = window.localStorage.getItem(expandDagVizArrowKey(forJob)) == "true";
+ status = !status;
+
var arrowSelector = ".expand-dag-viz-arrow";
$(arrowSelector).toggleClass('arrow-closed');
$(arrowSelector).toggleClass('arrow-open');
@@ -93,8 +104,24 @@ function toggleDagViz(forJob) {
// Save the graph for later so we don't have to render it again
graphContainer().style("display", "none");
}
+
+ window.localStorage.setItem(expandDagVizArrowKey(forJob), "" + status);
}
+$(function (){
+ if (window.localStorage.getItem(expandDagVizArrowKey(false)) == "true") {
+ // Set it to false so that the click function can revert it
+ window.localStorage.setItem(expandDagVizArrowKey(false), "false");
+ toggleDagViz(false);
+ }
+
+ if (window.localStorage.getItem(expandDagVizArrowKey(true)) == "true") {
+ // Set it to false so that the click function can revert it
+ window.localStorage.setItem(expandDagVizArrowKey(true), "false");
+ toggleDagViz(true);
+ }
+});
+
/*
* Render the RDD DAG visualization.
*
diff --git a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js
index ca74ef9d7e..f4453c71df 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js
@@ -66,14 +66,27 @@ function drawApplicationTimeline(groupArray, eventObjArray, startTime) {
setupJobEventAction();
$("span.expand-application-timeline").click(function() {
+ var status = window.localStorage.getItem("expand-application-timeline") == "true";
+ status = !status;
+
$("#application-timeline").toggleClass('collapsed');
// Switch the class of the arrow from open to closed.
$(this).find('.expand-application-timeline-arrow').toggleClass('arrow-open');
$(this).find('.expand-application-timeline-arrow').toggleClass('arrow-closed');
+
+ window.localStorage.setItem("expand-application-timeline", "" + status);
});
}
+$(function (){
+ if (window.localStorage.getItem("expand-application-timeline") == "true") {
+ // Set it to false so that the click function can revert it
+ window.localStorage.setItem("expand-application-timeline", "false");
+ $("span.expand-application-timeline").trigger('click');
+ }
+});
+
function drawJobTimeline(groupArray, eventObjArray, startTime) {
var groups = new vis.DataSet(groupArray);
var items = new vis.DataSet(eventObjArray);
@@ -125,14 +138,27 @@ function drawJobTimeline(groupArray, eventObjArray, startTime) {
setupStageEventAction();
$("span.expand-job-timeline").click(function() {
+ var status = window.localStorage.getItem("expand-job-timeline") == "true";
+ status = !status;
+
$("#job-timeline").toggleClass('collapsed');
// Switch the class of the arrow from open to closed.
$(this).find('.expand-job-timeline-arrow').toggleClass('arrow-open');
$(this).find('.expand-job-timeline-arrow').toggleClass('arrow-closed');
+
+ window.localStorage.setItem("expand-job-timeline", "" + status);
});
}
+$(function (){
+ if (window.localStorage.getItem("expand-job-timeline") == "true") {
+ // Set it to false so that the click function can revert it
+ window.localStorage.setItem("expand-job-timeline", "false");
+ $("span.expand-job-timeline").trigger('click');
+ }
+});
+
function drawTaskAssignmentTimeline(groupArray, eventObjArray, minLaunchTime, maxFinishTime) {
var groups = new vis.DataSet(groupArray);
var items = new vis.DataSet(eventObjArray);
@@ -176,14 +202,27 @@ function drawTaskAssignmentTimeline(groupArray, eventObjArray, minLaunchTime, ma
setupZoomable("#task-assignment-timeline-zoom-lock", taskTimeline);
$("span.expand-task-assignment-timeline").click(function() {
+ var status = window.localStorage.getItem("expand-task-assignment-timeline") == "true";
+ status = !status;
+
$("#task-assignment-timeline").toggleClass("collapsed");
// Switch the class of the arrow from open to closed.
$(this).find(".expand-task-assignment-timeline-arrow").toggleClass("arrow-open");
$(this).find(".expand-task-assignment-timeline-arrow").toggleClass("arrow-closed");
+
+ window.localStorage.setItem("expand-task-assignment-timeline", "" + status);
});
}
+$(function (){
+ if (window.localStorage.getItem("expand-task-assignment-timeline") == "true") {
+ // Set it to false so that the click function can revert it
+ window.localStorage.setItem("expand-task-assignment-timeline", "false");
+ $("span.expand-task-assignment-timeline").trigger('click');
+ }
+});
+
function setupExecutorEventAction() {
$(".item.box.executor").each(function () {
$(this).hover(
diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
new file mode 100644
index 0000000000..17d7b39c2d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import scala.xml.{Node, Unparsed}
+
+/**
+ * A data source that provides data for a page.
+ *
+ * @param pageSize the number of rows in a page
+ */
+private[ui] abstract class PagedDataSource[T](val pageSize: Int) {
+
+ if (pageSize <= 0) {
+ throw new IllegalArgumentException("Page size must be positive")
+ }
+
+ /**
+ * Return the size of all data.
+ */
+ protected def dataSize: Int
+
+ /**
+ * Slice a range of data.
+ */
+ protected def sliceData(from: Int, to: Int): Seq[T]
+
+ /**
+ * Slice the data for this page
+ */
+ def pageData(page: Int): PageData[T] = {
+ val totalPages = (dataSize + pageSize - 1) / pageSize
+ if (page <= 0 || page > totalPages) {
+ throw new IndexOutOfBoundsException(
+ s"Page $page is out of range. Please select a page number between 1 and $totalPages.")
+ }
+ val from = (page - 1) * pageSize
+ val to = dataSize.min(page * pageSize)
+ PageData(totalPages, sliceData(from, to))
+ }
+
+}
+
+/**
+ * The data returned by `PagedDataSource.pageData`, including the page number, the number of total
+ * pages and the data in this page.
+ */
+private[ui] case class PageData[T](totalPage: Int, data: Seq[T])
+
+/**
+ * A paged table that will generate a HTML table for a specified page and also the page navigation.
+ */
+private[ui] trait PagedTable[T] {
+
+ def tableId: String
+
+ def tableCssClass: String
+
+ def dataSource: PagedDataSource[T]
+
+ def headers: Seq[Node]
+
+ def row(t: T): Seq[Node]
+
+ def table(page: Int): Seq[Node] = {
+ val _dataSource = dataSource
+ try {
+ val PageData(totalPages, data) = _dataSource.pageData(page)
+ <div>
+ {pageNavigation(page, _dataSource.pageSize, totalPages)}
+ <table class={tableCssClass} id={tableId}>
+ {headers}
+ <tbody>
+ {data.map(row)}
+ </tbody>
+ </table>
+ </div>
+ } catch {
+ case e: IndexOutOfBoundsException =>
+ val PageData(totalPages, _) = _dataSource.pageData(1)
+ <div>
+ {pageNavigation(1, _dataSource.pageSize, totalPages)}
+ <div class="alert alert-error">{e.getMessage}</div>
+ </div>
+ }
+ }
+
+ /**
+ * Return a page navigation.
+ * <ul>
+ * <li>If the totalPages is 1, the page navigation will be empty</li>
+ * <li>
+ * If the totalPages is more than 1, it will create a page navigation including a group of
+ * page numbers and a form to submit the page number.
+ * </li>
+ * </ul>
+ *
+ * Here are some examples of the page navigation:
+ * {{{
+ * << < 11 12 13* 14 15 16 17 18 19 20 > >>
+ *
+ * This is the first group, so "<<" is hidden.
+ * < 1 2* 3 4 5 6 7 8 9 10 > >>
+ *
+ * This is the first group and the first page, so "<<" and "<" are hidden.
+ * 1* 2 3 4 5 6 7 8 9 10 > >>
+ *
+ * Assume totalPages is 19. This is the last group, so ">>" is hidden.
+ * << < 11 12 13* 14 15 16 17 18 19 >
+ *
+ * Assume totalPages is 19. This is the last group and the last page, so ">>" and ">" are hidden.
+ * << < 11 12 13 14 15 16 17 18 19*
+ *
+ * * means the current page number
+ * << means jumping to the first page of the previous group.
+ * < means jumping to the previous page.
+ * >> means jumping to the first page of the next group.
+ * > means jumping to the next page.
+ * }}}
+ */
+ private[ui] def pageNavigation(page: Int, pageSize: Int, totalPages: Int): Seq[Node] = {
+ if (totalPages == 1) {
+ Nil
+ } else {
+ // A group includes all of the page numbers that will be shown in the page navigation.
+ // A group size of 10 means that 10 page numbers will be shown at once.
+ // The first group is 1 to 10, the second is 11 to 20, and so on
+ val groupSize = 10
+ val firstGroup = 0
+ val lastGroup = (totalPages - 1) / groupSize
+ val currentGroup = (page - 1) / groupSize
+ val startPage = currentGroup * groupSize + 1
+ val endPage = totalPages.min(startPage + groupSize - 1)
+ val pageTags = (startPage to endPage).map { p =>
+ if (p == page) {
+ // The current page should be disabled so that it cannot be clicked.
+ <li class="disabled"><a href="#">{p}</a></li>
+ } else {
+ <li><a href={pageLink(p)}>{p}</a></li>
+ }
+ }
+ val (goButtonJsFuncName, goButtonJsFunc) = goButtonJavascriptFunction
+ // When clicking the "Go" button, it will call this javascript method and then call
+ // "goButtonJsFuncName"
+ val formJs =
+ s"""$$(function(){
+ | $$( "#form-task-page" ).submit(function(event) {
+ | var page = $$("#form-task-page-no").val()
+ | var pageSize = $$("#form-task-page-size").val()
+ | pageSize = pageSize ? pageSize: 100;
+ | if (page != "") {
+ | ${goButtonJsFuncName}(page, pageSize);
+ | }
+ | event.preventDefault();
+ | });
+ |});
+ """.stripMargin
+
+ <div>
+ <div>
+ <form id="form-task-page" class="form-inline pull-right" style="margin-bottom: 0px;">
+ <label>{totalPages} Pages. Jump to</label>
+ <input type="text" id="form-task-page-no" value={page.toString} class="span1" />
+ <label>. Show </label>
+ <input type="text" id="form-task-page-size" value={pageSize.toString} class="span1" />
+ <label>tasks in a page.</label>
+ <button type="submit" class="btn">Go</button>
+ </form>
+ </div>
+ <div class="pagination" style="margin-bottom: 0px;">
+ <span style="float: left; padding-top: 4px; padding-right: 4px;">Page: </span>
+ <ul>
+ {if (currentGroup > firstGroup) {
+ <li>
+ <a href={pageLink(startPage - groupSize)} aria-label="Previous Group">
+ <span aria-hidden="true">
+ &lt;&lt;
+ </span>
+ </a>
+ </li>
+ }}
+ {if (page > 1) {
+ <li>
+ <a href={pageLink(page - 1)} aria-label="Previous">
+ <span aria-hidden="true">
+ &lt;
+ </span>
+ </a>
+ </li>
+ }}
+ {pageTags}
+ {if (page < totalPages) {
+ <li>
+ <a href={pageLink(page + 1)} aria-label="Next">
+ <span aria-hidden="true">&gt;</span>
+ </a>
+ </li>
+ }}
+ {if (currentGroup < lastGroup) {
+ <li>
+ <a href={pageLink(startPage + groupSize)} aria-label="Next Group">
+ <span aria-hidden="true">
+ &gt;&gt;
+ </span>
+ </a>
+ </li>
+ }}
+ </ul>
+ </div>
+ <script>
+ {Unparsed(goButtonJsFunc)}
+
+ {Unparsed(formJs)}
+ </script>
+ </div>
+ }
+ }
+
+ /**
+ * Return a link to jump to a page.
+ */
+ def pageLink(page: Int): String
+
+ /**
+ * Only the implementation knows how to create the url with a page number and the page size, so we
+ * leave this one to the implementation. The implementation should create a JavaScript method that
+ * accepts a page number along with the page size and jumps to the page. The return value is this
+ * method name and its JavaScript codes.
+ */
+ def goButtonJavascriptFunction: (String, String)
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 6e077bf3e7..cf04b5e592 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -17,6 +17,7 @@
package org.apache.spark.ui.jobs
+import java.net.URLEncoder
import java.util.Date
import javax.servlet.http.HttpServletRequest
@@ -27,13 +28,14 @@ import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
-import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
+import org.apache.spark.ui._
import org.apache.spark.ui.jobs.UIData._
-import org.apache.spark.ui.scope.RDDOperationGraph
import org.apache.spark.util.{Utils, Distribution}
/** Page showing statistics and task list for a given stage */
private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
+ import StagePage._
+
private val progressListener = parent.progressListener
private val operationGraphListener = parent.operationGraphListener
@@ -74,6 +76,16 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val parameterAttempt = request.getParameter("attempt")
require(parameterAttempt != null && parameterAttempt.nonEmpty, "Missing attempt parameter")
+ val parameterTaskPage = request.getParameter("task.page")
+ val parameterTaskSortColumn = request.getParameter("task.sort")
+ val parameterTaskSortDesc = request.getParameter("task.desc")
+ val parameterTaskPageSize = request.getParameter("task.pageSize")
+
+ val taskPage = Option(parameterTaskPage).map(_.toInt).getOrElse(1)
+ val taskSortColumn = Option(parameterTaskSortColumn).getOrElse("Index")
+ val taskSortDesc = Option(parameterTaskSortDesc).map(_.toBoolean).getOrElse(false)
+ val taskPageSize = Option(parameterTaskPageSize).map(_.toInt).getOrElse(100)
+
// If this is set, expand the dag visualization by default
val expandDagVizParam = request.getParameter("expandDagViz")
val expandDagViz = expandDagVizParam != null && expandDagVizParam.toBoolean
@@ -231,52 +243,47 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
accumulableRow,
accumulables.values.toSeq)
- val taskHeadersAndCssClasses: Seq[(String, String)] =
- Seq(
- ("Index", ""), ("ID", ""), ("Attempt", ""), ("Status", ""), ("Locality Level", ""),
- ("Executor ID / Host", ""), ("Launch Time", ""), ("Duration", ""),
- ("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY),
- ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
- ("GC Time", ""),
- ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
- ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
- {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
- {if (stageData.hasInput) Seq(("Input Size / Records", "")) else Nil} ++
- {if (stageData.hasOutput) Seq(("Output Size / Records", "")) else Nil} ++
- {if (stageData.hasShuffleRead) {
- Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
- ("Shuffle Read Size / Records", ""),
- ("Shuffle Remote Reads", TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
- } else {
- Nil
- }} ++
- {if (stageData.hasShuffleWrite) {
- Seq(("Write Time", ""), ("Shuffle Write Size / Records", ""))
- } else {
- Nil
- }} ++
- {if (stageData.hasBytesSpilled) {
- Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
- } else {
- Nil
- }} ++
- Seq(("Errors", ""))
-
- val unzipped = taskHeadersAndCssClasses.unzip
-
val currentTime = System.currentTimeMillis()
- val taskTable = UIUtils.listingTable(
- unzipped._1,
- taskRow(
+ val (taskTable, taskTableHTML) = try {
+ val _taskTable = new TaskPagedTable(
+ UIUtils.prependBaseUri(parent.basePath) +
+ s"/stages/stage?id=${stageId}&attempt=${stageAttemptId}",
+ tasks,
hasAccumulators,
stageData.hasInput,
stageData.hasOutput,
stageData.hasShuffleRead,
stageData.hasShuffleWrite,
stageData.hasBytesSpilled,
- currentTime),
- tasks,
- headerClasses = unzipped._2)
+ currentTime,
+ pageSize = taskPageSize,
+ sortColumn = taskSortColumn,
+ desc = taskSortDesc
+ )
+ (_taskTable, _taskTable.table(taskPage))
+ } catch {
+ case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) =>
+ (null, <div class="alert alert-error">{e.getMessage}</div>)
+ }
+
+ val jsForScrollingDownToTaskTable =
+ <script>
+ {Unparsed {
+ """
+ |$(function() {
+ | if (/.*&task.sort=.*$/.test(location.search)) {
+ | var topOffset = $("#tasks-section").offset().top;
+ | $("html,body").animate({scrollTop: topOffset}, 200);
+ | }
+ |});
+ """.stripMargin
+ }
+ }
+ </script>
+
+ val taskIdsInPage = if (taskTable == null) Set.empty[Long]
+ else taskTable.dataSource.slicedTaskIds
+
// Excludes tasks which failed and have incomplete metrics
val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined)
@@ -499,12 +506,15 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
dagViz ++
maybeExpandDagViz ++
showAdditionalMetrics ++
- makeTimeline(stageData.taskData.values.toSeq, currentTime) ++
+ makeTimeline(
+ // Only show the tasks in the table
+ stageData.taskData.values.toSeq.filter(t => taskIdsInPage.contains(t.taskInfo.taskId)),
+ currentTime) ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
<h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++
maybeAccumulableTable ++
- <h4>Tasks</h4> ++ taskTable
+ <h4 id="tasks-section">Tasks</h4> ++ taskTableHTML ++ jsForScrollingDownToTaskTable
UIUtils.headerSparkPage(stageHeader, content, parent, showVisualization = true)
}
}
@@ -679,164 +689,619 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</script>
}
- def taskRow(
- hasAccumulators: Boolean,
- hasInput: Boolean,
- hasOutput: Boolean,
- hasShuffleRead: Boolean,
- hasShuffleWrite: Boolean,
- hasBytesSpilled: Boolean,
- currentTime: Long)(taskData: TaskUIData): Seq[Node] = {
- taskData match { case TaskUIData(info, metrics, errorMessage) =>
- val duration = if (info.status == "RUNNING") info.timeRunning(currentTime)
- else metrics.map(_.executorRunTime).getOrElse(1L)
- val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
- else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
- val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L)
- val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
- val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
- val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
- val gettingResultTime = getGettingResultTime(info, currentTime)
-
- val maybeAccumulators = info.accumulables
- val accumulatorsReadable = maybeAccumulators.map { acc =>
- StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}")
+}
+
+private[ui] object StagePage {
+ private[ui] def getGettingResultTime(info: TaskInfo, currentTime: Long): Long = {
+ if (info.gettingResult) {
+ if (info.finished) {
+ info.finishTime - info.gettingResultTime
+ } else {
+ // The task is still fetching the result.
+ currentTime - info.gettingResultTime
}
+ } else {
+ 0L
+ }
+ }
- val maybeInput = metrics.flatMap(_.inputMetrics)
- val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("")
- val inputReadable = maybeInput
- .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
- .getOrElse("")
- val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("")
-
- val maybeOutput = metrics.flatMap(_.outputMetrics)
- val outputSortable = maybeOutput.map(_.bytesWritten.toString).getOrElse("")
- val outputReadable = maybeOutput
- .map(m => s"${Utils.bytesToString(m.bytesWritten)}")
- .getOrElse("")
- val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("")
-
- val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics)
- val shuffleReadBlockedTimeSortable = maybeShuffleRead
- .map(_.fetchWaitTime.toString).getOrElse("")
- val shuffleReadBlockedTimeReadable =
- maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")
-
- val totalShuffleBytes = maybeShuffleRead.map(_.totalBytesRead)
- val shuffleReadSortable = totalShuffleBytes.map(_.toString).getOrElse("")
- val shuffleReadReadable = totalShuffleBytes.map(Utils.bytesToString).getOrElse("")
- val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")
-
- val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead)
- val shuffleReadRemoteSortable = remoteShuffleBytes.map(_.toString).getOrElse("")
- val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")
-
- val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
- val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten.toString).getOrElse("")
- val shuffleWriteReadable = maybeShuffleWrite
- .map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("")
- val shuffleWriteRecords = maybeShuffleWrite
- .map(_.shuffleRecordsWritten.toString).getOrElse("")
-
- val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
- val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
- val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
- if (ms == 0) "" else UIUtils.formatDuration(ms)
- }.getOrElse("")
-
- val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
- val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
- val memoryBytesSpilledReadable =
- maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("")
-
- val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled)
- val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("")
- val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("")
-
- <tr>
- <td>{info.index}</td>
- <td>{info.taskId}</td>
- <td sorttable_customkey={info.attempt.toString}>{
- if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString
- }</td>
- <td>{info.status}</td>
- <td>{info.taskLocality}</td>
- <td>{info.executorId} / {info.host}</td>
- <td>{UIUtils.formatDate(new Date(info.launchTime))}</td>
- <td sorttable_customkey={duration.toString}>
- {formatDuration}
- </td>
- <td sorttable_customkey={schedulerDelay.toString}
- class={TaskDetailsClassNames.SCHEDULER_DELAY}>
- {UIUtils.formatDuration(schedulerDelay.toLong)}
- </td>
- <td sorttable_customkey={taskDeserializationTime.toString}
- class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
- {UIUtils.formatDuration(taskDeserializationTime.toLong)}
- </td>
- <td sorttable_customkey={gcTime.toString}>
- {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
- </td>
- <td sorttable_customkey={serializationTime.toString}
- class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
- {UIUtils.formatDuration(serializationTime)}
- </td>
- <td sorttable_customkey={gettingResultTime.toString}
- class={TaskDetailsClassNames.GETTING_RESULT_TIME}>
- {UIUtils.formatDuration(gettingResultTime)}
- </td>
- {if (hasAccumulators) {
- <td>
- {Unparsed(accumulatorsReadable.mkString("<br/>"))}
- </td>
- }}
- {if (hasInput) {
- <td sorttable_customkey={inputSortable}>
- {s"$inputReadable / $inputRecords"}
- </td>
- }}
- {if (hasOutput) {
- <td sorttable_customkey={outputSortable}>
- {s"$outputReadable / $outputRecords"}
- </td>
- }}
+ private[ui] def getSchedulerDelay(
+ info: TaskInfo, metrics: TaskMetrics, currentTime: Long): Long = {
+ if (info.finished) {
+ val totalExecutionTime = info.finishTime - info.launchTime
+ val executorOverhead = (metrics.executorDeserializeTime +
+ metrics.resultSerializationTime)
+ math.max(
+ 0,
+ totalExecutionTime - metrics.executorRunTime - executorOverhead -
+ getGettingResultTime(info, currentTime))
+ } else {
+ // The task is still running and the metrics like executorRunTime are not available.
+ 0L
+ }
+ }
+}
+
+private[ui] case class TaskTableRowInputData(inputSortable: Long, inputReadable: String)
+
+private[ui] case class TaskTableRowOutputData(outputSortable: Long, outputReadable: String)
+
+private[ui] case class TaskTableRowShuffleReadData(
+ shuffleReadBlockedTimeSortable: Long,
+ shuffleReadBlockedTimeReadable: String,
+ shuffleReadSortable: Long,
+ shuffleReadReadable: String,
+ shuffleReadRemoteSortable: Long,
+ shuffleReadRemoteReadable: String)
+
+private[ui] case class TaskTableRowShuffleWriteData(
+ writeTimeSortable: Long,
+ writeTimeReadable: String,
+ shuffleWriteSortable: Long,
+ shuffleWriteReadable: String)
+
+private[ui] case class TaskTableRowBytesSpilledData(
+ memoryBytesSpilledSortable: Long,
+ memoryBytesSpilledReadable: String,
+ diskBytesSpilledSortable: Long,
+ diskBytesSpilledReadable: String)
+
+/**
+ * Contains all data that is needed for sorting and generating HTML. Using this class rather than
+ * TaskUIData avoids creating duplicate contents while sorting the data.
+ */
+private[ui] case class TaskTableRowData(
+ index: Int,
+ taskId: Long,
+ attempt: Int,
+ speculative: Boolean,
+ status: String,
+ taskLocality: String,
+ executorIdAndHost: String,
+ launchTime: Long,
+ duration: Long,
+ formatDuration: String,
+ schedulerDelay: Long,
+ taskDeserializationTime: Long,
+ gcTime: Long,
+ serializationTime: Long,
+ gettingResultTime: Long,
+ accumulators: Option[String], // HTML
+ input: Option[TaskTableRowInputData],
+ output: Option[TaskTableRowOutputData],
+ shuffleRead: Option[TaskTableRowShuffleReadData],
+ shuffleWrite: Option[TaskTableRowShuffleWriteData],
+ bytesSpilled: Option[TaskTableRowBytesSpilledData],
+ error: String)
+
+/**
+ * Sortable, pageable data source over the tasks of a single stage.
+ * Eagerly converts every TaskUIData into a pre-rendered TaskTableRowData and
+ * sorts once up front, so paging is a cheap slice of the sorted sequence.
+ * The has* flags indicate which optional metric column groups exist for this
+ * stage; sorting on an absent column throws IllegalArgumentException.
+ */
+private[ui] class TaskDataSource(
+ tasks: Seq[TaskUIData],
+ hasAccumulators: Boolean,
+ hasInput: Boolean,
+ hasOutput: Boolean,
+ hasShuffleRead: Boolean,
+ hasShuffleWrite: Boolean,
+ hasBytesSpilled: Boolean,
+ currentTime: Long,
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean) extends PagedDataSource[TaskTableRowData](pageSize) {
+ import StagePage._
+
+ // Convert TaskUIData to TaskTableRowData which contains the final contents to show in the table
+ // so that we can avoid creating duplicate contents during sorting the data
+ private val data = tasks.map(taskRow).sorted(ordering(sortColumn, desc))
+
+ // Task ids of the most recently sliced page; null until sliceData is called.
+ private var _slicedTaskIds: Set[Long] = null
+
+ override def dataSize: Int = data.size
+
+ // Returns the rows for one page and records which task ids are visible.
+ override def sliceData(from: Int, to: Int): Seq[TaskTableRowData] = {
+ val r = data.slice(from, to)
+ _slicedTaskIds = r.map(_.taskId).toSet
+ r
+ }
+
+ def slicedTaskIds: Set[Long] = _slicedTaskIds
+
+ // Flattens one task's info + metrics into the display-ready row record.
+ private def taskRow(taskData: TaskUIData): TaskTableRowData = {
+ val TaskUIData(info, metrics, errorMessage) = taskData
+ // Running tasks report elapsed wall-clock time; finished tasks use
+ // executorRunTime. NOTE(review): the fallback when metrics are absent is
+ // 1L (not 0L) — presumably deliberate for sorting; confirm.
+ val duration = if (info.status == "RUNNING") info.timeRunning(currentTime)
+ else metrics.map(_.executorRunTime).getOrElse(1L)
+ val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
+ else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
+ val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L)
+ val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
+ val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
+ val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
+ val gettingResultTime = getGettingResultTime(info, currentTime)
+
+ // HTML-escape now: the accumulator cell is later emitted unescaped
+ // (Unparsed) by TaskPagedTable.row.
+ // NOTE(review): acc.update.get assumes every accumulable carries an
+ // update value — it throws otherwise; confirm against the listener.
+ val maybeAccumulators = info.accumulables
+ val accumulatorsReadable = maybeAccumulators.map { acc =>
+ StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}")
+ }
+
+ val maybeInput = metrics.flatMap(_.inputMetrics)
+ val inputSortable = maybeInput.map(_.bytesRead).getOrElse(0L)
+ val inputReadable = maybeInput
+ .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
+ .getOrElse("")
+ val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("")
+
+ val maybeOutput = metrics.flatMap(_.outputMetrics)
+ val outputSortable = maybeOutput.map(_.bytesWritten).getOrElse(0L)
+ val outputReadable = maybeOutput
+ .map(m => s"${Utils.bytesToString(m.bytesWritten)}")
+ .getOrElse("")
+ val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("")
+
+ val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics)
+ val shuffleReadBlockedTimeSortable = maybeShuffleRead.map(_.fetchWaitTime).getOrElse(0L)
+ val shuffleReadBlockedTimeReadable =
+ maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")
+
+ val totalShuffleBytes = maybeShuffleRead.map(_.totalBytesRead)
+ val shuffleReadSortable = totalShuffleBytes.getOrElse(0L)
+ val shuffleReadReadable = totalShuffleBytes.map(Utils.bytesToString).getOrElse("")
+ val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")
+
+ val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead)
+ val shuffleReadRemoteSortable = remoteShuffleBytes.getOrElse(0L)
+ val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")
+
+ val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
+ val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten).getOrElse(0L)
+ val shuffleWriteReadable = maybeShuffleWrite
+ .map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("")
+ val shuffleWriteRecords = maybeShuffleWrite
+ .map(_.shuffleRecordsWritten.toString).getOrElse("")
+
+ // shuffleWriteTime is in nanoseconds; convert to ms before formatting.
+ val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
+ val writeTimeSortable = maybeWriteTime.getOrElse(0L)
+ val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
+ if (ms == 0) "" else UIUtils.formatDuration(ms)
+ }.getOrElse("")
+
+ val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
+ val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.getOrElse(0L)
+ val memoryBytesSpilledReadable =
+ maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("")
+
+ val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled)
+ val diskBytesSpilledSortable = maybeDiskBytesSpilled.getOrElse(0L)
+ val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("")
+
+ // Materialize each optional column group only when the stage has the
+ // corresponding metrics.
+ val input =
+ if (hasInput) {
+ Some(TaskTableRowInputData(inputSortable, s"$inputReadable / $inputRecords"))
+ } else {
+ None
+ }
+
+ val output =
+ if (hasOutput) {
+ Some(TaskTableRowOutputData(outputSortable, s"$outputReadable / $outputRecords"))
+ } else {
+ None
+ }
+
+ val shuffleRead =
+ if (hasShuffleRead) {
+ Some(TaskTableRowShuffleReadData(
+ shuffleReadBlockedTimeSortable,
+ shuffleReadBlockedTimeReadable,
+ shuffleReadSortable,
+ s"$shuffleReadReadable / $shuffleReadRecords",
+ shuffleReadRemoteSortable,
+ shuffleReadRemoteReadable
+ ))
+ } else {
+ None
+ }
+
+ val shuffleWrite =
+ if (hasShuffleWrite) {
+ Some(TaskTableRowShuffleWriteData(
+ writeTimeSortable,
+ writeTimeReadable,
+ shuffleWriteSortable,
+ s"$shuffleWriteReadable / $shuffleWriteRecords"
+ ))
+ } else {
+ None
+ }
+
+ val bytesSpilled =
+ if (hasBytesSpilled) {
+ Some(TaskTableRowBytesSpilledData(
+ memoryBytesSpilledSortable,
+ memoryBytesSpilledReadable,
+ diskBytesSpilledSortable,
+ diskBytesSpilledReadable
+ ))
+ } else {
+ None
+ }
+
+ TaskTableRowData(
+ info.index,
+ info.taskId,
+ info.attempt,
+ info.speculative,
+ info.status,
+ info.taskLocality.toString,
+ s"${info.executorId} / ${info.host}",
+ info.launchTime,
+ duration,
+ formatDuration,
+ schedulerDelay,
+ taskDeserializationTime,
+ gcTime,
+ serializationTime,
+ gettingResultTime,
+ if (hasAccumulators) Some(accumulatorsReadable.mkString("<br/>")) else None,
+ input,
+ output,
+ shuffleRead,
+ shuffleWrite,
+ bytesSpilled,
+ errorMessage.getOrElse("")
+ )
+ }
+
+ /**
+ * Return Ordering according to sortColumn and desc
+ */
+ private def ordering(sortColumn: String, desc: Boolean): Ordering[TaskTableRowData] = {
+ val ordering = sortColumn match {
+ case "Index" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Int.compare(x.index, y.index)
+ }
+ case "ID" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Long.compare(x.taskId, y.taskId)
+ }
+ case "Attempt" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Int.compare(x.attempt, y.attempt)
+ }
+ case "Status" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.String.compare(x.status, y.status)
+ }
+ case "Locality Level" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.String.compare(x.taskLocality, y.taskLocality)
+ }
+ case "Executor ID / Host" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.String.compare(x.executorIdAndHost, y.executorIdAndHost)
+ }
+ case "Launch Time" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Long.compare(x.launchTime, y.launchTime)
+ }
+ case "Duration" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Long.compare(x.duration, y.duration)
+ }
+ case "Scheduler Delay" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Long.compare(x.schedulerDelay, y.schedulerDelay)
+ }
+ case "Task Deserialization Time" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Long.compare(x.taskDeserializationTime, y.taskDeserializationTime)
+ }
+ case "GC Time" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Long.compare(x.gcTime, y.gcTime)
+ }
+ case "Result Serialization Time" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Long.compare(x.serializationTime, y.serializationTime)
+ }
+ case "Getting Result Time" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Long.compare(x.gettingResultTime, y.gettingResultTime)
+ }
+ // Columns backed by optional metric groups are only sortable when the
+ // stage actually has those metrics; otherwise fail fast.
+ case "Accumulators" =>
+ if (hasAccumulators) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.String.compare(x.accumulators.get, y.accumulators.get)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Accumulators because of no accumulators")
+ }
+ case "Input Size / Records" =>
+ if (hasInput) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Long.compare(x.input.get.inputSortable, y.input.get.inputSortable)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Input Size / Records because of no inputs")
+ }
+ case "Output Size / Records" =>
+ if (hasOutput) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Long.compare(x.output.get.outputSortable, y.output.get.outputSortable)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Output Size / Records because of no outputs")
+ }
+ // ShuffleRead
+ case "Shuffle Read Blocked Time" =>
+ if (hasShuffleRead) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Long.compare(x.shuffleRead.get.shuffleReadBlockedTimeSortable,
+ y.shuffleRead.get.shuffleReadBlockedTimeSortable)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Shuffle Read Blocked Time because of no shuffle reads")
+ }
+ case "Shuffle Read Size / Records" =>
+ if (hasShuffleRead) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Long.compare(x.shuffleRead.get.shuffleReadSortable,
+ y.shuffleRead.get.shuffleReadSortable)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Shuffle Read Size / Records because of no shuffle reads")
+ }
+ case "Shuffle Remote Reads" =>
+ if (hasShuffleRead) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Long.compare(x.shuffleRead.get.shuffleReadRemoteSortable,
+ y.shuffleRead.get.shuffleReadRemoteSortable)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Shuffle Remote Reads because of no shuffle reads")
+ }
+ // ShuffleWrite
+ case "Write Time" =>
+ if (hasShuffleWrite) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Long.compare(x.shuffleWrite.get.writeTimeSortable,
+ y.shuffleWrite.get.writeTimeSortable)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Write Time because of no shuffle writes")
+ }
+ case "Shuffle Write Size / Records" =>
+ if (hasShuffleWrite) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Long.compare(x.shuffleWrite.get.shuffleWriteSortable,
+ y.shuffleWrite.get.shuffleWriteSortable)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Shuffle Write Size / Records because of no shuffle writes")
+ }
+ // BytesSpilled
+ case "Shuffle Spill (Memory)" =>
+ if (hasBytesSpilled) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Long.compare(x.bytesSpilled.get.memoryBytesSpilledSortable,
+ y.bytesSpilled.get.memoryBytesSpilledSortable)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Shuffle Spill (Memory) because of no spills")
+ }
+ case "Shuffle Spill (Disk)" =>
+ if (hasBytesSpilled) {
+ new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Long.compare(x.bytesSpilled.get.diskBytesSpilledSortable,
+ y.bytesSpilled.get.diskBytesSpilledSortable)
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Cannot sort by Shuffle Spill (Disk) because of no spills")
+ }
+ case "Errors" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.String.compare(x.error, y.error)
+ }
+ case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
+ }
+ // A single reversal flips the whole ordering for descending sorts.
+ if (desc) {
+ ordering.reverse
+ } else {
+ ordering
+ }
+ }
+
+}
+
+/**
+ * Paged HTML table of a stage's tasks, rendered from a TaskDataSource.
+ * The has* flags control which optional metric columns are shown and are
+ * forwarded unchanged to the data source.
+ */
+private[ui] class TaskPagedTable(
+ basePath: String,
+ data: Seq[TaskUIData],
+ hasAccumulators: Boolean,
+ hasInput: Boolean,
+ hasOutput: Boolean,
+ hasShuffleRead: Boolean,
+ hasShuffleWrite: Boolean,
+ hasBytesSpilled: Boolean,
+ currentTime: Long,
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean) extends PagedTable[TaskTableRowData]{
+
+ override def tableId: String = ""
+
+ override def tableCssClass: String = "table table-bordered table-condensed table-striped"
+
+ // Narrowed to TaskDataSource (not PagedDataSource) so callers can reach
+ // slicedTaskIds for the currently displayed page.
+ override val dataSource: TaskDataSource = new TaskDataSource(
+ data,
+ hasAccumulators,
+ hasInput,
+ hasOutput,
+ hasShuffleRead,
+ hasShuffleWrite,
+ hasBytesSpilled,
+ currentTime,
+ pageSize,
+ sortColumn,
+ desc
+ )
+
+ // Builds the URL for a given page, preserving sort column, direction and
+ // page size. Only sortColumn is URL-encoded; basePath is presumed to be a
+ // pre-encoded URL that already carries query parameters (hence the leading
+ // '&') — TODO confirm against the caller.
+ override def pageLink(page: Int): String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+ s"${basePath}&task.page=$page&task.sort=${encodedSortColumn}&task.desc=${desc}" +
+ s"&task.pageSize=${pageSize}"
+ }
+
+ // Returns (functionName, functionDefinition) for the "go to page" button's
+ // client-side handler. The generated JS resets to page 1 whenever the page
+ // size changes, so the user is not left beyond the new last page.
+ override def goButtonJavascriptFunction: (String, String) = {
+ val jsFuncName = "goToTaskPage"
+ val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+ val jsFunc = s"""
+ |currentTaskPageSize = ${pageSize}
+ |function goToTaskPage(page, pageSize) {
+ | // Set page to 1 if the page size changes
+ | page = pageSize == currentTaskPageSize ? page : 1;
+ | var url = "${basePath}&task.sort=${encodedSortColumn}&task.desc=${desc}" +
+ | "&task.page=" + page + "&task.pageSize=" + pageSize;
+ | window.location.href = url;
+ |}
+ """.stripMargin
+ (jsFuncName, jsFunc)
+ }
+
+ /**
+ * Renders the <thead> row. Each header is a clickable sort link; the active
+ * sort column re-sorts in the opposite direction and shows an arrow.
+ * Column order here must match the cell order produced by row().
+ */
+ def headers: Seq[Node] = {
+ val taskHeadersAndCssClasses: Seq[(String, String)] =
+ Seq(
+ ("Index", ""), ("ID", ""), ("Attempt", ""), ("Status", ""), ("Locality Level", ""),
+ ("Executor ID / Host", ""), ("Launch Time", ""), ("Duration", ""),
+ ("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY),
+ ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
+ ("GC Time", ""),
+ ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
+ ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
+ {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
+ {if (hasInput) Seq(("Input Size / Records", "")) else Nil} ++
+ {if (hasOutput) Seq(("Output Size / Records", "")) else Nil} ++
 {if (hasShuffleRead) {
- <td sorttable_customkey={shuffleReadBlockedTimeSortable}
- class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
- {shuffleReadBlockedTimeReadable}
- </td>
- <td sorttable_customkey={shuffleReadSortable}>
- {s"$shuffleReadReadable / $shuffleReadRecords"}
- </td>
- <td sorttable_customkey={shuffleReadRemoteSortable}
- class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
- {shuffleReadRemoteReadable}
- </td>
- }}
+ Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
+ ("Shuffle Read Size / Records", ""),
+ ("Shuffle Remote Reads", TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
+ } else {
+ Nil
+ }} ++
 {if (hasShuffleWrite) {
- <td sorttable_customkey={writeTimeSortable}>
- {writeTimeReadable}
- </td>
- <td sorttable_customkey={shuffleWriteSortable}>
- {s"$shuffleWriteReadable / $shuffleWriteRecords"}
- </td>
- }}
+ Seq(("Write Time", ""), ("Shuffle Write Size / Records", ""))
+ } else {
+ Nil
+ }} ++
 {if (hasBytesSpilled) {
- <td sorttable_customkey={memoryBytesSpilledSortable}>
- {memoryBytesSpilledReadable}
- </td>
- <td sorttable_customkey={diskBytesSpilledSortable}>
- {diskBytesSpilledReadable}
- </td>
- }}
- {errorMessageCell(errorMessage)}
- </tr>
+ Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
+ } else {
+ Nil
+ }} ++
+ Seq(("Errors", ""))
+
+ // Fail fast on an unknown/unavailable sort column. BUG FIX: the exception
+ // was previously constructed but never thrown, silently accepting any
+ // `task.sort` request parameter.
+ if (!taskHeadersAndCssClasses.map(_._1).contains(sortColumn)) {
+ throw new IllegalArgumentException(s"Unknown column: $sortColumn")
}
+
+ val headerRow: Seq[Node] = {
+ taskHeadersAndCssClasses.map { case (header, cssClass) =>
+ if (header == sortColumn) {
+ // Active sort column: link toggles direction and shows an arrow.
+ val headerLink =
+ s"$basePath&task.sort=${URLEncoder.encode(header, "UTF-8")}&task.desc=${!desc}" +
+ s"&task.pageSize=${pageSize}"
+ val js = Unparsed(s"window.location.href='${headerLink}'")
+ val arrow = if (desc) "&#x25BE;" else "&#x25B4;" // UP or DOWN
+ <th class={cssClass} onclick={js} style="cursor: pointer;">
+ {header}
+ <span>&nbsp;{Unparsed(arrow)}</span>
+ </th>
+ } else {
+ val headerLink =
+ s"$basePath&task.sort=${URLEncoder.encode(header, "UTF-8")}&task.pageSize=${pageSize}"
+ val js = Unparsed(s"window.location.href='${headerLink}'")
+ <th class={cssClass} onclick={js} style="cursor: pointer;">
+ {header}
+ </th>
+ }
+ }
+ }
+ <thead>{headerRow}</thead>
+ }
+
+ // Renders one <tr> for a task. Optional metric cells are emitted in the
+ // same conditional order as the columns produced by headers.
+ def row(task: TaskTableRowData): Seq[Node] = {
+ <tr>
+ <td>{task.index}</td>
+ <td>{task.taskId}</td>
+ <td>{if (task.speculative) s"${task.attempt} (speculative)" else task.attempt.toString}</td>
+ <td>{task.status}</td>
+ <td>{task.taskLocality}</td>
+ <td>{task.executorIdAndHost}</td>
+ <td>{UIUtils.formatDate(new Date(task.launchTime))}</td>
+ <td>{task.formatDuration}</td>
+ <td class={TaskDetailsClassNames.SCHEDULER_DELAY}>
+ {UIUtils.formatDuration(task.schedulerDelay)}
+ </td>
+ <td class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
+ {UIUtils.formatDuration(task.taskDeserializationTime)}
+ </td>
+ <td>
+ {if (task.gcTime > 0) UIUtils.formatDuration(task.gcTime) else ""}
+ </td>
+ <td class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
+ {UIUtils.formatDuration(task.serializationTime)}
+ </td>
+ <td class={TaskDetailsClassNames.GETTING_RESULT_TIME}>
+ {UIUtils.formatDuration(task.gettingResultTime)}
+ </td>
+ {if (task.accumulators.nonEmpty) {
+ // Safe to emit unescaped: TaskDataSource.taskRow HTML-escaped this
+ // string (escapeHtml4) when building the row data.
+ <td>{Unparsed(task.accumulators.get)}</td>
+ }}
+ {if (task.input.nonEmpty) {
+ <td>{task.input.get.inputReadable}</td>
+ }}
+ {if (task.output.nonEmpty) {
+ <td>{task.output.get.outputReadable}</td>
+ }}
+ {if (task.shuffleRead.nonEmpty) {
+ <td class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
+ {task.shuffleRead.get.shuffleReadBlockedTimeReadable}
+ </td>
+ <td>{task.shuffleRead.get.shuffleReadReadable}</td>
+ <td class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
+ {task.shuffleRead.get.shuffleReadRemoteReadable}
+ </td>
+ }}
+ {if (task.shuffleWrite.nonEmpty) {
+ <td>{task.shuffleWrite.get.writeTimeReadable}</td>
+ <td>{task.shuffleWrite.get.shuffleWriteReadable}</td>
+ }}
+ {if (task.bytesSpilled.nonEmpty) {
+ <td>{task.bytesSpilled.get.memoryBytesSpilledReadable}</td>
+ <td>{task.bytesSpilled.get.diskBytesSpilledReadable}</td>
+ }}
+ {errorMessageCell(task.error)}
+ </tr>
}
- private def errorMessageCell(errorMessage: Option[String]): Seq[Node] = {
- val error = errorMessage.getOrElse("")
+ private def errorMessageCell(error: String): Seq[Node] = {
val isMultiline = error.indexOf('\n') >= 0
// Display the first line by default
val errorSummary = StringEscapeUtils.escapeHtml4(
@@ -860,32 +1325,4 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
}
<td>{errorSummary}{details}</td>
}
-
- private def getGettingResultTime(info: TaskInfo, currentTime: Long): Long = {
- if (info.gettingResult) {
- if (info.finished) {
- info.finishTime - info.gettingResultTime
- } else {
- // The task is still fetching the result.
- currentTime - info.gettingResultTime
- }
- } else {
- 0L
- }
- }
-
- private def getSchedulerDelay(info: TaskInfo, metrics: TaskMetrics, currentTime: Long): Long = {
- if (info.finished) {
- val totalExecutionTime = info.finishTime - info.launchTime
- val executorOverhead = (metrics.executorDeserializeTime +
- metrics.resultSerializationTime)
- math.max(
- 0,
- totalExecutionTime - metrics.executorRunTime - executorOverhead -
- getGettingResultTime(info, currentTime))
- } else {
- // The task is still running and the metrics like executorRunTime are not available.
- 0L
- }
- }
}
diff --git a/core/src/test/scala/org/apache/spark/ui/PagedTableSuite.scala b/core/src/test/scala/org/apache/spark/ui/PagedTableSuite.scala
new file mode 100644
index 0000000000..cc76c141c5
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/PagedTableSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import scala.xml.Node
+
+import org.apache.spark.SparkFunSuite
+
+// Unit tests for PagedDataSource paging arithmetic via a simple Seq-backed
+// implementation. In PageData(total, rows), the first field is the total
+// number of pages (ceil(5 / 2) = 3 here).
+class PagedDataSourceSuite extends SparkFunSuite {
+
+ test("basic") {
+ // First, middle and last pages of 5 items at pageSize 2.
+ val dataSource1 = new SeqPagedDataSource[Int](1 to 5, pageSize = 2)
+ assert(dataSource1.pageData(1) === PageData(3, (1 to 2)))
+
+ val dataSource2 = new SeqPagedDataSource[Int](1 to 5, pageSize = 2)
+ assert(dataSource2.pageData(2) === PageData(3, (3 to 4)))
+
+ val dataSource3 = new SeqPagedDataSource[Int](1 to 5, pageSize = 2)
+ assert(dataSource3.pageData(3) === PageData(3, Seq(5)))
+
+ // Out-of-range pages (above the last page, and page 0) must fail with a
+ // descriptive IndexOutOfBoundsException.
+ val dataSource4 = new SeqPagedDataSource[Int](1 to 5, pageSize = 2)
+ val e1 = intercept[IndexOutOfBoundsException] {
+ dataSource4.pageData(4)
+ }
+ assert(e1.getMessage === "Page 4 is out of range. Please select a page number between 1 and 3.")
+
+ val dataSource5 = new SeqPagedDataSource[Int](1 to 5, pageSize = 2)
+ val e2 = intercept[IndexOutOfBoundsException] {
+ dataSource5.pageData(0)
+ }
+ assert(e2.getMessage === "Page 0 is out of range. Please select a page number between 1 and 3.")
+
+ }
+}
+
+// Tests PagedTable.pageNavigation in isolation: the navigation bar's <li>
+// entries are page numbers plus "<" (previous), ">" (next), "<<" (first
+// group) and ">>" (last group) controls.
+class PagedTableSuite extends SparkFunSuite {
+ test("pageNavigation") {
+ // Create a fake PagedTable to test pageNavigation
+ val pagedTable = new PagedTable[Int] {
+ override def tableId: String = ""
+
+ override def tableCssClass: String = ""
+
+ override def dataSource: PagedDataSource[Int] = null
+
+ override def pageLink(page: Int): String = page.toString
+
+ override def headers: Seq[Node] = Nil
+
+ override def row(t: Int): Seq[Node] = Nil
+
+ override def goButtonJavascriptFunction: (String, String) = ("", "")
+ }
+
+ // pageNavigation(currentPage, pageSize, totalPages): a single page renders
+ // no navigation at all.
+ assert(pagedTable.pageNavigation(1, 10, 1) === Nil)
+ assert(
+ (pagedTable.pageNavigation(1, 10, 2).head \\ "li").map(_.text.trim) === Seq("1", "2", ">"))
+ assert(
+ (pagedTable.pageNavigation(2, 10, 2).head \\ "li").map(_.text.trim) === Seq("<", "1", "2"))
+
+ // At most 10 page numbers are shown per navigation group.
+ assert((pagedTable.pageNavigation(1, 10, 100).head \\ "li").map(_.text.trim) ===
+ (1 to 10).map(_.toString) ++ Seq(">", ">>"))
+ assert((pagedTable.pageNavigation(2, 10, 100).head \\ "li").map(_.text.trim) ===
+ Seq("<") ++ (1 to 10).map(_.toString) ++ Seq(">", ">>"))
+
+ // Last group: no ">"/">>" past the final page.
+ assert((pagedTable.pageNavigation(100, 10, 100).head \\ "li").map(_.text.trim) ===
+ Seq("<<", "<") ++ (91 to 100).map(_.toString))
+ assert((pagedTable.pageNavigation(99, 10, 100).head \\ "li").map(_.text.trim) ===
+ Seq("<<", "<") ++ (91 to 100).map(_.toString) ++ Seq(">"))
+
+ // Middle group, and a final group shorter than 10 pages.
+ assert((pagedTable.pageNavigation(11, 10, 100).head \\ "li").map(_.text.trim) ===
+ Seq("<<", "<") ++ (11 to 20).map(_.toString) ++ Seq(">", ">>"))
+ assert((pagedTable.pageNavigation(93, 10, 97).head \\ "li").map(_.text.trim) ===
+ Seq("<<", "<") ++ (91 to 97).map(_.toString) ++ Seq(">"))
+ }
+}
+
+// Minimal PagedDataSource backed by an in-memory Seq, used only by the tests
+// above to exercise the paging logic.
+private[spark] class SeqPagedDataSource[T](seq: Seq[T], pageSize: Int)
+ extends PagedDataSource[T](pageSize) {
+
+ override protected def dataSize: Int = seq.size
+
+ // from is inclusive, to is exclusive, matching Seq.slice semantics.
+ override protected def sliceData(from: Int, to: Int): Seq[T] = seq.slice(from, to)
+}