Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala        8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala     2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala           7
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/ToolTips.scala              4
5 files changed, 20 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 5fc04df5d6..f57e215c3f 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -220,8 +220,12 @@ private[spark] class Executor(
         val afterSerialization = System.currentTimeMillis()
 
         for (m <- task.metrics) {
-          m.setExecutorDeserializeTime(taskStart - deserializeStartTime)
-          m.setExecutorRunTime(taskFinish - taskStart)
+          // Deserialization happens in two parts: first, we deserialize a Task object, which
+          // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
+          m.setExecutorDeserializeTime(
+            (taskStart - deserializeStartTime) + task.executorDeserializeTime)
+          // We need to subtract Task.run()'s deserialization time to avoid double-counting
+          m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
           m.setJvmGCTime(computeTotalGcTime() - startGCTime)
           m.setResultSerializationTime(afterSerialization - beforeSerialization)
         }
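
To make the new accounting concrete, here is a worked example with hypothetical timestamps (all in milliseconds; the names match the variables in the hunk above):

  // deserializeStartTime         = 100  -> executor begins deserializing the Task object
  // taskStart                    = 105  -> Task.run() begins
  // task.executorDeserializeTime =   3  -> measured inside runTask (RDD and function)
  // taskFinish                   = 125  -> Task.run() returns
  //
  // executorDeserializeTime = (105 - 100) + 3 = 8    // both deserialization phases
  // executorRunTime         = (125 - 105) - 3 = 12   // pure execution time

Previously, the 3 ms spent on deserialization inside Task.run() would have been reported as run time (20) rather than deserialize time (5); the subtraction keeps the two metrics disjoint.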
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index e074ce6ebf..c9a1241139 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -53,9 +53,11 @@ private[spark] class ResultTask[T, U](
 
   override def runTask(context: TaskContext): U = {
     // Deserialize the RDD and the func using the broadcast variables.
+    val deserializeStartTime = System.currentTimeMillis()
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
+    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
 
     metrics = Some(context.taskMetrics)
     func(context, rdd.iterator(partition, context))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 6c7d00069a..bd3dd23dfe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -56,9 +56,11 @@ private[spark] class ShuffleMapTask(
 
   override def runTask(context: TaskContext): MapStatus = {
     // Deserialize the RDD using the broadcast variable.
+    val deserializeStartTime = System.currentTimeMillis()
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
+    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
 
     metrics = Some(context.taskMetrics)
     var writer: ShuffleWriter[Any, Any] = null
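
Both runTask overrides above instrument deserialization the same way: take a wall-clock timestamp, deserialize, and store the elapsed time in the _executorDeserializeTime field defined on Task (next hunk). A minimal self-contained Scala sketch of that pattern; the helper name timed is illustrative, not part of the Spark API:

  object TimingSketch {
    // Runs `body`, reports the elapsed wall-clock milliseconds through
    // `record`, and returns body's result.
    def timed[A](record: Long => Unit)(body: => A): A = {
      val start = System.currentTimeMillis()
      val result = body
      record(System.currentTimeMillis() - start)
      result
    }

    def main(args: Array[String]): Unit = {
      var deserializeTime = 0L
      val value = timed(t => deserializeTime = t) {
        Thread.sleep(10) // stand-in for the ser.deserialize(...) call
        42
      }
      println(s"value=$value, deserialize took ${deserializeTime}ms")
    }
  }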
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 8b592867ee..b09b19e2ac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -87,12 +87,19 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
   // initialized when kill() is invoked.
   @volatile @transient private var _killed = false
 
+  protected var _executorDeserializeTime: Long = 0
+
   /**
    * Whether the task has been killed.
    */
   def killed: Boolean = _killed
 
   /**
+   * Returns the amount of time spent deserializing the RDD and function to be run.
+   */
+  def executorDeserializeTime: Long = _executorDeserializeTime
+
+  /**
    * Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
    * code and user code to properly handle the flag. This function should be idempotent so it can
    * be called multiple times.
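
Taken together, the Task.scala changes reduce to a protected mutable field written by the runTask overrides and a read-only accessor consumed by the Executor; condensed from the hunk above (not the full class):

  private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) {
    // Written by ResultTask.runTask / ShuffleMapTask.runTask.
    protected var _executorDeserializeTime: Long = 0

    /** Returns the amount of time spent deserializing the RDD and function to be run. */
    def executorDeserializeTime: Long = _executorDeserializeTime
  }

Exposing only a getter lets the subclasses record the measurement while keeping callers from resetting it.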
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index cae6870c2a..24f3236456 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -24,7 +24,9 @@ private[spark] object ToolTips {
     scheduler delay is large, consider decreasing the size of tasks or decreasing the size
     of task results."""
 
-  val TASK_DESERIALIZATION_TIME = "Time spent deserializing the task closure on the executor."
+  val TASK_DESERIALIZATION_TIME =
+    """Time spent deserializing the task closure on the executor, including the time to read the
+    broadcasted task."""
 
   val SHUFFLE_READ_BLOCKED_TIME =
     "Time that the task spent blocked waiting for shuffle data to be read from remote machines."