diff options
author | Kay Ousterhout <kayousterhout@gmail.com> | 2015-01-25 16:48:26 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-01-25 16:48:26 -0800 |
commit | fc2168f04e9b2c7ce45f59db0bf632a26d56c72b (patch) | |
tree | 89d53e9f332c13f7744bfe9d1cf5f3121dda5d94 | |
parent | 8f5c827b01026bf45fc774ed7387f11a941abea8 (diff) | |
download | spark-fc2168f04e9b2c7ce45f59db0bf632a26d56c72b.tar.gz spark-fc2168f04e9b2c7ce45f59db0bf632a26d56c72b.tar.bz2 spark-fc2168f04e9b2c7ce45f59db0bf632a26d56c72b.zip |
[SPARK-5326] Show fetch wait time as optional metric in the UI
With this change, here's what the UI looks like:
![image](https://cloud.githubusercontent.com/assets/1108612/5809994/1ec8a904-9ff4-11e4-8f24-6a59a1a858f7.png)
If you want to locally test this, you need to spin up multiple executors, because the shuffle read metrics are only shown for data read remotely.
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes #4110 from kayousterhout/SPARK-5326 and squashes the following commits:
610051e [Kay Ousterhout] Josh style comments
5feaa28 [Kay Ousterhout] What is the difference here??
aa129cb [Kay Ousterhout] Removed inadvertent change
721c742 [Kay Ousterhout] Improved tooltip
f3a7111 [Kay Ousterhout] Style fix
679b4e9 [Kay Ousterhout] [SPARK-5326] Show fetch wait time as optional metric in the UI
4 files changed, 45 insertions, 5 deletions
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css index a1f7133f89..f23ba9dba1 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.css +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css @@ -190,6 +190,7 @@ span.additional-metric-title { /* Hide all additional metrics by default. This is done here rather than using JavaScript to * avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */ -.scheduler_delay, .deserialization_time, .serialization_time, .getting_result_time { +.scheduler_delay, .deserialization_time, .fetch_wait_time, .serialization_time, +.getting_result_time { display: none; } diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala index 6f446c5a95..4307029d44 100644 --- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala +++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala @@ -24,8 +24,10 @@ private[spark] object ToolTips { scheduler delay is large, consider decreasing the size of tasks or decreasing the size of task results.""" - val TASK_DESERIALIZATION_TIME = - """Time spent deserializating the task closure on the executor.""" + val TASK_DESERIALIZATION_TIME = "Time spent deserializing the task closure on the executor." + + val SHUFFLE_READ_BLOCKED_TIME = + "Time that the task spent blocked waiting for shuffle data to be read from remote machines." val INPUT = "Bytes read from Hadoop or from Spark storage." diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 09a936c223..d8be1b20b3 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -132,6 +132,15 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { <span class="additional-metric-title">Task Deserialization Time</span> </span> </li> + {if (hasShuffleRead) { + <li> + <span data-toggle="tooltip" + title={ToolTips.SHUFFLE_READ_BLOCKED_TIME} data-placement="right"> + <input type="checkbox" name={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}/> + <span class="additional-metric-title">Shuffle Read Blocked Time</span> + </span> + </li> + }} <li> <span data-toggle="tooltip" title={ToolTips.RESULT_SERIALIZATION_TIME} data-placement="right"> @@ -167,7 +176,12 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++ {if (hasInput) Seq(("Input", "")) else Nil} ++ {if (hasOutput) Seq(("Output", "")) else Nil} ++ - {if (hasShuffleRead) Seq(("Shuffle Read", "")) else Nil} ++ + {if (hasShuffleRead) { + Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME), + ("Shuffle Read", "")) + } else { + Nil + }} ++ {if (hasShuffleWrite) Seq(("Write Time", ""), ("Shuffle Write", "")) else Nil} ++ {if (hasBytesSpilled) Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", "")) else Nil} ++ @@ -271,6 +285,12 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { } val outputQuantiles = <td>Output</td> +: getFormattedSizeQuantiles(outputSizes) + val shuffleReadBlockedTimes = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble + } + val shuffleReadBlockedQuantiles = <td>Shuffle Read Blocked Time</td> +: + getFormattedTimeQuantiles(shuffleReadBlockedTimes) + val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble } @@ -308,7 +328,14 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { <tr class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>, if (hasInput) <tr>{inputQuantiles}</tr> else Nil, if (hasOutput) <tr>{outputQuantiles}</tr> else Nil, - if (hasShuffleRead) <tr>{shuffleReadQuantiles}</tr> else Nil, + if (hasShuffleRead) { + <tr class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}> + {shuffleReadBlockedQuantiles} + </tr> + <tr>{shuffleReadQuantiles}</tr> + } else { + Nil + }, if (hasShuffleWrite) <tr>{shuffleWriteQuantiles}</tr> else Nil, if (hasBytesSpilled) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil, if (hasBytesSpilled) <tr>{diskBytesSpilledQuantiles}</tr> else Nil) @@ -377,6 +404,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { .map(m => s"${Utils.bytesToString(m.bytesWritten)}") .getOrElse("") + val maybeShuffleReadBlockedTime = metrics.flatMap(_.shuffleReadMetrics).map(_.fetchWaitTime) + val shuffleReadBlockedTimeSortable = maybeShuffleReadBlockedTime.map(_.toString).getOrElse("") + val shuffleReadBlockedTimeReadable = + maybeShuffleReadBlockedTime.map(ms => UIUtils.formatDuration(ms)).getOrElse("") + val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead) val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("") val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("") @@ -449,6 +481,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { </td> }} {if (hasShuffleRead) { + <td sorttable_customkey={shuffleReadBlockedTimeSortable} + class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}> + {shuffleReadBlockedTimeReadable} + </td> <td sorttable_customkey={shuffleReadSortable}> {shuffleReadReadable} </td> diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala index 2d13bb6ddd..37cf2c207b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala @@ -27,6 +27,7 @@ package org.apache.spark.ui.jobs private[spark] object TaskDetailsClassNames { val SCHEDULER_DELAY = "scheduler_delay" val TASK_DESERIALIZATION_TIME = "deserialization_time" + val SHUFFLE_READ_BLOCKED_TIME = "fetch_wait_time" val RESULT_SERIALIZATION_TIME = "serialization_time" val GETTING_RESULT_TIME = "getting_result_time" } |