diff options
author | Sandy Ryza <sandy@cloudera.com> | 2014-12-18 22:40:44 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2014-12-18 22:40:55 -0800 |
commit | fd7bb9d9728fa2b4fc6f26ae6a31cfa60d560ad4 (patch) | |
tree | daba2d44b830d0fcb4c07590562247be9d11cd8b /core | |
parent | ca37639aa1b537d0f9b56bf1362bf293635e235c (diff) | |
download | spark-fd7bb9d9728fa2b4fc6f26ae6a31cfa60d560ad4.tar.gz spark-fd7bb9d9728fa2b4fc6f26ae6a31cfa60d560ad4.tar.bz2 spark-fd7bb9d9728fa2b4fc6f26ae6a31cfa60d560ad4.zip |
SPARK-3428. TaskMetrics for running tasks is missing GC time metrics
Author: Sandy Ryza <sandy@cloudera.com>
Closes #3684 from sryza/sandy-spark-3428 and squashes the following commits:
cb827fe [Sandy Ryza] SPARK-3428. TaskMetrics for running tasks is missing GC time metrics
(cherry picked from commit 283263ffaa941e7e9ba147cf0ad377d9202d3761)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/executor/Executor.scala | 9 |
1 files changed, 7 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 52de6980ec..da030f231f 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -145,6 +145,8 @@ private[spark] class Executor( } } + private def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum + class TaskRunner( execBackend: ExecutorBackend, val taskId: Long, taskName: String, serializedTask: ByteBuffer) extends Runnable { @@ -152,6 +154,7 @@ private[spark] class Executor( @volatile private var killed = false @volatile var task: Task[Any] = _ @volatile var attemptedTask: Option[Task[Any]] = None + @volatile var startGCTime: Long = _ def kill(interruptThread: Boolean) { logInfo(s"Executor is trying to kill $taskName (TID $taskId)") @@ -168,8 +171,7 @@ private[spark] class Executor( logInfo(s"Running $taskName (TID $taskId)") execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) var taskStart: Long = 0 - def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum - val startGCTime = gcTime + startGCTime = gcTime try { val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) @@ -376,10 +378,13 @@ private[spark] class Executor( while (!isStopped) { val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]() + val curGCTime = gcTime + for (taskRunner <- runningTasks.values()) { if (!taskRunner.attemptedTask.isEmpty) { Option(taskRunner.task).flatMap(_.metrics).foreach { metrics => metrics.updateShuffleReadMetrics + metrics.jvmGCTime = curGCTime - taskRunner.startGCTime if (isLocal) { // JobProgressListener will hold an reference of it during // onExecutorMetricsUpdate(), then JobProgressListener can not see |