Diffstat (limited to 'core/src/main/scala/org/apache/spark')
-rw-r--r--  core/src/main/scala/org/apache/spark/InternalAccumulator.scala              |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala                | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala             | 18
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala             |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala         |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala                   |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala  |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/api.scala                |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala      |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala                   |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                | 10
11 files changed, 82 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
index 0b494c146f..82d3098e2e 100644
--- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -31,7 +31,9 @@ private[spark] object InternalAccumulator {
// Names of internal task level metrics
val EXECUTOR_DESERIALIZE_TIME = METRICS_PREFIX + "executorDeserializeTime"
+ val EXECUTOR_DESERIALIZE_CPU_TIME = METRICS_PREFIX + "executorDeserializeCpuTime"
val EXECUTOR_RUN_TIME = METRICS_PREFIX + "executorRunTime"
+ val EXECUTOR_CPU_TIME = METRICS_PREFIX + "executorCpuTime"
val RESULT_SIZE = METRICS_PREFIX + "resultSize"
val JVM_GC_TIME = METRICS_PREFIX + "jvmGCTime"
val RESULT_SERIALIZATION_TIME = METRICS_PREFIX + "resultSerializationTime"
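
The two new constants follow the file's existing convention: METRICS_PREFIX plus a camelCase metric name. A minimal sketch of the resulting accumulator names, assuming METRICS_PREFIX is the "internal.metrics." prefix used elsewhere in this file:

// Minimal sketch (not Spark code); assumes METRICS_PREFIX = "internal.metrics."
object MetricNamesSketch {
  val METRICS_PREFIX = "internal.metrics."
  val EXECUTOR_DESERIALIZE_CPU_TIME = METRICS_PREFIX + "executorDeserializeCpuTime" // "internal.metrics.executorDeserializeCpuTime"
  val EXECUTOR_CPU_TIME = METRICS_PREFIX + "executorCpuTime"                        // "internal.metrics.executorCpuTime"
}
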
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 668ec41153..9501dd9cd8 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -232,13 +232,18 @@ private[spark] class Executor(
}
override def run(): Unit = {
+ val threadMXBean = ManagementFactory.getThreadMXBean
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
val deserializeStartTime = System.currentTimeMillis()
+ val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+ threadMXBean.getCurrentThreadCpuTime
+ } else 0L
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = env.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var taskStart: Long = 0
+ var taskStartCpu: Long = 0
startGCTime = computeTotalGcTime()
try {
@@ -269,6 +274,9 @@ private[spark] class Executor(
// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
+ taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+ threadMXBean.getCurrentThreadCpuTime
+ } else 0L
var threwException = true
val value = try {
val res = task.run(
@@ -302,6 +310,9 @@ private[spark] class Executor(
}
}
val taskFinish = System.currentTimeMillis()
+ val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+ threadMXBean.getCurrentThreadCpuTime
+ } else 0L
// If the task has been killed, let's fail it.
if (task.killed) {
@@ -317,8 +328,12 @@ private[spark] class Executor(
// includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
task.metrics.setExecutorDeserializeTime(
(taskStart - deserializeStartTime) + task.executorDeserializeTime)
+ task.metrics.setExecutorDeserializeCpuTime(
+ (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
// We need to subtract Task.run()'s deserialization time to avoid double-counting
task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
+ task.metrics.setExecutorCpuTime(
+ (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)
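
The executor samples per-thread CPU time from the JVM's ThreadMXBean at the same points where it already takes wall-clock timestamps, guarding every read with isCurrentThreadCpuTimeSupported and falling back to 0. A standalone sketch of that measurement pattern (illustrative only, not Spark code):

import java.lang.management.ManagementFactory

object CpuTimeSampleSketch {
  def main(args: Array[String]): Unit = {
    val threadMXBean = ManagementFactory.getThreadMXBean
    // Current thread's CPU time in nanoseconds, or 0 if the JVM does not support it.
    def cpuNow(): Long =
      if (threadMXBean.isCurrentThreadCpuTimeSupported) threadMXBean.getCurrentThreadCpuTime else 0L

    val startWall = System.currentTimeMillis()
    val startCpu = cpuNow()

    // Some work: CPU time only advances while this thread is actually running on a core.
    var acc = 0L
    var i = 0
    while (i < 10000000) { acc += i; i += 1 }

    val wallMs = System.currentTimeMillis() - startWall
    val cpuNs = cpuNow() - startCpu
    println(s"wall = $wallMs ms, cpu = $cpuNs ns, acc = $acc")
  }
}
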
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 52a349919e..2956768c16 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -47,7 +47,9 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, Accumulat
class TaskMetrics private[spark] () extends Serializable {
// Each metric is internally represented as an accumulator
private val _executorDeserializeTime = new LongAccumulator
+ private val _executorDeserializeCpuTime = new LongAccumulator
private val _executorRunTime = new LongAccumulator
+ private val _executorCpuTime = new LongAccumulator
private val _resultSize = new LongAccumulator
private val _jvmGCTime = new LongAccumulator
private val _resultSerializationTime = new LongAccumulator
@@ -62,11 +64,22 @@ class TaskMetrics private[spark] () extends Serializable {
def executorDeserializeTime: Long = _executorDeserializeTime.sum
/**
+ * CPU Time taken on the executor to deserialize this task in nanoseconds.
+ */
+ def executorDeserializeCpuTime: Long = _executorDeserializeCpuTime.sum
+
+ /**
* Time the executor spends actually running the task (including fetching shuffle data).
*/
def executorRunTime: Long = _executorRunTime.sum
/**
+ * CPU Time the executor spends actually running the task
+ * (including fetching shuffle data) in nanoseconds.
+ */
+ def executorCpuTime: Long = _executorCpuTime.sum
+
+ /**
* The number of bytes this task transmitted back to the driver as the TaskResult.
*/
def resultSize: Long = _resultSize.sum
@@ -111,7 +124,10 @@ class TaskMetrics private[spark] () extends Serializable {
// Setters and increment-ers
private[spark] def setExecutorDeserializeTime(v: Long): Unit =
_executorDeserializeTime.setValue(v)
+ private[spark] def setExecutorDeserializeCpuTime(v: Long): Unit =
+ _executorDeserializeCpuTime.setValue(v)
private[spark] def setExecutorRunTime(v: Long): Unit = _executorRunTime.setValue(v)
+ private[spark] def setExecutorCpuTime(v: Long): Unit = _executorCpuTime.setValue(v)
private[spark] def setResultSize(v: Long): Unit = _resultSize.setValue(v)
private[spark] def setJvmGCTime(v: Long): Unit = _jvmGCTime.setValue(v)
private[spark] def setResultSerializationTime(v: Long): Unit =
@@ -188,7 +204,9 @@ class TaskMetrics private[spark] () extends Serializable {
import InternalAccumulator._
@transient private[spark] lazy val nameToAccums = LinkedHashMap(
EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime,
+ EXECUTOR_DESERIALIZE_CPU_TIME -> _executorDeserializeCpuTime,
EXECUTOR_RUN_TIME -> _executorRunTime,
+ EXECUTOR_CPU_TIME -> _executorCpuTime,
RESULT_SIZE -> _resultSize,
JVM_GC_TIME -> _jvmGCTime,
RESULT_SERIALIZATION_TIME -> _resultSerializationTime,
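
Note that the new CPU metrics are recorded in nanoseconds while the existing deserialize/run times are in milliseconds. An illustrative helper (not part of the patch) showing how the two units could be compared once read from TaskMetrics:

import java.util.concurrent.TimeUnit

// Illustrative helper, not part of the patch: a rough CPU-utilisation ratio for a task,
// assuming executorRunTime is reported in milliseconds and executorCpuTime in nanoseconds,
// as documented in the scaladoc above.
object CpuRatioSketch {
  def cpuUtilisation(executorRunTimeMs: Long, executorCpuTimeNs: Long): Double = {
    val runTimeNs = TimeUnit.MILLISECONDS.toNanos(executorRunTimeMs)
    if (runTimeNs == 0L) 0.0 else executorCpuTimeNs.toDouble / runTimeNs
  }
}
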
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 75c6018e21..609f10aee9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler
import java.io._
+import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.Properties
@@ -61,11 +62,18 @@ private[spark] class ResultTask[T, U](
override def runTask(context: TaskContext): U = {
// Deserialize the RDD and the func using the broadcast variables.
+ val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
+ val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+ threadMXBean.getCurrentThreadCpuTime
+ } else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
+ _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+ threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
+ } else 0L
func(context, rdd.iterator(partition, context))
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 84b3e5ba6c..448fe02084 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -17,6 +17,7 @@
package org.apache.spark.scheduler
+import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.Properties
@@ -66,11 +67,18 @@ private[spark] class ShuffleMapTask(
override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
+ val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
+ val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+ threadMXBean.getCurrentThreadCpuTime
+ } else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
+ _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+ threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
+ } else 0L
var writer: ShuffleWriter[Any, Any] = null
try {
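
ResultTask and ShuffleMapTask repeat the same "sample if supported, else 0" idiom around deserialization. A hypothetical helper that factors the pattern out, purely as a sketch and not part of this patch:

import java.lang.management.ManagementFactory

// Hypothetical helper, not part of this patch: run a block and return
// (result, wall-clock ms, CPU ns), falling back to 0 CPU time when the
// JVM cannot report per-thread CPU time.
object TaskTimingSketch {
  def timed[T](body: => T): (T, Long, Long) = {
    val bean = ManagementFactory.getThreadMXBean
    def cpu(): Long = if (bean.isCurrentThreadCpuTimeSupported) bean.getCurrentThreadCpuTime else 0L
    val startWall = System.currentTimeMillis()
    val startCpu = cpu()
    val result = body
    (result, System.currentTimeMillis() - startWall, cpu() - startCpu)
  }
}
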
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index ea9dc3988d..48daa344f3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -139,6 +139,7 @@ private[spark] abstract class Task[T](
@volatile @transient private var _killed = false
protected var _executorDeserializeTime: Long = 0
+ protected var _executorDeserializeCpuTime: Long = 0
/**
* Whether the task has been killed.
@@ -149,6 +150,7 @@ private[spark] abstract class Task[T](
* Returns the amount of time spent deserializing the RDD and function to be run.
*/
def executorDeserializeTime: Long = _executorDeserializeTime
+ def executorDeserializeCpuTime: Long = _executorDeserializeCpuTime
/**
* Collect the latest values of accumulators used in this task. If the task failed,
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 7d63a8f734..acb7c23079 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -101,6 +101,7 @@ private[v1] object AllStagesResource {
numCompleteTasks = stageUiData.numCompleteTasks,
numFailedTasks = stageUiData.numFailedTasks,
executorRunTime = stageUiData.executorRunTime,
+ executorCpuTime = stageUiData.executorCpuTime,
submissionTime = stageInfo.submissionTime.map(new Date(_)),
firstTaskLaunchedTime,
completionTime = stageInfo.completionTime.map(new Date(_)),
@@ -220,7 +221,9 @@ private[v1] object AllStagesResource {
new TaskMetricDistributions(
quantiles = quantiles,
executorDeserializeTime = metricQuantiles(_.executorDeserializeTime),
+ executorDeserializeCpuTime = metricQuantiles(_.executorDeserializeCpuTime),
executorRunTime = metricQuantiles(_.executorRunTime),
+ executorCpuTime = metricQuantiles(_.executorCpuTime),
resultSize = metricQuantiles(_.resultSize),
jvmGcTime = metricQuantiles(_.jvmGCTime),
resultSerializationTime = metricQuantiles(_.resultSerializationTime),
@@ -241,7 +244,9 @@ private[v1] object AllStagesResource {
def convertUiTaskMetrics(internal: InternalTaskMetrics): TaskMetrics = {
new TaskMetrics(
executorDeserializeTime = internal.executorDeserializeTime,
+ executorDeserializeCpuTime = internal.executorDeserializeCpuTime,
executorRunTime = internal.executorRunTime,
+ executorCpuTime = internal.executorCpuTime,
resultSize = internal.resultSize,
jvmGcTime = internal.jvmGCTime,
resultSerializationTime = internal.resultSerializationTime,
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 32e332a9ad..44a929b310 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -128,6 +128,7 @@ class StageData private[spark](
val numFailedTasks: Int,
val executorRunTime: Long,
+ val executorCpuTime: Long,
val submissionTime: Option[Date],
val firstTaskLaunchedTime: Option[Date],
val completionTime: Option[Date],
@@ -166,7 +167,9 @@ class TaskData private[spark](
class TaskMetrics private[spark](
val executorDeserializeTime: Long,
+ val executorDeserializeCpuTime: Long,
val executorRunTime: Long,
+ val executorCpuTime: Long,
val resultSize: Long,
val jvmGcTime: Long,
val resultSerializationTime: Long,
@@ -202,7 +205,9 @@ class TaskMetricDistributions private[spark](
val quantiles: IndexedSeq[Double],
val executorDeserializeTime: IndexedSeq[Double],
+ val executorDeserializeCpuTime: IndexedSeq[Double],
val executorRunTime: IndexedSeq[Double],
+ val executorCpuTime: IndexedSeq[Double],
val resultSize: IndexedSeq[Double],
val jvmGcTime: IndexedSeq[Double],
val resultSerializationTime: IndexedSeq[Double],
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index d3a4f9d322..83dc5d8745 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -503,6 +503,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val timeDelta =
taskMetrics.executorRunTime - oldMetrics.map(_.executorRunTime).getOrElse(0L)
stageData.executorRunTime += timeDelta
+
+ val cpuTimeDelta =
+ taskMetrics.executorCpuTime - oldMetrics.map(_.executorCpuTime).getOrElse(0L)
+ stageData.executorCpuTime += cpuTimeDelta
}
override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) {
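
The listener aggregates per-stage CPU time incrementally: each metrics update adds only the delta between a task's new value and the value last recorded for it. A simplified sketch of that delta accumulation, with the stage and old-metrics bookkeeping reduced to a plain map (illustrative only):

// Simplified sketch of the listener's delta accumulation: on each metrics update,
// add only the growth since the task's previously recorded value so repeated
// updates for the same task are not double-counted.
final class StageCpuAggregateSketch {
  private var total: Long = 0L
  private val lastSeen = scala.collection.mutable.Map.empty[Long, Long] // taskId -> last seen executorCpuTime

  def update(taskId: Long, executorCpuTime: Long): Unit = {
    total += executorCpuTime - lastSeen.getOrElse(taskId, 0L)
    lastSeen(taskId) = executorCpuTime
  }

  def executorCpuTime: Long = total
}
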
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index c729f03b3c..f4a04609c4 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -80,6 +80,7 @@ private[spark] object UIData {
var numKilledTasks: Int = _
var executorRunTime: Long = _
+ var executorCpuTime: Long = _
var inputBytes: Long = _
var inputRecords: Long = _
@@ -137,7 +138,9 @@ private[spark] object UIData {
metrics.map { m =>
TaskMetricsUIData(
executorDeserializeTime = m.executorDeserializeTime,
+ executorDeserializeCpuTime = m.executorDeserializeCpuTime,
executorRunTime = m.executorRunTime,
+ executorCpuTime = m.executorCpuTime,
resultSize = m.resultSize,
jvmGCTime = m.jvmGCTime,
resultSerializationTime = m.resultSerializationTime,
@@ -179,7 +182,9 @@ private[spark] object UIData {
case class TaskMetricsUIData(
executorDeserializeTime: Long,
+ executorDeserializeCpuTime: Long,
executorRunTime: Long,
+ executorCpuTime: Long,
resultSize: Long,
jvmGCTime: Long,
resultSerializationTime: Long,
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 41d947c442..f4fa7b4061 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -348,7 +348,9 @@ private[spark] object JsonProtocol {
("Status" -> blockStatusToJson(status))
})
("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
+ ("Executor Deserialize CPU Time" -> taskMetrics.executorDeserializeCpuTime) ~
("Executor Run Time" -> taskMetrics.executorRunTime) ~
+ ("Executor CPU Time" -> taskMetrics.executorCpuTime) ~
("Result Size" -> taskMetrics.resultSize) ~
("JVM GC Time" -> taskMetrics.jvmGCTime) ~
("Result Serialization Time" -> taskMetrics.resultSerializationTime) ~
@@ -759,7 +761,15 @@ private[spark] object JsonProtocol {
return metrics
}
metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
+ metrics.setExecutorDeserializeCpuTime((json \ "Executor Deserialize CPU Time") match {
+ case JNothing => 0
+ case x => x.extract[Long]
+ })
metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long])
+ metrics.setExecutorCpuTime((json \ "Executor CPU Time") match {
+ case JNothing => 0
+ case x => x.extract[Long]
+ })
metrics.setResultSize((json \ "Result Size").extract[Long])
metrics.setJvmGCTime((json \ "JVM GC Time").extract[Long])
metrics.setResultSerializationTime((json \ "Result Serialization Time").extract[Long])
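
The JSON reader treats the new fields as optional so event logs written by older Spark versions still parse: a missing key yields JNothing and is mapped to 0. A standalone json4s sketch of the same fallback, assuming json4s-jackson on the classpath (not Spark code):

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object EventLogCpuTimeSketch {
  implicit val formats: Formats = DefaultFormats

  // Extract "Executor CPU Time" if present, defaulting to 0 for logs
  // written before this field existed (same pattern as the diff above).
  def executorCpuTime(jsonText: String): Long = {
    val json = parse(jsonText)
    (json \ "Executor CPU Time") match {
      case JNothing => 0L
      case x => x.extract[Long]
    }
  }

  def main(args: Array[String]): Unit = {
    println(executorCpuTime("""{"Executor Run Time": 12, "Executor CPU Time": 3400000}""")) // 3400000
    println(executorCpuTime("""{"Executor Run Time": 12}"""))                               // 0
  }
}
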