From 963d6f065a763c2b94529bbd3ac4326e190bb2d7 Mon Sep 17 00:00:00 2001
From: Tor Myklebust
Date: Mon, 16 Dec 2013 23:14:52 -0500
Subject: Incorporate pwendell's code review suggestions.

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala        | 6 +++---
 core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala     | 4 ++--
 core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala        | 4 ++--
 .../apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala | 3 +--
 4 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 02ad64d070..0f19d7a96b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -222,9 +222,9 @@ private[spark] class Executor(
           return
         }

-        val objectSer = SparkEnv.get.serializer.newInstance()
+        val resultSer = SparkEnv.get.serializer.newInstance()
         val beforeSerialization = System.currentTimeMillis()
-        val valueBytes = objectSer.serialize(value)
+        val valueBytes = resultSer.serialize(value)
         val afterSerialization = System.currentTimeMillis()

         for (m <- task.metrics) {
@@ -232,7 +232,7 @@
           m.executorDeserializeTime = (taskStart - startTime).toInt
           m.executorRunTime = (taskFinish - taskStart).toInt
           m.jvmGCTime = gcTime - startGCTime
-          m.serializationTime = (afterSerialization - beforeSerialization).toInt
+          m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
         }

         val accumUpdates = Accumulators.values
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index c036866afd..bb1471d9ee 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -44,9 +44,9 @@ class TaskMetrics extends Serializable {
   var jvmGCTime: Long = _

   /**
-   * Amount of time spent serializing the result of the task
+   * Amount of time spent serializing the task result
    */
-  var serializationTime: Long = _
+  var resultSerializationTime: Long = _

   /**
    * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 81651bdd20..2f06efa66e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -102,8 +102,8 @@ private[spark] class StagePage(parent: JobProgressUI) {

         } else {
           val serializationTimes = validTasks.map{case (info, metrics, exception) =>
-            metrics.get.serializationTime.toDouble}
-          val serializationQuantiles = "Serialization Time" +: Distribution(serializationTimes).get.getQuantiles().map(
+            metrics.get.resultSerializationTime.toDouble}
+          val serializationQuantiles = "Result serialization time" +: Distribution(serializationTimes).get.getQuantiles().map(
             ms => parent.formatDuration(ms.toLong))

           val serviceTimes = validTasks.map{case (info, metrics, exception) =>
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
index 788cbb81bf..2476ab5c19 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -314,7 +314,6 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo

   def createTaskResult(id: Int): DirectTaskResult[Int] = {
     val objectSer = SparkEnv.get.serializer.newInstance()
-    new DirectTaskResult[Int](objectSer.serialize(id), mutable.Map.empty,
-      new TaskMetrics)
+    new DirectTaskResult[Int](objectSer.serialize(id), mutable.Map.empty, new TaskMetrics)
   }
 }
--
cgit v1.2.3
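
Note on the metric renamed above: the executor brackets the serialization of a task's
result with two System.currentTimeMillis() calls and records the difference as
TaskMetrics.resultSerializationTime. Below is a minimal, self-contained Scala sketch of
that timing pattern. The `serialize` helper and the object name are hypothetical
stand-ins; Spark itself uses SparkEnv.get.serializer.newInstance(), not plain JDK
serialization.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object ResultSerializationTimingSketch {
  // Stand-in for the TaskMetrics.resultSerializationTime field (milliseconds).
  var resultSerializationTime: Long = _

  // Stand-in serializer: plain JDK object serialization instead of
  // SparkEnv.get.serializer.newInstance().
  def serialize(value: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    buffer.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val value = (1 to 1000000).toArray  // pretend this is the task's result

    // Same bracketing as in Executor.scala above.
    val beforeSerialization = System.currentTimeMillis()
    val valueBytes = serialize(value)
    val afterSerialization = System.currentTimeMillis()
    resultSerializationTime = afterSerialization - beforeSerialization

    println(s"serialized ${valueBytes.length} bytes in $resultSerializationTime ms")
  }
}

Keeping this interval separate, rather than folding it into executorRunTime, is what
lets the stage page report "Result serialization time" as its own quantile row.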