 core/src/main/scala/org/apache/spark/executor/Executor.scala | 12
 core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 5
 core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala | 20
 core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 6
 core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala | 3
 5 files changed, 31 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0b0a60ee60..0f19d7a96b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -222,18 +222,22 @@ private[spark] class Executor(
return
}
+ val resultSer = SparkEnv.get.serializer.newInstance()
+ val beforeSerialization = System.currentTimeMillis()
+ val valueBytes = resultSer.serialize(value)
+ val afterSerialization = System.currentTimeMillis()
+
for (m <- task.metrics) {
m.hostname = Utils.localHostName()
m.executorDeserializeTime = (taskStart - startTime).toInt
m.executorRunTime = (taskFinish - taskStart).toInt
m.jvmGCTime = gcTime - startGCTime
+ m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
}
- // TODO I'd also like to track the time it takes to serialize the task results, but that is a
- // huge headache, b/c we need to serialize the task metrics first. If TaskMetrics had a
- // custom serialized format, we could just change the relevant bytes in the byte buffer
+
val accumUpdates = Accumulators.values
- val directResult = new DirectTaskResult(value, accumUpdates, task.metrics.getOrElse(null))
+ val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
val serializedDirectResult = ser.serialize(directResult)
logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
val serializedResult = {
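
The hunk above serializes the task's return value before the metrics are finalized, so the cost of that serialization can be recorded in resultSerializationTime. As a rough standalone sketch of the timing pattern only, using plain Java serialization as a stand-in for Spark's SerializerInstance (an assumption for illustration, not the executor code itself):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    object SerializationTimingSketch {
      // Serialize a value and report how long it took, mirroring the
      // beforeSerialization/afterSerialization bracketing in the diff.
      def serializeWithTiming(value: AnyRef): (ByteBuffer, Long) = {
        val before = System.currentTimeMillis()
        val bos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(value)
        oos.close()
        val after = System.currentTimeMillis()
        (ByteBuffer.wrap(bos.toByteArray), after - before)
      }
    }
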
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index c0ce46e379..bb1471d9ee 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -44,6 +44,11 @@ class TaskMetrics extends Serializable {
var jvmGCTime: Long = _
/**
+ * Amount of time spent serializing the task result
+ */
+ var resultSerializationTime: Long = _
+
+ /**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/
var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 7e468d0d67..4e00bc8271 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -35,18 +35,15 @@ case class IndirectTaskResult[T](blockId: BlockId) extends TaskResult[T] with Se
/** A TaskResult that contains the task's return value and accumulator updates. */
private[spark]
-class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
+class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
extends TaskResult[T] with Externalizable {
- def this() = this(null.asInstanceOf[T], null, null)
+ def this() = this(null.asInstanceOf[ByteBuffer], null, null)
override def writeExternal(out: ObjectOutput) {
- val objectSer = SparkEnv.get.serializer.newInstance()
- val bb = objectSer.serialize(value)
-
- out.writeInt(bb.remaining())
- Utils.writeByteBuffer(bb, out)
+ out.writeInt(valueBytes.remaining())
+ Utils.writeByteBuffer(valueBytes, out)
out.writeInt(accumUpdates.size)
for ((key, value) <- accumUpdates) {
@@ -58,12 +55,10 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me
override def readExternal(in: ObjectInput) {
- val objectSer = SparkEnv.get.serializer.newInstance()
-
val blen = in.readInt()
val byteVal = new Array[Byte](blen)
in.readFully(byteVal)
- value = objectSer.deserialize(ByteBuffer.wrap(byteVal))
+ valueBytes = ByteBuffer.wrap(byteVal)
val numUpdates = in.readInt
if (numUpdates == 0) {
@@ -76,4 +71,9 @@ class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var me
}
metrics = in.readObject().asInstanceOf[TaskMetrics]
}
+
+ def value(): T = {
+ val objectSer = SparkEnv.get.serializer.newInstance()
+ return objectSer.deserialize(valueBytes)
+ }
}
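
With this change DirectTaskResult carries the already-serialized bytes and only deserializes them on demand through value(). A minimal sketch of that lazy-deserialization idea, again with plain Java serialization standing in for Spark's serializer (an assumption, not the actual scheduler code):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    class LazyResult[T](val valueBytes: ByteBuffer) {
      // Deserialize only when the caller actually needs the value.
      def value(): T = {
        val bytes = new Array[Byte](valueBytes.remaining())
        valueBytes.duplicate().get(bytes)
        val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
        try in.readObject().asInstanceOf[T] finally in.close()
      }
    }

    object LazyResult {
      // Construct from an unserialized value, storing only the bytes.
      def apply[T](v: T): LazyResult[T] = {
        val bos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(v.asInstanceOf[AnyRef])
        oos.close()
        new LazyResult[T](ByteBuffer.wrap(bos.toByteArray))
      }
    }
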
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 69f9446bab..2f06efa66e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -101,6 +101,11 @@ private[spark] class StagePage(parent: JobProgressUI) {
None
}
else {
+ val serializationTimes = validTasks.map{case (info, metrics, exception) =>
+ metrics.get.resultSerializationTime.toDouble}
+ val serializationQuantiles = "Result serialization time" +: Distribution(serializationTimes).get.getQuantiles().map(
+ ms => parent.formatDuration(ms.toLong))
+
val serviceTimes = validTasks.map{case (info, metrics, exception) =>
metrics.get.executorRunTime.toDouble}
val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
@@ -149,6 +154,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
val listings: Seq[Seq[String]] = Seq(
+ serializationQuantiles,
serviceQuantiles,
gettingResultQuantiles,
schedulerDelayQuantiles,
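
The stage page change folds the new metric into the same per-task quantile summary used for the other columns. A hedged sketch of that summary, with a simple sort-based (nearest-rank) quantile function substituted for Spark's internal Distribution helper (an illustrative assumption):

    object QuantileSketch {
      // Nearest-rank quantiles over a sample of per-task times (milliseconds).
      def quantiles(xs: Seq[Double],
                    probs: Seq[Double] = Seq(0.0, 0.25, 0.5, 0.75, 1.0)): Seq[Double] = {
        val sorted = xs.sorted
        probs.map(p => sorted(math.min(sorted.length - 1, (p * sorted.length).toInt)))
      }

      def main(args: Array[String]): Unit = {
        val serializationTimes = Seq(3.0, 5.0, 2.0, 8.0, 4.0)
        val row = "Result serialization time" +:
          quantiles(serializationTimes).map(ms => s"${ms.toLong} ms")
        println(row.mkString("\t"))
      }
    }
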
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
index 29c4cc5d9c..cb719d7ab9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -313,6 +313,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
}
def createTaskResult(id: Int): DirectTaskResult[Int] = {
- new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics)
+ val objectSer = SparkEnv.get.serializer.newInstance()
+ new DirectTaskResult[Int](objectSer.serialize(id), mutable.Map.empty, new TaskMetrics)
}
}