diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark')
11 files changed, 82 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala index 0b494c146f..82d3098e2e 100644 --- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala +++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala @@ -31,7 +31,9 @@ private[spark] object InternalAccumulator { // Names of internal task level metrics val EXECUTOR_DESERIALIZE_TIME = METRICS_PREFIX + "executorDeserializeTime" + val EXECUTOR_DESERIALIZE_CPU_TIME = METRICS_PREFIX + "executorDeserializeCpuTime" val EXECUTOR_RUN_TIME = METRICS_PREFIX + "executorRunTime" + val EXECUTOR_CPU_TIME = METRICS_PREFIX + "executorCpuTime" val RESULT_SIZE = METRICS_PREFIX + "resultSize" val JVM_GC_TIME = METRICS_PREFIX + "jvmGCTime" val RESULT_SERIALIZATION_TIME = METRICS_PREFIX + "resultSerializationTime" diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 668ec41153..9501dd9cd8 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -232,13 +232,18 @@ private[spark] class Executor( } override def run(): Unit = { + val threadMXBean = ManagementFactory.getThreadMXBean val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId) val deserializeStartTime = System.currentTimeMillis() + val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { + threadMXBean.getCurrentThreadCpuTime + } else 0L Thread.currentThread.setContextClassLoader(replClassLoader) val ser = env.closureSerializer.newInstance() logInfo(s"Running $taskName (TID $taskId)") execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) var taskStart: Long = 0 + var taskStartCpu: Long = 0 startGCTime = computeTotalGcTime() try { @@ -269,6 +274,9 @@ private[spark] class Executor( // Run the actual task and measure its runtime. taskStart = System.currentTimeMillis() + taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) { + threadMXBean.getCurrentThreadCpuTime + } else 0L var threwException = true val value = try { val res = task.run( @@ -302,6 +310,9 @@ private[spark] class Executor( } } val taskFinish = System.currentTimeMillis() + val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) { + threadMXBean.getCurrentThreadCpuTime + } else 0L // If the task has been killed, let's fail it. if (task.killed) { @@ -317,8 +328,12 @@ private[spark] class Executor( // includes the Partition. Second, Task.run() deserializes the RDD and function to be run. task.metrics.setExecutorDeserializeTime( (taskStart - deserializeStartTime) + task.executorDeserializeTime) + task.metrics.setExecutorDeserializeCpuTime( + (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime) // We need to subtract Task.run()'s deserialization time to avoid double-counting task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime) + task.metrics.setExecutorCpuTime( + (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime) task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 52a349919e..2956768c16 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -47,7 +47,9 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, Accumulat class TaskMetrics private[spark] () extends Serializable { // Each metric is internally represented as an accumulator private val _executorDeserializeTime = new LongAccumulator + private val _executorDeserializeCpuTime = new LongAccumulator private val _executorRunTime = new LongAccumulator + private val _executorCpuTime = new LongAccumulator private val _resultSize = new LongAccumulator private val _jvmGCTime = new LongAccumulator private val _resultSerializationTime = new LongAccumulator @@ -62,11 +64,22 @@ class TaskMetrics private[spark] () extends Serializable { def executorDeserializeTime: Long = _executorDeserializeTime.sum /** + * CPU Time taken on the executor to deserialize this task in nanoseconds. + */ + def executorDeserializeCpuTime: Long = _executorDeserializeCpuTime.sum + + /** * Time the executor spends actually running the task (including fetching shuffle data). */ def executorRunTime: Long = _executorRunTime.sum /** + * CPU Time the executor spends actually running the task + * (including fetching shuffle data) in nanoseconds. + */ + def executorCpuTime: Long = _executorCpuTime.sum + + /** * The number of bytes this task transmitted back to the driver as the TaskResult. */ def resultSize: Long = _resultSize.sum @@ -111,7 +124,10 @@ class TaskMetrics private[spark] () extends Serializable { // Setters and increment-ers private[spark] def setExecutorDeserializeTime(v: Long): Unit = _executorDeserializeTime.setValue(v) + private[spark] def setExecutorDeserializeCpuTime(v: Long): Unit = + _executorDeserializeCpuTime.setValue(v) private[spark] def setExecutorRunTime(v: Long): Unit = _executorRunTime.setValue(v) + private[spark] def setExecutorCpuTime(v: Long): Unit = _executorCpuTime.setValue(v) private[spark] def setResultSize(v: Long): Unit = _resultSize.setValue(v) private[spark] def setJvmGCTime(v: Long): Unit = _jvmGCTime.setValue(v) private[spark] def setResultSerializationTime(v: Long): Unit = @@ -188,7 +204,9 @@ class TaskMetrics private[spark] () extends Serializable { import InternalAccumulator._ @transient private[spark] lazy val nameToAccums = LinkedHashMap( EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime, + EXECUTOR_DESERIALIZE_CPU_TIME -> _executorDeserializeCpuTime, EXECUTOR_RUN_TIME -> _executorRunTime, + EXECUTOR_CPU_TIME -> _executorCpuTime, RESULT_SIZE -> _resultSize, JVM_GC_TIME -> _jvmGCTime, RESULT_SERIALIZATION_TIME -> _resultSerializationTime, diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index 75c6018e21..609f10aee9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import java.io._ +import java.lang.management.ManagementFactory import java.nio.ByteBuffer import java.util.Properties @@ -61,11 +62,18 @@ private[spark] class ResultTask[T, U]( override def runTask(context: TaskContext): U = { // Deserialize the RDD and the func using the broadcast variables. + val threadMXBean = ManagementFactory.getThreadMXBean val deserializeStartTime = System.currentTimeMillis() + val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { + threadMXBean.getCurrentThreadCpuTime + } else 0L val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime + _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { + threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime + } else 0L func(context, rdd.iterator(partition, context)) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 84b3e5ba6c..448fe02084 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler +import java.lang.management.ManagementFactory import java.nio.ByteBuffer import java.util.Properties @@ -66,11 +67,18 @@ private[spark] class ShuffleMapTask( override def runTask(context: TaskContext): MapStatus = { // Deserialize the RDD using the broadcast variable. + val threadMXBean = ManagementFactory.getThreadMXBean val deserializeStartTime = System.currentTimeMillis() + val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { + threadMXBean.getCurrentThreadCpuTime + } else 0L val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime + _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { + threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime + } else 0L var writer: ShuffleWriter[Any, Any] = null try { diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index ea9dc3988d..48daa344f3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -139,6 +139,7 @@ private[spark] abstract class Task[T]( @volatile @transient private var _killed = false protected var _executorDeserializeTime: Long = 0 + protected var _executorDeserializeCpuTime: Long = 0 /** * Whether the task has been killed. @@ -149,6 +150,7 @@ private[spark] abstract class Task[T]( * Returns the amount of time spent deserializing the RDD and function to be run. */ def executorDeserializeTime: Long = _executorDeserializeTime + def executorDeserializeCpuTime: Long = _executorDeserializeCpuTime /** * Collect the latest values of accumulators used in this task. If the task failed, diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index 7d63a8f734..acb7c23079 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -101,6 +101,7 @@ private[v1] object AllStagesResource { numCompleteTasks = stageUiData.numCompleteTasks, numFailedTasks = stageUiData.numFailedTasks, executorRunTime = stageUiData.executorRunTime, + executorCpuTime = stageUiData.executorCpuTime, submissionTime = stageInfo.submissionTime.map(new Date(_)), firstTaskLaunchedTime, completionTime = stageInfo.completionTime.map(new Date(_)), @@ -220,7 +221,9 @@ private[v1] object AllStagesResource { new TaskMetricDistributions( quantiles = quantiles, executorDeserializeTime = metricQuantiles(_.executorDeserializeTime), + executorDeserializeCpuTime = metricQuantiles(_.executorDeserializeCpuTime), executorRunTime = metricQuantiles(_.executorRunTime), + executorCpuTime = metricQuantiles(_.executorCpuTime), resultSize = metricQuantiles(_.resultSize), jvmGcTime = metricQuantiles(_.jvmGCTime), resultSerializationTime = metricQuantiles(_.resultSerializationTime), @@ -241,7 +244,9 @@ private[v1] object AllStagesResource { def convertUiTaskMetrics(internal: InternalTaskMetrics): TaskMetrics = { new TaskMetrics( executorDeserializeTime = internal.executorDeserializeTime, + executorDeserializeCpuTime = internal.executorDeserializeCpuTime, executorRunTime = internal.executorRunTime, + executorCpuTime = internal.executorCpuTime, resultSize = internal.resultSize, jvmGcTime = internal.jvmGCTime, resultSerializationTime = internal.resultSerializationTime, diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 32e332a9ad..44a929b310 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -128,6 +128,7 @@ class StageData private[spark]( val numFailedTasks: Int, val executorRunTime: Long, + val executorCpuTime: Long, val submissionTime: Option[Date], val firstTaskLaunchedTime: Option[Date], val completionTime: Option[Date], @@ -166,7 +167,9 @@ class TaskData private[spark]( class TaskMetrics private[spark]( val executorDeserializeTime: Long, + val executorDeserializeCpuTime: Long, val executorRunTime: Long, + val executorCpuTime: Long, val resultSize: Long, val jvmGcTime: Long, val resultSerializationTime: Long, @@ -202,7 +205,9 @@ class TaskMetricDistributions private[spark]( val quantiles: IndexedSeq[Double], val executorDeserializeTime: IndexedSeq[Double], + val executorDeserializeCpuTime: IndexedSeq[Double], val executorRunTime: IndexedSeq[Double], + val executorCpuTime: IndexedSeq[Double], val resultSize: IndexedSeq[Double], val jvmGcTime: IndexedSeq[Double], val resultSerializationTime: IndexedSeq[Double], diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index d3a4f9d322..83dc5d8745 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -503,6 +503,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { val timeDelta = taskMetrics.executorRunTime - oldMetrics.map(_.executorRunTime).getOrElse(0L) stageData.executorRunTime += timeDelta + + val cpuTimeDelta = + taskMetrics.executorCpuTime - oldMetrics.map(_.executorCpuTime).getOrElse(0L) + stageData.executorCpuTime += cpuTimeDelta } override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index c729f03b3c..f4a04609c4 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -80,6 +80,7 @@ private[spark] object UIData { var numKilledTasks: Int = _ var executorRunTime: Long = _ + var executorCpuTime: Long = _ var inputBytes: Long = _ var inputRecords: Long = _ @@ -137,7 +138,9 @@ private[spark] object UIData { metrics.map { m => TaskMetricsUIData( executorDeserializeTime = m.executorDeserializeTime, + executorDeserializeCpuTime = m.executorDeserializeCpuTime, executorRunTime = m.executorRunTime, + executorCpuTime = m.executorCpuTime, resultSize = m.resultSize, jvmGCTime = m.jvmGCTime, resultSerializationTime = m.resultSerializationTime, @@ -179,7 +182,9 @@ private[spark] object UIData { case class TaskMetricsUIData( executorDeserializeTime: Long, + executorDeserializeCpuTime: Long, executorRunTime: Long, + executorCpuTime: Long, resultSize: Long, jvmGCTime: Long, resultSerializationTime: Long, diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 41d947c442..f4fa7b4061 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -348,7 +348,9 @@ private[spark] object JsonProtocol { ("Status" -> blockStatusToJson(status)) }) ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~ + ("Executor Deserialize CPU Time" -> taskMetrics.executorDeserializeCpuTime) ~ ("Executor Run Time" -> taskMetrics.executorRunTime) ~ + ("Executor CPU Time" -> taskMetrics.executorCpuTime) ~ ("Result Size" -> taskMetrics.resultSize) ~ ("JVM GC Time" -> taskMetrics.jvmGCTime) ~ ("Result Serialization Time" -> taskMetrics.resultSerializationTime) ~ @@ -759,7 +761,15 @@ private[spark] object JsonProtocol { return metrics } metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long]) + metrics.setExecutorDeserializeCpuTime((json \ "Executor Deserialize CPU Time") match { + case JNothing => 0 + case x => x.extract[Long] + }) metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long]) + metrics.setExecutorCpuTime((json \ "Executor CPU Time") match { + case JNothing => 0 + case x => x.extract[Long] + }) metrics.setResultSize((json \ "Result Size").extract[Long]) metrics.setJvmGCTime((json \ "JVM GC Time").extract[Long]) metrics.setResultSerializationTime((json \ "Result Serialization Time").extract[Long]) |