author     Shivaram Venkataraman <shivaram@cs.berkeley.edu>  2016-12-19 14:53:01 -0800
committer  Kay Ousterhout <kayousterhout@gmail.com>          2016-12-19 14:53:01 -0800
commit     4cb49412d1d7d10ffcc738475928c7de2bc59fd4 (patch)
tree       4068db1b1ff9223bf3178ed0db07770f87cabefa /core
parent     70d495dcecce8617b7099fc599fe7c43d7eae66e (diff)
download   spark-4cb49412d1d7d10ffcc738475928c7de2bc59fd4.tar.gz
           spark-4cb49412d1d7d10ffcc738475928c7de2bc59fd4.tar.bz2
           spark-4cb49412d1d7d10ffcc738475928c7de2bc59fd4.zip
[SPARK-18836][CORE] Serialize one copy of task metrics in DAGScheduler
## What changes were proposed in this pull request?

Right now we serialize the empty task metrics once per task. Since this object is identical for all tasks of a stage, we can serialize it once per stage and reuse the same serialized task metrics across all of that stage's tasks.

## How was this patch tested?

- [x] Run tests on EC2 to measure performance improvement

Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu>

Closes #16261 from shivaram/task-metrics-one-copy.
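To make the shape of the change easier to see outside the diff, here is a minimal standalone sketch of the pattern: serialize the shared task metrics once per stage, hand the same byte array to every task, and have each task lazily deserialize its own copy on the executor. `TaskMetricsStub`, `JavaSer`, and `SketchTask` are simplified stand-ins invented for illustration; they are not the Spark classes touched by this patch, which use `TaskMetrics` and `SparkEnv.get.closureSerializer`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Stand-in for Spark's TaskMetrics (the real class is mutable and accumulator-backed).
case class TaskMetricsStub(executorRunTime: Long = 0L) extends Serializable

// Simplified serializer standing in for the closure serializer used in the patch.
object JavaSer {
  def serialize[T](value: T): ByteBuffer = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(value)
    oos.close()
    ByteBuffer.wrap(bos.toByteArray)
  }
  def deserialize[T](bytes: ByteBuffer): T = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.array()))
    ois.readObject().asInstanceOf[T]
  }
}

// Each task keeps only the shared byte array and deserializes its own
// metrics copy lazily, so tasks never share a mutable instance.
class SketchTask(val partitionId: Int, serializedTaskMetrics: Array[Byte]) extends Serializable {
  @transient lazy val metrics: TaskMetricsStub =
    JavaSer.deserialize[TaskMetricsStub](ByteBuffer.wrap(serializedTaskMetrics))
}

object ShareMetricsSketch {
  def main(args: Array[String]): Unit = {
    // Old behaviour: serialize the empty metrics once per task.
    // New behaviour: serialize once per stage and pass the same bytes to every task.
    val serializedTaskMetrics = JavaSer.serialize(TaskMetricsStub()).array()
    val tasks = (0 until 4).map(p => new SketchTask(p, serializedTaskMetrics))
    tasks.foreach(t => println(s"task ${t.partitionId}: ${t.metrics}"))
  }
}
```

The `@transient lazy val` mirrors the approach taken in `Task.scala` below: the byte array ships with the serialized task, and the `TaskMetrics` instance is only materialized when first accessed on the executor.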
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala     |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala       |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala   | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala             | 10
-rw-r--r--  core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala     |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala         | 10
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala |  6
7 files changed, 38 insertions(+), 17 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 0a1c500d77..6177bafc11 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1009,13 +1009,14 @@ class DAGScheduler(
}
val tasks: Seq[Task[_]] = try {
+ val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
- taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
+ taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}
@@ -1025,7 +1026,7 @@ class DAGScheduler(
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
- taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
+ taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index d19353f2a9..6abdf0fd53 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -42,7 +42,8 @@ import org.apache.spark.rdd.RDD
* @param outputId index of the task in this job (a job can launch tasks on only a subset of the
* input RDD's partitions).
* @param localProperties copy of thread-local properties set by the user on the driver side.
- * @param metrics a `TaskMetrics` that is created at driver side and sent to executor side.
+ * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized on the driver side
+ * and sent to executor side.
*
* The parameters below are optional:
* @param jobId id of the job this task belongs to
@@ -57,12 +58,12 @@ private[spark] class ResultTask[T, U](
locs: Seq[TaskLocation],
val outputId: Int,
localProperties: Properties,
- metrics: TaskMetrics,
+ serializedTaskMetrics: Array[Byte],
jobId: Option[Int] = None,
appId: Option[String] = None,
appAttemptId: Option[String] = None)
- extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,
- appId, appAttemptId)
+ extends Task[U](stageId, stageAttemptId, partition.index, localProperties, serializedTaskMetrics,
+ jobId, appId, appAttemptId)
with Serializable {
@transient private[this] val preferredLocs: Seq[TaskLocation] = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 31011de85b..994b81e062 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -42,8 +42,9 @@ import org.apache.spark.shuffle.ShuffleWriter
* the type should be (RDD[_], ShuffleDependency[_, _, _]).
* @param partition partition of the RDD this task is associated with
* @param locs preferred task execution locations for locality scheduling
- * @param metrics a `TaskMetrics` that is created at driver side and sent to executor side.
* @param localProperties copy of thread-local properties set by the user on the driver side.
+ * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized on the driver side
+ * and sent to executor side.
*
* The parameters below are optional:
* @param jobId id of the job this task belongs to
@@ -56,18 +57,18 @@ private[spark] class ShuffleMapTask(
taskBinary: Broadcast[Array[Byte]],
partition: Partition,
@transient private var locs: Seq[TaskLocation],
- metrics: TaskMetrics,
localProperties: Properties,
+ serializedTaskMetrics: Array[Byte],
jobId: Option[Int] = None,
appId: Option[String] = None,
appAttemptId: Option[String] = None)
- extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,
- appId, appAttemptId)
+ extends Task[MapStatus](stageId, stageAttemptId, partition.index, localProperties,
+ serializedTaskMetrics, jobId, appId, appAttemptId)
with Logging {
/** A constructor used only in test suites. This does not require passing in an RDD. */
def this(partitionId: Int) {
- this(0, 0, null, new Partition { override def index: Int = 0 }, null, null, new Properties)
+ this(0, 0, null, new Partition { override def index: Int = 0 }, null, new Properties, null)
}
@transient private val preferredLocs: Seq[TaskLocation] = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 1554200aea..5becca6c06 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -48,6 +48,8 @@ import org.apache.spark.util._
* @param partitionId index of the number in the RDD
* @param metrics a `TaskMetrics` that is created at driver side and sent to executor side.
* @param localProperties copy of thread-local properties set by the user on the driver side.
+ * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized on the driver side
+ * and sent to executor side.
*
* The parameters below are optional:
* @param jobId id of the job this task belongs to
@@ -58,13 +60,17 @@ private[spark] abstract class Task[T](
val stageId: Int,
val stageAttemptId: Int,
val partitionId: Int,
- // The default value is only used in tests.
- val metrics: TaskMetrics = TaskMetrics.registered,
@transient var localProperties: Properties = new Properties,
+ // The default value is only used in tests.
+ serializedTaskMetrics: Array[Byte] =
+ SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array(),
val jobId: Option[Int] = None,
val appId: Option[String] = None,
val appAttemptId: Option[String] = None) extends Serializable {
+ @transient lazy val metrics: TaskMetrics =
+ SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(serializedTaskMetrics))
+
/**
* Called by [[org.apache.spark.executor.Executor]] to run this task.
*
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 683eeeeb6d..742500d87d 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -51,9 +51,11 @@ class ExecutorSuite extends SparkFunSuite {
when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem)
when(mockEnv.memoryManager).thenReturn(mockMemoryManager)
when(mockEnv.closureSerializer).thenReturn(serializer)
+ val fakeTaskMetrics = serializer.newInstance().serialize(TaskMetrics.registered).array()
+
val serializedTask =
Task.serializeWithDependencies(
- new FakeTask(0, 0),
+ new FakeTask(0, 0, Nil, fakeTaskMetrics),
HashMap[String, Long](),
HashMap[String, Long](),
serializer.newInstance())
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index a757041299..fe6de2bd98 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -17,12 +17,20 @@
package org.apache.spark.scheduler
+import java.util.Properties
+
+import org.apache.spark.SparkEnv
import org.apache.spark.TaskContext
+import org.apache.spark.executor.TaskMetrics
class FakeTask(
stageId: Int,
partitionId: Int,
- prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0, partitionId) {
+ prefLocs: Seq[TaskLocation] = Nil,
+ serializedTaskMetrics: Array[Byte] =
+ SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array())
+ extends Task[Int](stageId, 0, partitionId, new Properties, serializedTaskMetrics) {
+
override def runTask(context: TaskContext): Int = 0
override def preferredLocations: Seq[TaskLocation] = prefLocs
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 9eda79ace1..7004128308 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -62,7 +62,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val func = (c: TaskContext, i: Iterator[String]) => i.next()
val taskBinary = sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func))))
val task = new ResultTask[String, String](
- 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties, new TaskMetrics)
+ 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties,
+ closureSerializer.serialize(TaskMetrics.registered).array())
intercept[RuntimeException] {
task.run(0, 0, null)
}
@@ -83,7 +84,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val func = (c: TaskContext, i: Iterator[String]) => i.next()
val taskBinary = sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func))))
val task = new ResultTask[String, String](
- 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties, new TaskMetrics)
+ 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties,
+ closureSerializer.serialize(TaskMetrics.registered).array())
intercept[RuntimeException] {
task.run(0, 0, null)
}