diff options
Diffstat (limited to 'core/src/main')
5 files changed, 1003 insertions, 222 deletions
diff --git a/core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js b/core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js index 0b450dc76b..3c8ddddf07 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js +++ b/core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js @@ -19,6 +19,9 @@ * to be registered after the page loads. */ $(function() { $("span.expand-additional-metrics").click(function(){ + var status = window.localStorage.getItem("expand-additional-metrics") == "true"; + status = !status; + // Expand the list of additional metrics. var additionalMetricsDiv = $(this).parent().find('.additional-metrics'); $(additionalMetricsDiv).toggleClass('collapsed'); @@ -26,17 +29,31 @@ $(function() { // Switch the class of the arrow from open to closed. $(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-open'); $(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-closed'); + + window.localStorage.setItem("expand-additional-metrics", "" + status); }); + if (window.localStorage.getItem("expand-additional-metrics") == "true") { + // Set it to false so that the click function can revert it + window.localStorage.setItem("expand-additional-metrics", "false"); + $("span.expand-additional-metrics").trigger("click"); + } + stripeSummaryTable(); $('input[type="checkbox"]').click(function() { - var column = "table ." + $(this).attr("name"); + var name = $(this).attr("name") + var column = "table ." + name; + var status = window.localStorage.getItem(name) == "true"; + status = !status; $(column).toggle(); stripeSummaryTable(); + window.localStorage.setItem(name, "" + status); }); $("#select-all-metrics").click(function() { + var status = window.localStorage.getItem("select-all-metrics") == "true"; + status = !status; if (this.checked) { // Toggle all un-checked options. $('input[type="checkbox"]:not(:checked)').trigger('click'); @@ -44,6 +61,21 @@ $(function() { // Toggle all checked options. $('input[type="checkbox"]:checked').trigger('click'); } + window.localStorage.setItem("select-all-metrics", "" + status); + }); + + if (window.localStorage.getItem("select-all-metrics") == "true") { + $("#select-all-metrics").attr('checked', status); + } + + $("span.additional-metric-title").parent().find('input[type="checkbox"]').each(function() { + var name = $(this).attr("name") + // If name is undefined, then skip it because it's the "select-all-metrics" checkbox + if (name && window.localStorage.getItem(name) == "true") { + // Set it to false so that the click function can revert it + window.localStorage.setItem(name, "false"); + $(this).trigger("click") + } }); // Trigger a click on the checkbox if a user clicks the label next to it. diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js index 9fa53baaf4..4a893bc018 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js @@ -73,12 +73,23 @@ var StagePageVizConstants = { }; /* + * Return "expand-dag-viz-arrow-job" if forJob is true. + * Otherwise, return "expand-dag-viz-arrow-stage". + */ +function expandDagVizArrowKey(forJob) { + return forJob ? "expand-dag-viz-arrow-job" : "expand-dag-viz-arrow-stage"; +} + +/* * Show or hide the RDD DAG visualization. * * The graph is only rendered the first time this is called. * This is the narrow interface called from the Scala UI code. */ function toggleDagViz(forJob) { + var status = window.localStorage.getItem(expandDagVizArrowKey(forJob)) == "true"; + status = !status; + var arrowSelector = ".expand-dag-viz-arrow"; $(arrowSelector).toggleClass('arrow-closed'); $(arrowSelector).toggleClass('arrow-open'); @@ -93,8 +104,24 @@ function toggleDagViz(forJob) { // Save the graph for later so we don't have to render it again graphContainer().style("display", "none"); } + + window.localStorage.setItem(expandDagVizArrowKey(forJob), "" + status); } +$(function (){ + if (window.localStorage.getItem(expandDagVizArrowKey(false)) == "true") { + // Set it to false so that the click function can revert it + window.localStorage.setItem(expandDagVizArrowKey(false), "false"); + toggleDagViz(false); + } + + if (window.localStorage.getItem(expandDagVizArrowKey(true)) == "true") { + // Set it to false so that the click function can revert it + window.localStorage.setItem(expandDagVizArrowKey(true), "false"); + toggleDagViz(true); + } +}); + /* * Render the RDD DAG visualization. * diff --git a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js index ca74ef9d7e..f4453c71df 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js +++ b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js @@ -66,14 +66,27 @@ function drawApplicationTimeline(groupArray, eventObjArray, startTime) { setupJobEventAction(); $("span.expand-application-timeline").click(function() { + var status = window.localStorage.getItem("expand-application-timeline") == "true"; + status = !status; + $("#application-timeline").toggleClass('collapsed'); // Switch the class of the arrow from open to closed. $(this).find('.expand-application-timeline-arrow').toggleClass('arrow-open'); $(this).find('.expand-application-timeline-arrow').toggleClass('arrow-closed'); + + window.localStorage.setItem("expand-application-timeline", "" + status); }); } +$(function (){ + if (window.localStorage.getItem("expand-application-timeline") == "true") { + // Set it to false so that the click function can revert it + window.localStorage.setItem("expand-application-timeline", "false"); + $("span.expand-application-timeline").trigger('click'); + } +}); + function drawJobTimeline(groupArray, eventObjArray, startTime) { var groups = new vis.DataSet(groupArray); var items = new vis.DataSet(eventObjArray); @@ -125,14 +138,27 @@ function drawJobTimeline(groupArray, eventObjArray, startTime) { setupStageEventAction(); $("span.expand-job-timeline").click(function() { + var status = window.localStorage.getItem("expand-job-timeline") == "true"; + status = !status; + $("#job-timeline").toggleClass('collapsed'); // Switch the class of the arrow from open to closed. $(this).find('.expand-job-timeline-arrow').toggleClass('arrow-open'); $(this).find('.expand-job-timeline-arrow').toggleClass('arrow-closed'); + + window.localStorage.setItem("expand-job-timeline", "" + status); }); } +$(function (){ + if (window.localStorage.getItem("expand-job-timeline") == "true") { + // Set it to false so that the click function can revert it + window.localStorage.setItem("expand-job-timeline", "false"); + $("span.expand-job-timeline").trigger('click'); + } +}); + function drawTaskAssignmentTimeline(groupArray, eventObjArray, minLaunchTime, maxFinishTime) { var groups = new vis.DataSet(groupArray); var items = new vis.DataSet(eventObjArray); @@ -176,14 +202,27 @@ function drawTaskAssignmentTimeline(groupArray, eventObjArray, minLaunchTime, ma setupZoomable("#task-assignment-timeline-zoom-lock", taskTimeline); $("span.expand-task-assignment-timeline").click(function() { + var status = window.localStorage.getItem("expand-task-assignment-timeline") == "true"; + status = !status; + $("#task-assignment-timeline").toggleClass("collapsed"); // Switch the class of the arrow from open to closed. $(this).find(".expand-task-assignment-timeline-arrow").toggleClass("arrow-open"); $(this).find(".expand-task-assignment-timeline-arrow").toggleClass("arrow-closed"); + + window.localStorage.setItem("expand-task-assignment-timeline", "" + status); }); } +$(function (){ + if (window.localStorage.getItem("expand-task-assignment-timeline") == "true") { + // Set it to false so that the click function can revert it + window.localStorage.setItem("expand-task-assignment-timeline", "false"); + $("span.expand-task-assignment-timeline").trigger('click'); + } +}); + function setupExecutorEventAction() { $(".item.box.executor").each(function () { $(this).hover( diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala new file mode 100644 index 0000000000..17d7b39c2d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui + +import scala.xml.{Node, Unparsed} + +/** + * A data source that provides data for a page. + * + * @param pageSize the number of rows in a page + */ +private[ui] abstract class PagedDataSource[T](val pageSize: Int) { + + if (pageSize <= 0) { + throw new IllegalArgumentException("Page size must be positive") + } + + /** + * Return the size of all data. + */ + protected def dataSize: Int + + /** + * Slice a range of data. + */ + protected def sliceData(from: Int, to: Int): Seq[T] + + /** + * Slice the data for this page + */ + def pageData(page: Int): PageData[T] = { + val totalPages = (dataSize + pageSize - 1) / pageSize + if (page <= 0 || page > totalPages) { + throw new IndexOutOfBoundsException( + s"Page $page is out of range. Please select a page number between 1 and $totalPages.") + } + val from = (page - 1) * pageSize + val to = dataSize.min(page * pageSize) + PageData(totalPages, sliceData(from, to)) + } + +} + +/** + * The data returned by `PagedDataSource.pageData`, including the page number, the number of total + * pages and the data in this page. + */ +private[ui] case class PageData[T](totalPage: Int, data: Seq[T]) + +/** + * A paged table that will generate a HTML table for a specified page and also the page navigation. + */ +private[ui] trait PagedTable[T] { + + def tableId: String + + def tableCssClass: String + + def dataSource: PagedDataSource[T] + + def headers: Seq[Node] + + def row(t: T): Seq[Node] + + def table(page: Int): Seq[Node] = { + val _dataSource = dataSource + try { + val PageData(totalPages, data) = _dataSource.pageData(page) + <div> + {pageNavigation(page, _dataSource.pageSize, totalPages)} + <table class={tableCssClass} id={tableId}> + {headers} + <tbody> + {data.map(row)} + </tbody> + </table> + </div> + } catch { + case e: IndexOutOfBoundsException => + val PageData(totalPages, _) = _dataSource.pageData(1) + <div> + {pageNavigation(1, _dataSource.pageSize, totalPages)} + <div class="alert alert-error">{e.getMessage}</div> + </div> + } + } + + /** + * Return a page navigation. + * <ul> + * <li>If the totalPages is 1, the page navigation will be empty</li> + * <li> + * If the totalPages is more than 1, it will create a page navigation including a group of + * page numbers and a form to submit the page number. + * </li> + * </ul> + * + * Here are some examples of the page navigation: + * {{{ + * << < 11 12 13* 14 15 16 17 18 19 20 > >> + * + * This is the first group, so "<<" is hidden. + * < 1 2* 3 4 5 6 7 8 9 10 > >> + * + * This is the first group and the first page, so "<<" and "<" are hidden. + * 1* 2 3 4 5 6 7 8 9 10 > >> + * + * Assume totalPages is 19. This is the last group, so ">>" is hidden. + * << < 11 12 13* 14 15 16 17 18 19 > + * + * Assume totalPages is 19. This is the last group and the last page, so ">>" and ">" are hidden. + * << < 11 12 13 14 15 16 17 18 19* + * + * * means the current page number + * << means jumping to the first page of the previous group. + * < means jumping to the previous page. + * >> means jumping to the first page of the next group. + * > means jumping to the next page. + * }}} + */ + private[ui] def pageNavigation(page: Int, pageSize: Int, totalPages: Int): Seq[Node] = { + if (totalPages == 1) { + Nil + } else { + // A group includes all page numbers will be shown in the page navigation. + // The size of group is 10 means there are 10 page numbers will be shown. + // The first group is 1 to 10, the second is 2 to 20, and so on + val groupSize = 10 + val firstGroup = 0 + val lastGroup = (totalPages - 1) / groupSize + val currentGroup = (page - 1) / groupSize + val startPage = currentGroup * groupSize + 1 + val endPage = totalPages.min(startPage + groupSize - 1) + val pageTags = (startPage to endPage).map { p => + if (p == page) { + // The current page should be disabled so that it cannot be clicked. + <li class="disabled"><a href="#">{p}</a></li> + } else { + <li><a href={pageLink(p)}>{p}</a></li> + } + } + val (goButtonJsFuncName, goButtonJsFunc) = goButtonJavascriptFunction + // When clicking the "Go" button, it will call this javascript method and then call + // "goButtonJsFuncName" + val formJs = + s"""$$(function(){ + | $$( "#form-task-page" ).submit(function(event) { + | var page = $$("#form-task-page-no").val() + | var pageSize = $$("#form-task-page-size").val() + | pageSize = pageSize ? pageSize: 100; + | if (page != "") { + | ${goButtonJsFuncName}(page, pageSize); + | } + | event.preventDefault(); + | }); + |}); + """.stripMargin + + <div> + <div> + <form id="form-task-page" class="form-inline pull-right" style="margin-bottom: 0px;"> + <label>{totalPages} Pages. Jump to</label> + <input type="text" id="form-task-page-no" value={page.toString} class="span1" /> + <label>. Show </label> + <input type="text" id="form-task-page-size" value={pageSize.toString} class="span1" /> + <label>tasks in a page.</label> + <button type="submit" class="btn">Go</button> + </form> + </div> + <div class="pagination" style="margin-bottom: 0px;"> + <span style="float: left; padding-top: 4px; padding-right: 4px;">Page: </span> + <ul> + {if (currentGroup > firstGroup) { + <li> + <a href={pageLink(startPage - groupSize)} aria-label="Previous Group"> + <span aria-hidden="true"> + << + </span> + </a> + </li> + }} + {if (page > 1) { + <li> + <a href={pageLink(page - 1)} aria-label="Previous"> + <span aria-hidden="true"> + < + </span> + </a> + </li> + }} + {pageTags} + {if (page < totalPages) { + <li> + <a href={pageLink(page + 1)} aria-label="Next"> + <span aria-hidden="true">></span> + </a> + </li> + }} + {if (currentGroup < lastGroup) { + <li> + <a href={pageLink(startPage + groupSize)} aria-label="Next Group"> + <span aria-hidden="true"> + >> + </span> + </a> + </li> + }} + </ul> + </div> + <script> + {Unparsed(goButtonJsFunc)} + + {Unparsed(formJs)} + </script> + </div> + } + } + + /** + * Return a link to jump to a page. + */ + def pageLink(page: Int): String + + /** + * Only the implementation knows how to create the url with a page number and the page size, so we + * leave this one to the implementation. The implementation should create a JavaScript method that + * accepts a page number along with the page size and jumps to the page. The return value is this + * method name and its JavaScript codes. + */ + def goButtonJavascriptFunction: (String, String) +} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 6e077bf3e7..cf04b5e592 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -17,6 +17,7 @@ package org.apache.spark.ui.jobs +import java.net.URLEncoder import java.util.Date import javax.servlet.http.HttpServletRequest @@ -27,13 +28,14 @@ import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo} -import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils} +import org.apache.spark.ui._ import org.apache.spark.ui.jobs.UIData._ -import org.apache.spark.ui.scope.RDDOperationGraph import org.apache.spark.util.{Utils, Distribution} /** Page showing statistics and task list for a given stage */ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { + import StagePage._ + private val progressListener = parent.progressListener private val operationGraphListener = parent.operationGraphListener @@ -74,6 +76,16 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val parameterAttempt = request.getParameter("attempt") require(parameterAttempt != null && parameterAttempt.nonEmpty, "Missing attempt parameter") + val parameterTaskPage = request.getParameter("task.page") + val parameterTaskSortColumn = request.getParameter("task.sort") + val parameterTaskSortDesc = request.getParameter("task.desc") + val parameterTaskPageSize = request.getParameter("task.pageSize") + + val taskPage = Option(parameterTaskPage).map(_.toInt).getOrElse(1) + val taskSortColumn = Option(parameterTaskSortColumn).getOrElse("Index") + val taskSortDesc = Option(parameterTaskSortDesc).map(_.toBoolean).getOrElse(false) + val taskPageSize = Option(parameterTaskPageSize).map(_.toInt).getOrElse(100) + // If this is set, expand the dag visualization by default val expandDagVizParam = request.getParameter("expandDagViz") val expandDagViz = expandDagVizParam != null && expandDagVizParam.toBoolean @@ -231,52 +243,47 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { accumulableRow, accumulables.values.toSeq) - val taskHeadersAndCssClasses: Seq[(String, String)] = - Seq( - ("Index", ""), ("ID", ""), ("Attempt", ""), ("Status", ""), ("Locality Level", ""), - ("Executor ID / Host", ""), ("Launch Time", ""), ("Duration", ""), - ("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY), - ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME), - ("GC Time", ""), - ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME), - ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++ - {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++ - {if (stageData.hasInput) Seq(("Input Size / Records", "")) else Nil} ++ - {if (stageData.hasOutput) Seq(("Output Size / Records", "")) else Nil} ++ - {if (stageData.hasShuffleRead) { - Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME), - ("Shuffle Read Size / Records", ""), - ("Shuffle Remote Reads", TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE)) - } else { - Nil - }} ++ - {if (stageData.hasShuffleWrite) { - Seq(("Write Time", ""), ("Shuffle Write Size / Records", "")) - } else { - Nil - }} ++ - {if (stageData.hasBytesSpilled) { - Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", "")) - } else { - Nil - }} ++ - Seq(("Errors", "")) - - val unzipped = taskHeadersAndCssClasses.unzip - val currentTime = System.currentTimeMillis() - val taskTable = UIUtils.listingTable( - unzipped._1, - taskRow( + val (taskTable, taskTableHTML) = try { + val _taskTable = new TaskPagedTable( + UIUtils.prependBaseUri(parent.basePath) + + s"/stages/stage?id=${stageId}&attempt=${stageAttemptId}", + tasks, hasAccumulators, stageData.hasInput, stageData.hasOutput, stageData.hasShuffleRead, stageData.hasShuffleWrite, stageData.hasBytesSpilled, - currentTime), - tasks, - headerClasses = unzipped._2) + currentTime, + pageSize = taskPageSize, + sortColumn = taskSortColumn, + desc = taskSortDesc + ) + (_taskTable, _taskTable.table(taskPage)) + } catch { + case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) => + (null, <div class="alert alert-error">{e.getMessage}</div>) + } + + val jsForScrollingDownToTaskTable = + <script> + {Unparsed { + """ + |$(function() { + | if (/.*&task.sort=.*$/.test(location.search)) { + | var topOffset = $("#tasks-section").offset().top; + | $("html,body").animate({scrollTop: topOffset}, 200); + | } + |}); + """.stripMargin + } + } + </script> + + val taskIdsInPage = if (taskTable == null) Set.empty[Long] + else taskTable.dataSource.slicedTaskIds + // Excludes tasks which failed and have incomplete metrics val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined) @@ -499,12 +506,15 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { dagViz ++ maybeExpandDagViz ++ showAdditionalMetrics ++ - makeTimeline(stageData.taskData.values.toSeq, currentTime) ++ + makeTimeline( + // Only show the tasks in the table + stageData.taskData.values.toSeq.filter(t => taskIdsInPage.contains(t.taskInfo.taskId)), + currentTime) ++ <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++ <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++ <h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++ maybeAccumulableTable ++ - <h4>Tasks</h4> ++ taskTable + <h4 id="tasks-section">Tasks</h4> ++ taskTableHTML ++ jsForScrollingDownToTaskTable UIUtils.headerSparkPage(stageHeader, content, parent, showVisualization = true) } } @@ -679,164 +689,619 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { </script> } - def taskRow( - hasAccumulators: Boolean, - hasInput: Boolean, - hasOutput: Boolean, - hasShuffleRead: Boolean, - hasShuffleWrite: Boolean, - hasBytesSpilled: Boolean, - currentTime: Long)(taskData: TaskUIData): Seq[Node] = { - taskData match { case TaskUIData(info, metrics, errorMessage) => - val duration = if (info.status == "RUNNING") info.timeRunning(currentTime) - else metrics.map(_.executorRunTime).getOrElse(1L) - val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration) - else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("") - val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L) - val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) - val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L) - val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) - val gettingResultTime = getGettingResultTime(info, currentTime) - - val maybeAccumulators = info.accumulables - val accumulatorsReadable = maybeAccumulators.map { acc => - StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}") +} + +private[ui] object StagePage { + private[ui] def getGettingResultTime(info: TaskInfo, currentTime: Long): Long = { + if (info.gettingResult) { + if (info.finished) { + info.finishTime - info.gettingResultTime + } else { + // The task is still fetching the result. + currentTime - info.gettingResultTime } + } else { + 0L + } + } - val maybeInput = metrics.flatMap(_.inputMetrics) - val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("") - val inputReadable = maybeInput - .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})") - .getOrElse("") - val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("") - - val maybeOutput = metrics.flatMap(_.outputMetrics) - val outputSortable = maybeOutput.map(_.bytesWritten.toString).getOrElse("") - val outputReadable = maybeOutput - .map(m => s"${Utils.bytesToString(m.bytesWritten)}") - .getOrElse("") - val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("") - - val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics) - val shuffleReadBlockedTimeSortable = maybeShuffleRead - .map(_.fetchWaitTime.toString).getOrElse("") - val shuffleReadBlockedTimeReadable = - maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("") - - val totalShuffleBytes = maybeShuffleRead.map(_.totalBytesRead) - val shuffleReadSortable = totalShuffleBytes.map(_.toString).getOrElse("") - val shuffleReadReadable = totalShuffleBytes.map(Utils.bytesToString).getOrElse("") - val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("") - - val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead) - val shuffleReadRemoteSortable = remoteShuffleBytes.map(_.toString).getOrElse("") - val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("") - - val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics) - val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten.toString).getOrElse("") - val shuffleWriteReadable = maybeShuffleWrite - .map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("") - val shuffleWriteRecords = maybeShuffleWrite - .map(_.shuffleRecordsWritten.toString).getOrElse("") - - val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime) - val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("") - val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms => - if (ms == 0) "" else UIUtils.formatDuration(ms) - }.getOrElse("") - - val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled) - val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("") - val memoryBytesSpilledReadable = - maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("") - - val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled) - val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("") - val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("") - - <tr> - <td>{info.index}</td> - <td>{info.taskId}</td> - <td sorttable_customkey={info.attempt.toString}>{ - if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString - }</td> - <td>{info.status}</td> - <td>{info.taskLocality}</td> - <td>{info.executorId} / {info.host}</td> - <td>{UIUtils.formatDate(new Date(info.launchTime))}</td> - <td sorttable_customkey={duration.toString}> - {formatDuration} - </td> - <td sorttable_customkey={schedulerDelay.toString} - class={TaskDetailsClassNames.SCHEDULER_DELAY}> - {UIUtils.formatDuration(schedulerDelay.toLong)} - </td> - <td sorttable_customkey={taskDeserializationTime.toString} - class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}> - {UIUtils.formatDuration(taskDeserializationTime.toLong)} - </td> - <td sorttable_customkey={gcTime.toString}> - {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""} - </td> - <td sorttable_customkey={serializationTime.toString} - class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}> - {UIUtils.formatDuration(serializationTime)} - </td> - <td sorttable_customkey={gettingResultTime.toString} - class={TaskDetailsClassNames.GETTING_RESULT_TIME}> - {UIUtils.formatDuration(gettingResultTime)} - </td> - {if (hasAccumulators) { - <td> - {Unparsed(accumulatorsReadable.mkString("<br/>"))} - </td> - }} - {if (hasInput) { - <td sorttable_customkey={inputSortable}> - {s"$inputReadable / $inputRecords"} - </td> - }} - {if (hasOutput) { - <td sorttable_customkey={outputSortable}> - {s"$outputReadable / $outputRecords"} - </td> - }} + private[ui] def getSchedulerDelay( + info: TaskInfo, metrics: TaskMetrics, currentTime: Long): Long = { + if (info.finished) { + val totalExecutionTime = info.finishTime - info.launchTime + val executorOverhead = (metrics.executorDeserializeTime + + metrics.resultSerializationTime) + math.max( + 0, + totalExecutionTime - metrics.executorRunTime - executorOverhead - + getGettingResultTime(info, currentTime)) + } else { + // The task is still running and the metrics like executorRunTime are not available. + 0L + } + } +} + +private[ui] case class TaskTableRowInputData(inputSortable: Long, inputReadable: String) + +private[ui] case class TaskTableRowOutputData(outputSortable: Long, outputReadable: String) + +private[ui] case class TaskTableRowShuffleReadData( + shuffleReadBlockedTimeSortable: Long, + shuffleReadBlockedTimeReadable: String, + shuffleReadSortable: Long, + shuffleReadReadable: String, + shuffleReadRemoteSortable: Long, + shuffleReadRemoteReadable: String) + +private[ui] case class TaskTableRowShuffleWriteData( + writeTimeSortable: Long, + writeTimeReadable: String, + shuffleWriteSortable: Long, + shuffleWriteReadable: String) + +private[ui] case class TaskTableRowBytesSpilledData( + memoryBytesSpilledSortable: Long, + memoryBytesSpilledReadable: String, + diskBytesSpilledSortable: Long, + diskBytesSpilledReadable: String) + +/** + * Contains all data that needs for sorting and generating HTML. Using this one rather than + * TaskUIData to avoid creating duplicate contents during sorting the data. + */ +private[ui] case class TaskTableRowData( + index: Int, + taskId: Long, + attempt: Int, + speculative: Boolean, + status: String, + taskLocality: String, + executorIdAndHost: String, + launchTime: Long, + duration: Long, + formatDuration: String, + schedulerDelay: Long, + taskDeserializationTime: Long, + gcTime: Long, + serializationTime: Long, + gettingResultTime: Long, + accumulators: Option[String], // HTML + input: Option[TaskTableRowInputData], + output: Option[TaskTableRowOutputData], + shuffleRead: Option[TaskTableRowShuffleReadData], + shuffleWrite: Option[TaskTableRowShuffleWriteData], + bytesSpilled: Option[TaskTableRowBytesSpilledData], + error: String) + +private[ui] class TaskDataSource( + tasks: Seq[TaskUIData], + hasAccumulators: Boolean, + hasInput: Boolean, + hasOutput: Boolean, + hasShuffleRead: Boolean, + hasShuffleWrite: Boolean, + hasBytesSpilled: Boolean, + currentTime: Long, + pageSize: Int, + sortColumn: String, + desc: Boolean) extends PagedDataSource[TaskTableRowData](pageSize) { + import StagePage._ + + // Convert TaskUIData to TaskTableRowData which contains the final contents to show in the table + // so that we can avoid creating duplicate contents during sorting the data + private val data = tasks.map(taskRow).sorted(ordering(sortColumn, desc)) + + private var _slicedTaskIds: Set[Long] = null + + override def dataSize: Int = data.size + + override def sliceData(from: Int, to: Int): Seq[TaskTableRowData] = { + val r = data.slice(from, to) + _slicedTaskIds = r.map(_.taskId).toSet + r + } + + def slicedTaskIds: Set[Long] = _slicedTaskIds + + private def taskRow(taskData: TaskUIData): TaskTableRowData = { + val TaskUIData(info, metrics, errorMessage) = taskData + val duration = if (info.status == "RUNNING") info.timeRunning(currentTime) + else metrics.map(_.executorRunTime).getOrElse(1L) + val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration) + else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("") + val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L) + val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) + val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L) + val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) + val gettingResultTime = getGettingResultTime(info, currentTime) + + val maybeAccumulators = info.accumulables + val accumulatorsReadable = maybeAccumulators.map { acc => + StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}") + } + + val maybeInput = metrics.flatMap(_.inputMetrics) + val inputSortable = maybeInput.map(_.bytesRead).getOrElse(0L) + val inputReadable = maybeInput + .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})") + .getOrElse("") + val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("") + + val maybeOutput = metrics.flatMap(_.outputMetrics) + val outputSortable = maybeOutput.map(_.bytesWritten).getOrElse(0L) + val outputReadable = maybeOutput + .map(m => s"${Utils.bytesToString(m.bytesWritten)}") + .getOrElse("") + val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("") + + val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics) + val shuffleReadBlockedTimeSortable = maybeShuffleRead.map(_.fetchWaitTime).getOrElse(0L) + val shuffleReadBlockedTimeReadable = + maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("") + + val totalShuffleBytes = maybeShuffleRead.map(_.totalBytesRead) + val shuffleReadSortable = totalShuffleBytes.getOrElse(0L) + val shuffleReadReadable = totalShuffleBytes.map(Utils.bytesToString).getOrElse("") + val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("") + + val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead) + val shuffleReadRemoteSortable = remoteShuffleBytes.getOrElse(0L) + val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("") + + val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics) + val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten).getOrElse(0L) + val shuffleWriteReadable = maybeShuffleWrite + .map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("") + val shuffleWriteRecords = maybeShuffleWrite + .map(_.shuffleRecordsWritten.toString).getOrElse("") + + val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime) + val writeTimeSortable = maybeWriteTime.getOrElse(0L) + val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms => + if (ms == 0) "" else UIUtils.formatDuration(ms) + }.getOrElse("") + + val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled) + val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.getOrElse(0L) + val memoryBytesSpilledReadable = + maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("") + + val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled) + val diskBytesSpilledSortable = maybeDiskBytesSpilled.getOrElse(0L) + val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("") + + val input = + if (hasInput) { + Some(TaskTableRowInputData(inputSortable, s"$inputReadable / $inputRecords")) + } else { + None + } + + val output = + if (hasOutput) { + Some(TaskTableRowOutputData(outputSortable, s"$outputReadable / $outputRecords")) + } else { + None + } + + val shuffleRead = + if (hasShuffleRead) { + Some(TaskTableRowShuffleReadData( + shuffleReadBlockedTimeSortable, + shuffleReadBlockedTimeReadable, + shuffleReadSortable, + s"$shuffleReadReadable / $shuffleReadRecords", + shuffleReadRemoteSortable, + shuffleReadRemoteReadable + )) + } else { + None + } + + val shuffleWrite = + if (hasShuffleWrite) { + Some(TaskTableRowShuffleWriteData( + writeTimeSortable, + writeTimeReadable, + shuffleWriteSortable, + s"$shuffleWriteReadable / $shuffleWriteRecords" + )) + } else { + None + } + + val bytesSpilled = + if (hasBytesSpilled) { + Some(TaskTableRowBytesSpilledData( + memoryBytesSpilledSortable, + memoryBytesSpilledReadable, + diskBytesSpilledSortable, + diskBytesSpilledReadable + )) + } else { + None + } + + TaskTableRowData( + info.index, + info.taskId, + info.attempt, + info.speculative, + info.status, + info.taskLocality.toString, + s"${info.executorId} / ${info.host}", + info.launchTime, + duration, + formatDuration, + schedulerDelay, + taskDeserializationTime, + gcTime, + serializationTime, + gettingResultTime, + if (hasAccumulators) Some(accumulatorsReadable.mkString("<br/>")) else None, + input, + output, + shuffleRead, + shuffleWrite, + bytesSpilled, + errorMessage.getOrElse("") + ) + } + + /** + * Return Ordering according to sortColumn and desc + */ + private def ordering(sortColumn: String, desc: Boolean): Ordering[TaskTableRowData] = { + val ordering = sortColumn match { + case "Index" => new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Int.compare(x.index, y.index) + } + case "ID" => new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Long.compare(x.taskId, y.taskId) + } + case "Attempt" => new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Int.compare(x.attempt, y.attempt) + } + case "Status" => new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.String.compare(x.status, y.status) + } + case "Locality Level" => new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.String.compare(x.taskLocality, y.taskLocality) + } + case "Executor ID / Host" => new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.String.compare(x.executorIdAndHost, y.executorIdAndHost) + } + case "Launch Time" => new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Long.compare(x.launchTime, y.launchTime) + } + case "Duration" => new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Long.compare(x.duration, y.duration) + } + case "Scheduler Delay" => new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Long.compare(x.schedulerDelay, y.schedulerDelay) + } + case "Task Deserialization Time" => new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Long.compare(x.taskDeserializationTime, y.taskDeserializationTime) + } + case "GC Time" => new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Long.compare(x.gcTime, y.gcTime) + } + case "Result Serialization Time" => new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Long.compare(x.serializationTime, y.serializationTime) + } + case "Getting Result Time" => new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Long.compare(x.gettingResultTime, y.gettingResultTime) + } + case "Accumulators" => + if (hasAccumulators) { + new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.String.compare(x.accumulators.get, y.accumulators.get) + } + } else { + throw new IllegalArgumentException( + "Cannot sort by Accumulators because of no accumulators") + } + case "Input Size / Records" => + if (hasInput) { + new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Long.compare(x.input.get.inputSortable, y.input.get.inputSortable) + } + } else { + throw new IllegalArgumentException( + "Cannot sort by Input Size / Records because of no inputs") + } + case "Output Size / Records" => + if (hasOutput) { + new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Long.compare(x.output.get.outputSortable, y.output.get.outputSortable) + } + } else { + throw new IllegalArgumentException( + "Cannot sort by Output Size / Records because of no outputs") + } + // ShuffleRead + case "Shuffle Read Blocked Time" => + if (hasShuffleRead) { + new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Long.compare(x.shuffleRead.get.shuffleReadBlockedTimeSortable, + y.shuffleRead.get.shuffleReadBlockedTimeSortable) + } + } else { + throw new IllegalArgumentException( + "Cannot sort by Shuffle Read Blocked Time because of no shuffle reads") + } + case "Shuffle Read Size / Records" => + if (hasShuffleRead) { + new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Long.compare(x.shuffleRead.get.shuffleReadSortable, + y.shuffleRead.get.shuffleReadSortable) + } + } else { + throw new IllegalArgumentException( + "Cannot sort by Shuffle Read Size / Records because of no shuffle reads") + } + case "Shuffle Remote Reads" => + if (hasShuffleRead) { + new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Long.compare(x.shuffleRead.get.shuffleReadRemoteSortable, + y.shuffleRead.get.shuffleReadRemoteSortable) + } + } else { + throw new IllegalArgumentException( + "Cannot sort by Shuffle Remote Reads because of no shuffle reads") + } + // ShuffleWrite + case "Write Time" => + if (hasShuffleWrite) { + new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Long.compare(x.shuffleWrite.get.writeTimeSortable, + y.shuffleWrite.get.writeTimeSortable) + } + } else { + throw new IllegalArgumentException( + "Cannot sort by Write Time because of no shuffle writes") + } + case "Shuffle Write Size / Records" => + if (hasShuffleWrite) { + new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Long.compare(x.shuffleWrite.get.shuffleWriteSortable, + y.shuffleWrite.get.shuffleWriteSortable) + } + } else { + throw new IllegalArgumentException( + "Cannot sort by Shuffle Write Size / Records because of no shuffle writes") + } + // BytesSpilled + case "Shuffle Spill (Memory)" => + if (hasBytesSpilled) { + new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Long.compare(x.bytesSpilled.get.memoryBytesSpilledSortable, + y.bytesSpilled.get.memoryBytesSpilledSortable) + } + } else { + throw new IllegalArgumentException( + "Cannot sort by Shuffle Spill (Memory) because of no spills") + } + case "Shuffle Spill (Disk)" => + if (hasBytesSpilled) { + new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.Long.compare(x.bytesSpilled.get.diskBytesSpilledSortable, + y.bytesSpilled.get.diskBytesSpilledSortable) + } + } else { + throw new IllegalArgumentException( + "Cannot sort by Shuffle Spill (Disk) because of no spills") + } + case "Errors" => new Ordering[TaskTableRowData] { + override def compare(x: TaskTableRowData, y: TaskTableRowData): Int = + Ordering.String.compare(x.error, y.error) + } + case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") + } + if (desc) { + ordering.reverse + } else { + ordering + } + } + +} + +private[ui] class TaskPagedTable( + basePath: String, + data: Seq[TaskUIData], + hasAccumulators: Boolean, + hasInput: Boolean, + hasOutput: Boolean, + hasShuffleRead: Boolean, + hasShuffleWrite: Boolean, + hasBytesSpilled: Boolean, + currentTime: Long, + pageSize: Int, + sortColumn: String, + desc: Boolean) extends PagedTable[TaskTableRowData]{ + + override def tableId: String = "" + + override def tableCssClass: String = "table table-bordered table-condensed table-striped" + + override val dataSource: TaskDataSource = new TaskDataSource( + data, + hasAccumulators, + hasInput, + hasOutput, + hasShuffleRead, + hasShuffleWrite, + hasBytesSpilled, + currentTime, + pageSize, + sortColumn, + desc + ) + + override def pageLink(page: Int): String = { + val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") + s"${basePath}&task.page=$page&task.sort=${encodedSortColumn}&task.desc=${desc}" + + s"&task.pageSize=${pageSize}" + } + + override def goButtonJavascriptFunction: (String, String) = { + val jsFuncName = "goToTaskPage" + val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") + val jsFunc = s""" + |currentTaskPageSize = ${pageSize} + |function goToTaskPage(page, pageSize) { + | // Set page to 1 if the page size changes + | page = pageSize == currentTaskPageSize ? page : 1; + | var url = "${basePath}&task.sort=${encodedSortColumn}&task.desc=${desc}" + + | "&task.page=" + page + "&task.pageSize=" + pageSize; + | window.location.href = url; + |} + """.stripMargin + (jsFuncName, jsFunc) + } + + def headers: Seq[Node] = { + val taskHeadersAndCssClasses: Seq[(String, String)] = + Seq( + ("Index", ""), ("ID", ""), ("Attempt", ""), ("Status", ""), ("Locality Level", ""), + ("Executor ID / Host", ""), ("Launch Time", ""), ("Duration", ""), + ("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY), + ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME), + ("GC Time", ""), + ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME), + ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++ + {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++ + {if (hasInput) Seq(("Input Size / Records", "")) else Nil} ++ + {if (hasOutput) Seq(("Output Size / Records", "")) else Nil} ++ {if (hasShuffleRead) { - <td sorttable_customkey={shuffleReadBlockedTimeSortable} - class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}> - {shuffleReadBlockedTimeReadable} - </td> - <td sorttable_customkey={shuffleReadSortable}> - {s"$shuffleReadReadable / $shuffleReadRecords"} - </td> - <td sorttable_customkey={shuffleReadRemoteSortable} - class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}> - {shuffleReadRemoteReadable} - </td> - }} + Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME), + ("Shuffle Read Size / Records", ""), + ("Shuffle Remote Reads", TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE)) + } else { + Nil + }} ++ {if (hasShuffleWrite) { - <td sorttable_customkey={writeTimeSortable}> - {writeTimeReadable} - </td> - <td sorttable_customkey={shuffleWriteSortable}> - {s"$shuffleWriteReadable / $shuffleWriteRecords"} - </td> - }} + Seq(("Write Time", ""), ("Shuffle Write Size / Records", "")) + } else { + Nil + }} ++ {if (hasBytesSpilled) { - <td sorttable_customkey={memoryBytesSpilledSortable}> - {memoryBytesSpilledReadable} - </td> - <td sorttable_customkey={diskBytesSpilledSortable}> - {diskBytesSpilledReadable} - </td> - }} - {errorMessageCell(errorMessage)} - </tr> + Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", "")) + } else { + Nil + }} ++ + Seq(("Errors", "")) + + if (!taskHeadersAndCssClasses.map(_._1).contains(sortColumn)) { + new IllegalArgumentException(s"Unknown column: $sortColumn") } + + val headerRow: Seq[Node] = { + taskHeadersAndCssClasses.map { case (header, cssClass) => + if (header == sortColumn) { + val headerLink = + s"$basePath&task.sort=${URLEncoder.encode(header, "UTF-8")}&task.desc=${!desc}" + + s"&task.pageSize=${pageSize}" + val js = Unparsed(s"window.location.href='${headerLink}'") + val arrow = if (desc) "▾" else "▴" // UP or DOWN + <th class={cssClass} onclick={js} style="cursor: pointer;"> + {header} + <span> {Unparsed(arrow)}</span> + </th> + } else { + val headerLink = + s"$basePath&task.sort=${URLEncoder.encode(header, "UTF-8")}&task.pageSize=${pageSize}" + val js = Unparsed(s"window.location.href='${headerLink}'") + <th class={cssClass} onclick={js} style="cursor: pointer;"> + {header} + </th> + } + } + } + <thead>{headerRow}</thead> + } + + def row(task: TaskTableRowData): Seq[Node] = { + <tr> + <td>{task.index}</td> + <td>{task.taskId}</td> + <td>{if (task.speculative) s"${task.attempt} (speculative)" else task.attempt.toString}</td> + <td>{task.status}</td> + <td>{task.taskLocality}</td> + <td>{task.executorIdAndHost}</td> + <td>{UIUtils.formatDate(new Date(task.launchTime))}</td> + <td>{task.formatDuration}</td> + <td class={TaskDetailsClassNames.SCHEDULER_DELAY}> + {UIUtils.formatDuration(task.schedulerDelay)} + </td> + <td class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}> + {UIUtils.formatDuration(task.taskDeserializationTime)} + </td> + <td> + {if (task.gcTime > 0) UIUtils.formatDuration(task.gcTime) else ""} + </td> + <td class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}> + {UIUtils.formatDuration(task.serializationTime)} + </td> + <td class={TaskDetailsClassNames.GETTING_RESULT_TIME}> + {UIUtils.formatDuration(task.gettingResultTime)} + </td> + {if (task.accumulators.nonEmpty) { + <td>{Unparsed(task.accumulators.get)}</td> + }} + {if (task.input.nonEmpty) { + <td>{task.input.get.inputReadable}</td> + }} + {if (task.output.nonEmpty) { + <td>{task.output.get.outputReadable}</td> + }} + {if (task.shuffleRead.nonEmpty) { + <td class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}> + {task.shuffleRead.get.shuffleReadBlockedTimeReadable} + </td> + <td>{task.shuffleRead.get.shuffleReadReadable}</td> + <td class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}> + {task.shuffleRead.get.shuffleReadRemoteReadable} + </td> + }} + {if (task.shuffleWrite.nonEmpty) { + <td>{task.shuffleWrite.get.writeTimeReadable}</td> + <td>{task.shuffleWrite.get.shuffleWriteReadable}</td> + }} + {if (task.bytesSpilled.nonEmpty) { + <td>{task.bytesSpilled.get.memoryBytesSpilledReadable}</td> + <td>{task.bytesSpilled.get.diskBytesSpilledReadable}</td> + }} + {errorMessageCell(task.error)} + </tr> } - private def errorMessageCell(errorMessage: Option[String]): Seq[Node] = { - val error = errorMessage.getOrElse("") + private def errorMessageCell(error: String): Seq[Node] = { val isMultiline = error.indexOf('\n') >= 0 // Display the first line by default val errorSummary = StringEscapeUtils.escapeHtml4( @@ -860,32 +1325,4 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { } <td>{errorSummary}{details}</td> } - - private def getGettingResultTime(info: TaskInfo, currentTime: Long): Long = { - if (info.gettingResult) { - if (info.finished) { - info.finishTime - info.gettingResultTime - } else { - // The task is still fetching the result. - currentTime - info.gettingResultTime - } - } else { - 0L - } - } - - private def getSchedulerDelay(info: TaskInfo, metrics: TaskMetrics, currentTime: Long): Long = { - if (info.finished) { - val totalExecutionTime = info.finishTime - info.launchTime - val executorOverhead = (metrics.executorDeserializeTime + - metrics.resultSerializationTime) - math.max( - 0, - totalExecutionTime - metrics.executorRunTime - executorOverhead - - getGettingResultTime(info, currentTime)) - } else { - // The task is still running and the metrics like executorRunTime are not available. - 0L - } - } } |