Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala  8
1 file changed, 8 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 84b3e5ba6c..448fe02084 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.scheduler
 
+import java.lang.management.ManagementFactory
 import java.nio.ByteBuffer
 import java.util.Properties
 
@@ -66,11 +67,18 @@ private[spark] class ShuffleMapTask(
 
   override def runTask(context: TaskContext): MapStatus = {
     // Deserialize the RDD using the broadcast variable.
+    val threadMXBean = ManagementFactory.getThreadMXBean
     val deserializeStartTime = System.currentTimeMillis()
+    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+      threadMXBean.getCurrentThreadCpuTime
+    } else 0L
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
     _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
+    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
+    } else 0L
 
     var writer: ShuffleWriter[Any, Any] = null
     try {
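
For context, and not part of the patch itself: the added lines sample per-thread CPU time through java.lang.management.ThreadMXBean before and after task deserialization, falling back to 0L when the JVM does not support the measurement. A minimal standalone Scala sketch of the same pattern follows; the names CpuTimeTiming and measureCpuTimeNs are illustrative, not from Spark.

import java.lang.management.ManagementFactory

object CpuTimeTiming {
  // Runs `body` and returns its result together with the CPU time (in
  // nanoseconds) the current thread spent on it, or 0L if the JVM cannot
  // report thread CPU time. This mirrors the guard used in the patch above;
  // it is an illustration, not Spark code.
  def measureCpuTimeNs[T](body: => T): (T, Long) = {
    val threadMXBean = ManagementFactory.getThreadMXBean
    val supported = threadMXBean.isCurrentThreadCpuTimeSupported
    val startCpu = if (supported) threadMXBean.getCurrentThreadCpuTime else 0L
    val result = body
    val cpuNs = if (supported) threadMXBean.getCurrentThreadCpuTime - startCpu else 0L
    (result, cpuNs)
  }

  def main(args: Array[String]): Unit = {
    val (sum, cpuNs) = measureCpuTimeNs {
      // Some CPU-bound work so the counter visibly advances.
      (1L to 10000000L).sum
    }
    println(s"sum=$sum, cpu time=$cpuNs ns")
  }
}

Unlike the wall-clock measurement (System.currentTimeMillis in the patch), the ThreadMXBean value counts only time the thread actually executed on a CPU, excluding time spent blocked or descheduled, so the two metrics can differ substantially.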