aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/ui/ToolTips.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala31
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala1
4 files changed, 36 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index abc1dd0be6..96114571d6 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -161,7 +161,7 @@ private[spark] class Executor(
}
override def run() {
- val startTime = System.currentTimeMillis()
+ val deserializeStartTime = System.currentTimeMillis()
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = SparkEnv.get.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
@@ -206,7 +206,7 @@ private[spark] class Executor(
val afterSerialization = System.currentTimeMillis()
for (m <- task.metrics) {
- m.executorDeserializeTime = taskStart - startTime
+ m.executorDeserializeTime = taskStart - deserializeStartTime
m.executorRunTime = taskFinish - taskStart
m.jvmGCTime = gcTime - startGCTime
m.resultSerializationTime = afterSerialization - beforeSerialization
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index f02904df31..51dc08f668 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -24,6 +24,9 @@ private[spark] object ToolTips {
scheduler delay is large, consider decreasing the size of tasks or decreasing the size
of task results."""
+ val TASK_DESERIALIZATION_TIME =
+ """Time spent deserializing the task closure on the executor."""
+
val INPUT = "Bytes read from Hadoop or from Spark storage."
val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage."
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 7cc03b7d33..63ed5fc494 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -114,6 +114,13 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
</li>
<li>
<span data-toggle="tooltip"
+ title={ToolTips.TASK_DESERIALIZATION_TIME} data-placement="right">
+ <input type="checkbox" name={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}/>
+ <span class="additional-metric-title">Task Deserialization Time</span>
+ </span>
+ </li>
+ <li>
+ <span data-toggle="tooltip"
title={ToolTips.GC_TIME} data-placement="right">
<input type="checkbox" name={TaskDetailsClassNames.GC_TIME}/>
<span class="additional-metric-title">GC Time</span>
@@ -147,6 +154,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
("Index", ""), ("ID", ""), ("Attempt", ""), ("Status", ""), ("Locality Level", ""),
("Executor ID / Host", ""), ("Launch Time", ""), ("Duration", ""),
("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY),
+ ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
("GC Time", TaskDetailsClassNames.GC_TIME),
("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
@@ -179,6 +187,17 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
}
}
+ val deserializationTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.executorDeserializeTime.toDouble
+ }
+ val deserializationQuantiles =
+ <td>
+ <span data-toggle="tooltip" title={ToolTips.TASK_DESERIALIZATION_TIME}
+ data-placement="right">
+ Task Deserialization Time
+ </span>
+ </td> +: getFormattedTimeQuantiles(deserializationTimes)
+
val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.executorRunTime.toDouble
}
@@ -266,6 +285,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
val listings: Seq[Seq[Node]] = Seq(
<tr>{serviceQuantiles}</tr>,
<tr class={TaskDetailsClassNames.SCHEDULER_DELAY}>{schedulerDelayQuantiles}</tr>,
+ <tr class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
+ {deserializationQuantiles}
+ </tr>
<tr class={TaskDetailsClassNames.GC_TIME}>{gcQuantiles}</tr>,
<tr class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
{serializationQuantiles}
@@ -314,6 +336,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
val schedulerDelay = metrics.map(getSchedulerDelay(info, _)).getOrElse(0L)
val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
+ val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
val gettingResultTime = info.gettingResultTime
@@ -367,6 +390,10 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
class={TaskDetailsClassNames.SCHEDULER_DELAY}>
{UIUtils.formatDuration(schedulerDelay.toLong)}
</td>
+ <td sorttable_customkey={taskDeserializationTime.toString}
+ class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
+ {UIUtils.formatDuration(taskDeserializationTime.toLong)}
+ </td>
<td sorttable_customkey={gcTime.toString} class={TaskDetailsClassNames.GC_TIME}>
{if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
</td>
@@ -424,6 +451,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
(info.finishTime - info.launchTime)
}
}
- totalExecutionTime - metrics.executorRunTime
+ val executorOverhead = (metrics.executorDeserializeTime +
+ metrics.resultSerializationTime)
+ totalExecutionTime - metrics.executorRunTime - executorOverhead
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
index 23d672cabd..eb371bd0ea 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
@@ -24,6 +24,7 @@ package org.apache.spark.ui.jobs
private object TaskDetailsClassNames {
val SCHEDULER_DELAY = "scheduler_delay"
val GC_TIME = "gc_time"
+ val TASK_DESERIALIZATION_TIME = "deserialization_time"
val RESULT_SERIALIZATION_TIME = "serialization_time"
val GETTING_RESULT_TIME = "getting_result_time"
}