Diffstat (limited to 'core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 96
1 file changed, 79 insertions(+), 17 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 163a3746ea..8dcfeacb60 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -40,7 +40,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val stageId = request.getParameter("id").toInt
val now = System.currentTimeMillis()
- if (!listener.stageToTaskInfos.contains(stageId)) {
+ if (!listener.stageIdToTaskInfos.contains(stageId)) {
val content =
<div>
<h4>Summary Metrics</h4> No tasks have started yet
@@ -49,23 +49,25 @@ private[spark] class StagePage(parent: JobProgressUI) {
return headerSparkPage(content, parent.sc, "Details for Stage %s".format(stageId), Stages)
}
- val tasks = listener.stageToTaskInfos(stageId).toSeq.sortBy(_._1.launchTime)
+ val tasks = listener.stageIdToTaskInfos(stageId).toSeq.sortBy(_._1.launchTime)
val numCompleted = tasks.count(_._1.finished)
- val shuffleReadBytes = listener.stageToShuffleRead.getOrElse(stageId, 0L)
+ val shuffleReadBytes = listener.stageIdToShuffleRead.getOrElse(stageId, 0L)
val hasShuffleRead = shuffleReadBytes > 0
- val shuffleWriteBytes = listener.stageToShuffleWrite.getOrElse(stageId, 0L)
+ val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
val hasShuffleWrite = shuffleWriteBytes > 0
var activeTime = 0L
- listener.stageToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
+ listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
+
+ val finishedTasks = listener.stageIdToTaskInfos(stageId).filter(_._1.finished)
val summary =
<div>
<ul class="unstyled">
<li>
- <strong>CPU time: </strong>
- {parent.formatDuration(listener.stageToTime.getOrElse(stageId, 0L) + activeTime)}
+ <strong>Total task time across all tasks: </strong>
+ {parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
</li>
{if (hasShuffleRead)
<li>
@@ -83,10 +85,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
</div>
val taskHeaders: Seq[String] =
- Seq("Task ID", "Status", "Locality Level", "Executor", "Launch Time", "Duration") ++
- Seq("GC Time") ++
+ Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++
+ Seq("Duration", "GC Time", "Result Ser Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
- {if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
+ {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
Seq("Errors")
val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
@@ -99,11 +101,43 @@ private[spark] class StagePage(parent: JobProgressUI) {
None
}
else {
+ val serializationTimes = validTasks.map{case (info, metrics, exception) =>
+ metrics.get.resultSerializationTime.toDouble}
+ val serializationQuantiles = "Result serialization time" +: Distribution(serializationTimes).get.getQuantiles().map(
+ ms => parent.formatDuration(ms.toLong))
+
val serviceTimes = validTasks.map{case (info, metrics, exception) =>
metrics.get.executorRunTime.toDouble}
val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
ms => parent.formatDuration(ms.toLong))
+ val gettingResultTimes = validTasks.map{case (info, metrics, exception) =>
+ if (info.gettingResultTime > 0) {
+ (info.finishTime - info.gettingResultTime).toDouble
+ } else {
+ 0.0
+ }
+ }
+ val gettingResultQuantiles = ("Time spent fetching task results" +:
+ Distribution(gettingResultTimes).get.getQuantiles().map(
+ millis => parent.formatDuration(millis.toLong)))
+ // The scheduler delay includes the network delay to send the task to the worker
+ // machine and to send back the result (but not the time to fetch the task result,
+ // if it needed to be fetched from the block manager on the worker).
+ val schedulerDelays = validTasks.map{case (info, metrics, exception) =>
+ val totalExecutionTime = {
+ if (info.gettingResultTime > 0) {
+ (info.gettingResultTime - info.launchTime).toDouble
+ } else {
+ (info.finishTime - info.launchTime).toDouble
+ }
+ }
+ totalExecutionTime - metrics.get.executorRunTime
+ }
+ val schedulerDelayQuantiles = ("Scheduler delay" +:
+ Distribution(schedulerDelays).get.getQuantiles().map(
+ millis => parent.formatDuration(millis.toLong)))
+
def getQuantileCols(data: Seq[Double]) =
Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong))
@@ -119,7 +153,11 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
- val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
+ val listings: Seq[Seq[String]] = Seq(
+ serializationQuantiles,
+ serviceQuantiles,
+ gettingResultQuantiles,
+ schedulerDelayQuantiles,
if (hasShuffleRead) shuffleReadQuantiles else Nil,
if (hasShuffleWrite) shuffleWriteQuantiles else Nil)
@@ -128,12 +166,13 @@ private[spark] class StagePage(parent: JobProgressUI) {
def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
Some(listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
}
-
+ val executorTable = new ExecutorTable(parent, stageId)
val content =
summary ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
- <h4>Tasks</h4> ++ taskTable;
+ <h4>Aggregated Metrics by Executors</h4> ++ executorTable.toNodeSeq() ++
+ <h4>Tasks</h4> ++ taskTable
headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)
}
@@ -151,8 +190,23 @@ private[spark] class StagePage(parent: JobProgressUI) {
val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
+ val serializationTime = metrics.map(m => m.resultSerializationTime).getOrElse(0L)
+
+ val maybeShuffleRead = metrics.flatMap{m => m.shuffleReadMetrics}.map{s => s.remoteBytesRead}
+ val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
+ val shuffleReadReadable = maybeShuffleRead.map{Utils.bytesToString(_)}.getOrElse("")
+
+ val maybeShuffleWrite = metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => s.shuffleBytesWritten}
+ val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("")
+ val shuffleWriteReadable = maybeShuffleWrite.map{Utils.bytesToString(_)}.getOrElse("")
+
+ val maybeWriteTime = metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => s.shuffleWriteTime}
+ val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
+ val writeTimeReadable = maybeWriteTime.map{ t => t / (1000 * 1000)}.map{ ms =>
+ if (ms == 0) "" else parent.formatDuration(ms)}.getOrElse("")
<tr>
+ <td>{info.index}</td>
<td>{info.taskId}</td>
<td>{info.status}</td>
<td>{info.taskLocality}</td>
@@ -164,13 +218,21 @@ private[spark] class StagePage(parent: JobProgressUI) {
<td sorttable_customkey={gcTime.toString}>
{if (gcTime > 0) parent.formatDuration(gcTime) else ""}
</td>
+ <td sorttable_customkey={serializationTime.toString}>
+ {if (serializationTime > 0) parent.formatDuration(serializationTime) else ""}
+ </td>
{if (shuffleRead) {
- <td>{metrics.flatMap{m => m.shuffleReadMetrics}.map{s =>
- Utils.bytesToString(s.remoteBytesRead)}.getOrElse("")}</td>
+ <td sorttable_customkey={shuffleReadSortable}>
+ {shuffleReadReadable}
+ </td>
}}
{if (shuffleWrite) {
- <td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
- Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
+ <td sorttable_customkey={writeTimeSortable}>
+ {writeTimeReadable}
+ </td>
+ <td sorttable_customkey={shuffleWriteSortable}>
+ {shuffleWriteReadable}
+ </td>
}}
<td>{exception.map(e =>
<span>
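
Note (not part of the patch): a minimal, self-contained sketch of the scheduler-delay calculation that the new "Scheduler delay" quantile row above is based on. TaskTiming and the sample values below are hypothetical stand-ins for the TaskInfo/TaskMetrics fields referenced in the diff (launchTime, gettingResultTime, finishTime, executorRunTime); they are illustrative only.

// Illustrative sketch only; mirrors the scheduler-delay logic added in this patch.
// TaskTiming is a hypothetical stand-in for TaskInfo plus TaskMetrics.
case class TaskTiming(launchTime: Long, gettingResultTime: Long,
                      finishTime: Long, executorRunTime: Long)

object SchedulerDelaySketch {
  // Scheduler delay = wall-clock time attributed to the task (excluding the
  // window spent fetching the result from the block manager, i.e.
  // gettingResultTime..finishTime) minus the time the executor actually spent
  // running the task.
  def schedulerDelay(t: TaskTiming): Double = {
    val totalExecutionTime =
      if (t.gettingResultTime > 0) (t.gettingResultTime - t.launchTime).toDouble
      else (t.finishTime - t.launchTime).toDouble
    totalExecutionTime - t.executorRunTime
  }

  def main(args: Array[String]): Unit = {
    // 450 ms between launch and finish, 400 ms of executor run time -> 50 ms delay.
    val t = TaskTiming(launchTime = 1000L, gettingResultTime = 0L,
                       finishTime = 1450L, executorRunTime = 400L)
    println(s"Scheduler delay: ${schedulerDelay(t)} ms")
  }
}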