author     Tor Myklebust <tmyklebu@gmail.com>   2013-12-16 23:14:52 -0500
committer  Tor Myklebust <tmyklebu@gmail.com>   2013-12-16 23:14:52 -0500
commit     963d6f065a763c2b94529bbd3ac4326e190bb2d7 (patch)
tree       90c2724c3f1973c5c85b76e44b7994bdbbd7d58d
parent     882d544856c61573cdd6124e921f700d580d170d (diff)
Incorporate pwendell's code review suggestions.
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala                             | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala                          | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala                             | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala  | 3
4 files changed, 8 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 02ad64d070..0f19d7a96b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -222,9 +222,9 @@ private[spark] class Executor(
           return
         }
 
-        val objectSer = SparkEnv.get.serializer.newInstance()
+        val resultSer = SparkEnv.get.serializer.newInstance()
         val beforeSerialization = System.currentTimeMillis()
-        val valueBytes = objectSer.serialize(value)
+        val valueBytes = resultSer.serialize(value)
         val afterSerialization = System.currentTimeMillis()
 
         for (m <- task.metrics) {
@@ -232,7 +232,7 @@ private[spark] class Executor(
           m.executorDeserializeTime = (taskStart - startTime).toInt
           m.executorRunTime = (taskFinish - taskStart).toInt
           m.jvmGCTime = gcTime - startGCTime
-          m.serializationTime = (afterSerialization - beforeSerialization).toInt
+          m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
         }
 
         val accumUpdates = Accumulators.values
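
The Executor hunks above time result serialization by bracketing the serialize call with System.currentTimeMillis() and storing the difference in the task's metrics. A minimal, self-contained sketch of that timing pattern, using plain JDK serialization in place of Spark's SparkEnv serializer (the object name ResultTimingSketch and the sample value are hypothetical):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object ResultTimingSketch {
  def main(args: Array[String]): Unit = {
    val value = (1 to 1000).toArray  // hypothetical stand-in for a task result

    val beforeSerialization = System.currentTimeMillis()
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(value)           // the work being measured
    oos.close()
    val valueBytes = bos.toByteArray
    val afterSerialization = System.currentTimeMillis()

    // In the real code this difference is stored as m.resultSerializationTime.
    println(s"serialized ${valueBytes.length} bytes in " +
      s"${afterSerialization - beforeSerialization} ms")
  }
}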
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index c036866afd..bb1471d9ee 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -44,9 +44,9 @@ class TaskMetrics extends Serializable {
   var jvmGCTime: Long = _
 
   /**
-   * Amount of time spent serializing the result of the task
+   * Amount of time spent serializing the task result
    */
-  var serializationTime: Long = _
+  var resultSerializationTime: Long = _
 
   /**
    * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
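
For context, a hypothetical miniature of the metrics holder, limited to the fields this commit and the Executor hunks touch; Spark's real TaskMetrics carries many more fields:

class MiniTaskMetrics extends Serializable {
  /** Time taken on the executor to deserialize this task, in ms. */
  var executorDeserializeTime: Long = _
  /** Time the task spent running on the executor, in ms. */
  var executorRunTime: Long = _
  /** Amount of time the JVM spent in garbage collection while the task ran, in ms. */
  var jvmGCTime: Long = _
  /** Amount of time spent serializing the task result, in ms. */
  var resultSerializationTime: Long = _
}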
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 81651bdd20..2f06efa66e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -102,8 +102,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
         }
         else {
           val serializationTimes = validTasks.map{case (info, metrics, exception) =>
-            metrics.get.serializationTime.toDouble}
-          val serializationQuantiles = "Serialization Time" +: Distribution(serializationTimes).get.getQuantiles().map(
+            metrics.get.resultSerializationTime.toDouble}
+          val serializationQuantiles = "Result serialization time" +: Distribution(serializationTimes).get.getQuantiles().map(
             ms => parent.formatDuration(ms.toLong))
 
           val serviceTimes = validTasks.map{case (info, metrics, exception) =>
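
The quantile line above uses Spark's Distribution utility to summarize per-task times for the stage page. A hypothetical stand-in (QuantileSketch, with a hand-rolled quantiles helper) illustrating the kind of computation getQuantiles performs, assuming probability points of 0, 0.25, 0.5, 0.75, and 1:

object QuantileSketch {
  // Hand-rolled helper standing in for Distribution.getQuantiles:
  // sort the data, then sample it at fixed probability points.
  def quantiles(xs: Seq[Double],
                probs: Seq[Double] = Seq(0.0, 0.25, 0.5, 0.75, 1.0)): Seq[Double] = {
    val sorted = xs.sorted
    probs.map(p => sorted(math.min((p * sorted.length).toInt, sorted.length - 1)))
  }

  def main(args: Array[String]): Unit = {
    val serializationTimes = Seq(12.0, 3.0, 45.0, 7.0, 20.0)  // per-task times in ms
    val row = "Result serialization time" +:
      quantiles(serializationTimes).map(ms => s"${ms.toLong} ms")
    println(row.mkString(" | "))
  }
}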
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
index 788cbb81bf..2476ab5c19 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -314,7 +314,6 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
 
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
     val objectSer = SparkEnv.get.serializer.newInstance()
-    new DirectTaskResult[Int](objectSer.serialize(id), mutable.Map.empty,
-      new TaskMetrics)
+    new DirectTaskResult[Int](objectSer.serialize(id), mutable.Map.empty, new TaskMetrics)
   }
 }