aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-04-23 13:19:03 -0700
committerKay Ousterhout <kayousterhout@gmail.com>2015-04-23 13:19:03 -0700
commit6afde2c7810c363083d0a699b1de02b54c13e6a9 (patch)
treefc7853c43ff12005aff15ed64f6d22b6c2b7ad8a /core
parentc1213e6a92e126ad886d9804cedaf6db3618e602 (diff)
downloadspark-6afde2c7810c363083d0a699b1de02b54c13e6a9.tar.gz
spark-6afde2c7810c363083d0a699b1de02b54c13e6a9.tar.bz2
spark-6afde2c7810c363083d0a699b1de02b54c13e6a9.zip
[SPARK-7058] Include RDD deserialization time in "task deserialization time" metric
The web UI's "task deserialization time" metric is slightly misleading because it does not capture the time taken to deserialize the broadcasted RDD. Author: Josh Rosen <joshrosen@databricks.com> Closes #5635 from JoshRosen/SPARK-7058 and squashes the following commits: ed90f75 [Josh Rosen] Update UI tooltip a3743b4 [Josh Rosen] Update comments. 4f52910 [Josh Rosen] Roll back whitespace change e9cf9f4 [Josh Rosen] Remove unused variable 9f32e55 [Josh Rosen] Expose executorDeserializeTime on Task instead of pushing runtime calculation into Task. 21f5b47 [Josh Rosen] Don't double-count the broadcast deserialization time in task runtime 1752f0e [Josh Rosen] [SPARK-7058] Incorporate RDD deserialization time in task deserialization time metric
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Task.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/ui/ToolTips.scala4
5 files changed, 20 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 5fc04df5d6..f57e215c3f 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -220,8 +220,12 @@ private[spark] class Executor(
val afterSerialization = System.currentTimeMillis()
for (m <- task.metrics) {
- m.setExecutorDeserializeTime(taskStart - deserializeStartTime)
- m.setExecutorRunTime(taskFinish - taskStart)
+ // Deserialization happens in two parts: first, we deserialize a Task object, which
+ // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
+ m.setExecutorDeserializeTime(
+ (taskStart - deserializeStartTime) + task.executorDeserializeTime)
+ // We need to subtract Task.run()'s deserialization time to avoid double-counting
+ m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
m.setJvmGCTime(computeTotalGcTime() - startGCTime)
m.setResultSerializationTime(afterSerialization - beforeSerialization)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index e074ce6ebf..c9a1241139 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -53,9 +53,11 @@ private[spark] class ResultTask[T, U](
override def runTask(context: TaskContext): U = {
// Deserialize the RDD and the func using the broadcast variables.
+ val deserializeStartTime = System.currentTimeMillis()
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
+ _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
metrics = Some(context.taskMetrics)
func(context, rdd.iterator(partition, context))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 6c7d00069a..bd3dd23dfe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -56,9 +56,11 @@ private[spark] class ShuffleMapTask(
override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
+ val deserializeStartTime = System.currentTimeMillis()
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
+ _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
metrics = Some(context.taskMetrics)
var writer: ShuffleWriter[Any, Any] = null
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 8b592867ee..b09b19e2ac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -87,12 +87,19 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
// initialized when kill() is invoked.
@volatile @transient private var _killed = false
+ protected var _executorDeserializeTime: Long = 0
+
/**
* Whether the task has been killed.
*/
def killed: Boolean = _killed
/**
+ * Returns the amount of time spent deserializing the RDD and function to be run.
+ */
+ def executorDeserializeTime: Long = _executorDeserializeTime
+
+ /**
* Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
* code and user code to properly handle the flag. This function should be idempotent so it can
* be called multiple times.
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index cae6870c2a..24f3236456 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -24,7 +24,9 @@ private[spark] object ToolTips {
scheduler delay is large, consider decreasing the size of tasks or decreasing the size
of task results."""
- val TASK_DESERIALIZATION_TIME = "Time spent deserializing the task closure on the executor."
+ val TASK_DESERIALIZATION_TIME =
+ """Time spent deserializing the task closure on the executor, including the time to read the
+ broadcasted task."""
val SHUFFLE_READ_BLOCKED_TIME =
"Time that the task spent blocked waiting for shuffle data to be read from remote machines."