aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2015-01-25 16:48:26 -0800
committerAndrew Or <andrew@databricks.com>2015-01-25 16:48:26 -0800
commitfc2168f04e9b2c7ce45f59db0bf632a26d56c72b (patch)
tree89d53e9f332c13f7744bfe9d1cf5f3121dda5d94
parent8f5c827b01026bf45fc774ed7387f11a941abea8 (diff)
downloadspark-fc2168f04e9b2c7ce45f59db0bf632a26d56c72b.tar.gz
spark-fc2168f04e9b2c7ce45f59db0bf632a26d56c72b.tar.bz2
spark-fc2168f04e9b2c7ce45f59db0bf632a26d56c72b.zip
[SPARK-5326] Show fetch wait time as optional metric in the UI
With this change, here's what the UI looks like: ![image](https://cloud.githubusercontent.com/assets/1108612/5809994/1ec8a904-9ff4-11e4-8f24-6a59a1a858f7.png) If you want to locally test this, you need to spin up multiple executors, because the shuffle read metrics are only shown for data read remotely. Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #4110 from kayousterhout/SPARK-5326 and squashes the following commits: 610051e [Kay Ousterhout] Josh style comments 5feaa28 [Kay Ousterhout] What is the difference here?? aa129cb [Kay Ousterhout] Removed inadvertent change 721c742 [Kay Ousterhout] Improved tooltip f3a7111 [Kay Ousterhout] Style fix 679b4e9 [Kay Ousterhout] [SPARK-5326] Show fetch wait time as optional metric in the UI
-rw-r--r--core/src/main/resources/org/apache/spark/ui/static/webui.css3
-rw-r--r--core/src/main/scala/org/apache/spark/ui/ToolTips.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala40
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala1
4 files changed, 45 insertions, 5 deletions
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index a1f7133f89..f23ba9dba1 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -190,6 +190,7 @@ span.additional-metric-title {
/* Hide all additional metrics by default. This is done here rather than using JavaScript to
* avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
-.scheduler_delay, .deserialization_time, .serialization_time, .getting_result_time {
+.scheduler_delay, .deserialization_time, .fetch_wait_time, .serialization_time,
+.getting_result_time {
display: none;
}
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index 6f446c5a95..4307029d44 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -24,8 +24,10 @@ private[spark] object ToolTips {
scheduler delay is large, consider decreasing the size of tasks or decreasing the size
of task results."""
- val TASK_DESERIALIZATION_TIME =
- """Time spent deserializating the task closure on the executor."""
+ val TASK_DESERIALIZATION_TIME = "Time spent deserializing the task closure on the executor."
+
+ val SHUFFLE_READ_BLOCKED_TIME =
+ "Time that the task spent blocked waiting for shuffle data to be read from remote machines."
val INPUT = "Bytes read from Hadoop or from Spark storage."
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 09a936c223..d8be1b20b3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -132,6 +132,15 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<span class="additional-metric-title">Task Deserialization Time</span>
</span>
</li>
+ {if (hasShuffleRead) {
+ <li>
+ <span data-toggle="tooltip"
+ title={ToolTips.SHUFFLE_READ_BLOCKED_TIME} data-placement="right">
+ <input type="checkbox" name={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}/>
+ <span class="additional-metric-title">Shuffle Read Blocked Time</span>
+ </span>
+ </li>
+ }}
<li>
<span data-toggle="tooltip"
title={ToolTips.RESULT_SERIALIZATION_TIME} data-placement="right">
@@ -167,7 +176,12 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
{if (hasInput) Seq(("Input", "")) else Nil} ++
{if (hasOutput) Seq(("Output", "")) else Nil} ++
- {if (hasShuffleRead) Seq(("Shuffle Read", "")) else Nil} ++
+ {if (hasShuffleRead) {
+ Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
+ ("Shuffle Read", ""))
+ } else {
+ Nil
+ }} ++
{if (hasShuffleWrite) Seq(("Write Time", ""), ("Shuffle Write", "")) else Nil} ++
{if (hasBytesSpilled) Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
else Nil} ++
@@ -271,6 +285,12 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
}
val outputQuantiles = <td>Output</td> +: getFormattedSizeQuantiles(outputSizes)
+ val shuffleReadBlockedTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble
+ }
+ val shuffleReadBlockedQuantiles = <td>Shuffle Read Blocked Time</td> +:
+ getFormattedTimeQuantiles(shuffleReadBlockedTimes)
+
val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
}
@@ -308,7 +328,14 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<tr class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>,
if (hasInput) <tr>{inputQuantiles}</tr> else Nil,
if (hasOutput) <tr>{outputQuantiles}</tr> else Nil,
- if (hasShuffleRead) <tr>{shuffleReadQuantiles}</tr> else Nil,
+ if (hasShuffleRead) {
+ <tr class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
+ {shuffleReadBlockedQuantiles}
+ </tr>
+ <tr>{shuffleReadQuantiles}</tr>
+ } else {
+ Nil
+ },
if (hasShuffleWrite) <tr>{shuffleWriteQuantiles}</tr> else Nil,
if (hasBytesSpilled) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil,
if (hasBytesSpilled) <tr>{diskBytesSpilledQuantiles}</tr> else Nil)
@@ -377,6 +404,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
.map(m => s"${Utils.bytesToString(m.bytesWritten)}")
.getOrElse("")
+ val maybeShuffleReadBlockedTime = metrics.flatMap(_.shuffleReadMetrics).map(_.fetchWaitTime)
+ val shuffleReadBlockedTimeSortable = maybeShuffleReadBlockedTime.map(_.toString).getOrElse("")
+ val shuffleReadBlockedTimeReadable =
+ maybeShuffleReadBlockedTime.map(ms => UIUtils.formatDuration(ms)).getOrElse("")
+
val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead)
val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("")
@@ -449,6 +481,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td>
}}
{if (hasShuffleRead) {
+ <td sorttable_customkey={shuffleReadBlockedTimeSortable}
+ class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
+ {shuffleReadBlockedTimeReadable}
+ </td>
<td sorttable_customkey={shuffleReadSortable}>
{shuffleReadReadable}
</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
index 2d13bb6ddd..37cf2c207b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
@@ -27,6 +27,7 @@ package org.apache.spark.ui.jobs
private[spark] object TaskDetailsClassNames {
val SCHEDULER_DELAY = "scheduler_delay"
val TASK_DESERIALIZATION_TIME = "deserialization_time"
+ val SHUFFLE_READ_BLOCKED_TIME = "fetch_wait_time"
val RESULT_SERIALIZATION_TIME = "serialization_time"
val GETTING_RESULT_TIME = "getting_result_time"
}