| field | value | date |
|---|---|---|
| author | Tor Myklebust <tmyklebu@csclub.uwaterloo.ca> | 2013-12-16 12:07:39 -0500 |
| committer | Tor Myklebust <tmyklebu@csclub.uwaterloo.ca> | 2013-12-16 12:07:39 -0500 |
| commit | 8a397a959bf0b68f7d10fa57665225e0c2b5d03a (patch) | |
| tree | 409ea16382dd7cc3b3e26dfe31d88de1c9c7f9cf /core | |
| parent | a51f3404ad8711f5fe66381122c5fa1ead09b3da (diff) | |
Track task value serialisation time in TaskMetrics.
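The change follows the usual wall-clock bracketing pattern: take a timestamp, serialise the task's return value, take a second timestamp, and record the difference alongside the other per-task metrics. A minimal self-contained sketch of that pattern (the `serialize` parameter here is an illustrative stand-in for Spark's serializer, not the actual API surface):

```scala
import java.nio.ByteBuffer

object TimingSketch {
  // Hypothetical helper, not a Spark API: performs one serialisation and
  // reports how long it took, mirroring the bracketing added in Executor.scala.
  def timedSerialize[T](serialize: T => ByteBuffer, value: T): (ByteBuffer, Long) = {
    val before = System.currentTimeMillis() // timestamp just before serialising
    val bytes = serialize(value)            // serialise the task's return value
    val after = System.currentTimeMillis()  // timestamp just after
    (bytes, after - before)                 // bytes plus elapsed millis for TaskMetrics
  }
}
```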
Diffstat (limited to 'core')
 core/src/main/scala/org/apache/spark/executor/Executor.scala                            | 12 ++++++++----
 core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala                         |  5 +++++
 core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala                         | 20 ++++++++++----------
 core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala |  4 +++-
4 files changed, 26 insertions(+), 15 deletions(-)
```diff
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0b0a60ee60..02ad64d070 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -222,18 +222,22 @@ private[spark] class Executor(
           return
         }
 
+        val objectSer = SparkEnv.get.serializer.newInstance()
+        val beforeSerialization = System.currentTimeMillis()
+        val valueBytes = objectSer.serialize(value)
+        val afterSerialization = System.currentTimeMillis()
+
         for (m <- task.metrics) {
           m.hostname = Utils.localHostName()
           m.executorDeserializeTime = (taskStart - startTime).toInt
           m.executorRunTime = (taskFinish - taskStart).toInt
           m.jvmGCTime = gcTime - startGCTime
+          m.serializationTime = (afterSerialization - beforeSerialization).toInt
         }
-        // TODO I'd also like to track the time it takes to serialize the task results, but that is
-        // huge headache, b/c we need to serialize the task metrics first. If TaskMetrics had a
-        // custom serialized format, we could just change the relevants bytes in the byte buffer
+
         val accumUpdates = Accumulators.values
 
-        val directResult = new DirectTaskResult(value, accumUpdates, task.metrics.getOrElse(null))
+        val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
         val serializedDirectResult = ser.serialize(directResult)
         logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
         val serializedResult = {
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index c0ce46e379..c036866afd 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -44,6 +44,11 @@ class TaskMetrics extends Serializable {
   var jvmGCTime: Long = _
 
   /**
+   * Amount of time spent serializing the result of the task
+   */
+  var serializationTime: Long = _
+
+  /**
    * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    */
   var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 7e468d0d67..4e00bc8271 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -35,18 +35,15 @@ case class IndirectTaskResult[T](blockId: BlockId) extends TaskResult[T] with Se
 
 /** A TaskResult that contains the task's return value and accumulator updates.
  */
 private[spark]
-class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
+class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
   extends TaskResult[T] with Externalizable {
 
-  def this() = this(null.asInstanceOf[T], null, null)
+  def this() = this(null.asInstanceOf[ByteBuffer], null, null)
 
   override def writeExternal(out: ObjectOutput) {
-    val objectSer = SparkEnv.get.serializer.newInstance()
-    val bb = objectSer.serialize(value)
-
-    out.writeInt(bb.remaining())
-    Utils.writeByteBuffer(bb, out)
+    out.writeInt(valueBytes.remaining);
+    Utils.writeByteBuffer(valueBytes, out)
 
     out.writeInt(accumUpdates.size)
     for ((key, value) <- accumUpdates) {
@@ -58,12 +55,10 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me
   }
 
   override def readExternal(in: ObjectInput) {
-    val objectSer = SparkEnv.get.serializer.newInstance()
-
     val blen = in.readInt()
     val byteVal = new Array[Byte](blen)
     in.readFully(byteVal)
-    value = objectSer.deserialize(ByteBuffer.wrap(byteVal))
+    valueBytes = ByteBuffer.wrap(byteVal)
 
     val numUpdates = in.readInt
     if (numUpdates == 0) {
@@ -76,4 +71,9 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me
     }
     metrics = in.readObject().asInstanceOf[TaskMetrics]
   }
+
+  def value(): T = {
+    val objectSer = SparkEnv.get.serializer.newInstance()
+    return objectSer.deserialize(valueBytes)
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
index b97f2b19b5..788cbb81bf 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -313,6 +313,8 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
   }
 
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
-    new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics)
+    val objectSer = SparkEnv.get.serializer.newInstance()
+    new DirectTaskResult[Int](objectSer.serialize(id), mutable.Map.empty,
+      new TaskMetrics)
   }
 }
```
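The structural effect of the diff is that `DirectTaskResult` now carries the pre-serialised result bytes rather than the live object, so `writeExternal` becomes a cheap byte copy and deserialisation is deferred until the driver calls `value()`. A stripped-down sketch of that "bytes now, object later" shape, with an assumed caller-supplied `deserialize` function standing in for Spark's `SerializerInstance`:

```scala
import java.nio.ByteBuffer

// Illustrative holder, not the actual Spark class: the result crosses the
// wire as raw bytes and is only turned back into an object on demand.
class LazyResult[T](val valueBytes: ByteBuffer, deserialize: ByteBuffer => T) {
  // duplicate() gives an independent position/limit, so value() can be
  // called repeatedly without consuming the underlying buffer
  def value(): T = deserialize(valueBytes.duplicate())
}
```

Note that the committed `DirectTaskResult.value()` deserialises the stored buffer directly on each call; the `duplicate()` above is a defensive choice in this sketch, not something the patch does.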