aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorSandy Ryza <sandy@cloudera.com>2014-12-18 22:40:44 -0800
committerJosh Rosen <joshrosen@databricks.com>2014-12-18 22:40:55 -0800
commitfd7bb9d9728fa2b4fc6f26ae6a31cfa60d560ad4 (patch)
treedaba2d44b830d0fcb4c07590562247be9d11cd8b /core
parentca37639aa1b537d0f9b56bf1362bf293635e235c (diff)
downloadspark-fd7bb9d9728fa2b4fc6f26ae6a31cfa60d560ad4.tar.gz
spark-fd7bb9d9728fa2b4fc6f26ae6a31cfa60d560ad4.tar.bz2
spark-fd7bb9d9728fa2b4fc6f26ae6a31cfa60d560ad4.zip
SPARK-3428. TaskMetrics for running tasks is missing GC time metrics
Author: Sandy Ryza <sandy@cloudera.com> Closes #3684 from sryza/sandy-spark-3428 and squashes the following commits: cb827fe [Sandy Ryza] SPARK-3428. TaskMetrics for running tasks is missing GC time metrics (cherry picked from commit 283263ffaa941e7e9ba147cf0ad377d9202d3761) Signed-off-by: Josh Rosen <joshrosen@databricks.com>
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala9
1 files changed, 7 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 52de6980ec..da030f231f 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -145,6 +145,8 @@ private[spark] class Executor(
}
}
+ private def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
+
class TaskRunner(
execBackend: ExecutorBackend, val taskId: Long, taskName: String, serializedTask: ByteBuffer)
extends Runnable {
@@ -152,6 +154,7 @@ private[spark] class Executor(
@volatile private var killed = false
@volatile var task: Task[Any] = _
@volatile var attemptedTask: Option[Task[Any]] = None
+ @volatile var startGCTime: Long = _
def kill(interruptThread: Boolean) {
logInfo(s"Executor is trying to kill $taskName (TID $taskId)")
@@ -168,8 +171,7 @@ private[spark] class Executor(
logInfo(s"Running $taskName (TID $taskId)")
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var taskStart: Long = 0
- def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
- val startGCTime = gcTime
+ startGCTime = gcTime
try {
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
@@ -376,10 +378,13 @@ private[spark] class Executor(
while (!isStopped) {
val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
+ val curGCTime = gcTime
+
for (taskRunner <- runningTasks.values()) {
if (!taskRunner.attemptedTask.isEmpty) {
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
metrics.updateShuffleReadMetrics
+ metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
if (isLocal) {
// JobProgressListener will hold an reference of it during
// onExecutorMetricsUpdate(), then JobProgressListener can not see