aboutsummaryrefslogtreecommitdiff
path: root/resource-managers
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2017-01-06 10:48:00 -0600
committerImran Rashid <irashid@cloudera.com>2017-01-06 10:48:08 -0600
commit2e139eed3194c7b8814ff6cf007d4e8a874c1e4d (patch)
treee175b38ba8df154564e74eb32fe44ee1ae783ea5 /resource-managers
parent4a4c3dc9ca10e52f7981b225ec44e97247986905 (diff)
downloadspark-2e139eed3194c7b8814ff6cf007d4e8a874c1e4d.tar.gz
spark-2e139eed3194c7b8814ff6cf007d4e8a874c1e4d.tar.bz2
spark-2e139eed3194c7b8814ff6cf007d4e8a874c1e4d.zip
[SPARK-17931] Eliminate unnecessary task (de) serialization
In the existing code, there are three layers of serialization involved in sending a task from the scheduler to an executor: - A Task object is serialized - The Task object is copied to a byte buffer that also contains serialized information about any additional JARs, files, and Properties needed for the task to execute. This byte buffer is stored as the member variable serializedTask in the TaskDescription class. - The TaskDescription is serialized (in addition to the serialized task + JARs, the TaskDescription class contains the task ID and other metadata) and sent in a LaunchTask message. While it *is* necessary to have two layers of serialization, so that the JAR, file, and Property info can be deserialized prior to deserializing the Task object, the third layer of deserialization is unnecessary. This commit eliminates a layer of serialization by moving the JARs, files, and Properties into the TaskDescription class. This commit also serializes the Properties manually (by traversing the map), as is done with the JARs and files, which reduces the final serialized size. Unit tests This is a simpler alternative to the approach proposed in #15505. shivaram and I did some benchmarking of this and #15505 on a 20-machine m2.4xlarge EC2 machines (160 cores). We ran ~30 trials of code [1] (a very simple job with 10K tasks per stage) and measured the average time per stage: Before this change: 2490ms With this change: 2345 ms (so ~6% improvement over the baseline) With witgo's approach in #15505: 2046 ms (~18% improvement over baseline) The reason that #15505 has a more significant improvement is that it also moves the serialization from the TaskSchedulerImpl thread to the CoarseGrainedSchedulerBackend thread. I added that functionality on top of this change, and got almost the same improvement [1] as #15505 (average of 2103ms). I think we should decouple these two changes, both so we have some record of the improvement form each individual improvement, and because this change is more about simplifying the code base (the improvement is negligible) while the other is about performance improvement. The plan, currently, is to merge this PR and then merge the remaining part of #15505 that moves serialization. [1] The reason the improvement wasn't quite as good as with #15505 when we ran the benchmarks is almost certainly because, at the point when we ran the benchmarks, I hadn't updated the code to manually serialize the Properties (instead the code was using Java's default serialization for the Properties object, whereas #15505 manually serialized the Properties). This PR has since been updated to manually serialize the Properties, just like the other maps. Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #16053 from kayousterhout/SPARK-17931.
Diffstat (limited to 'resource-managers')
-rw-r--r--resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala9
-rw-r--r--resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala2
-rw-r--r--resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala51
-rw-r--r--resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala23
-rw-r--r--resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala36
5 files changed, 26 insertions, 95 deletions
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index ee9149ce02..b252539782 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -29,7 +29,8 @@ import org.apache.spark.{SparkConf, SparkEnv, TaskState}
import org.apache.spark.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.cluster.mesos.{MesosSchedulerUtils, MesosTaskLaunchData}
+import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerUtils
import org.apache.spark.util.Utils
private[spark] class MesosExecutorBackend
@@ -84,14 +85,12 @@ private[spark] class MesosExecutorBackend
}
override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
- val taskId = taskInfo.getTaskId.getValue.toLong
- val taskData = MesosTaskLaunchData.fromByteString(taskInfo.getData)
+ val taskDescription = TaskDescription.decode(taskInfo.getData.asReadOnlyByteBuffer())
if (executor == null) {
logError("Received launchTask but executor was null")
} else {
SparkHadoopUtil.get.runAsSparkUser { () =>
- executor.launchTask(this, taskId = taskId, attemptNumber = taskData.attemptNumber,
- taskInfo.getName, taskData.serializedTask)
+ executor.launchTask(this, taskDescription)
}
}
}
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 779ffb5229..7e561916a7 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -351,7 +351,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
.setExecutor(executorInfo)
.setName(task.name)
.addAllResources(cpuResources.asJava)
- .setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString)
+ .setData(ByteString.copyFrom(TaskDescription.encode(task)))
.build()
(taskInfo, finalResources.asJava)
}
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
deleted file mode 100644
index 8370b61145..0000000000
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.nio.ByteBuffer
-
-import org.apache.mesos.protobuf.ByteString
-
-import org.apache.spark.internal.Logging
-
-/**
- * Wrapper for serializing the data sent when launching Mesos tasks.
- */
-private[spark] case class MesosTaskLaunchData(
- serializedTask: ByteBuffer,
- attemptNumber: Int) extends Logging {
-
- def toByteString: ByteString = {
- val dataBuffer = ByteBuffer.allocate(4 + serializedTask.limit)
- dataBuffer.putInt(attemptNumber)
- dataBuffer.put(serializedTask)
- dataBuffer.rewind
- logDebug(s"ByteBuffer size: [${dataBuffer.remaining}]")
- ByteString.copyFrom(dataBuffer)
- }
-}
-
-private[spark] object MesosTaskLaunchData extends Logging {
- def fromByteString(byteString: ByteString): MesosTaskLaunchData = {
- val byteBuffer = byteString.asReadOnlyByteBuffer()
- logDebug(s"ByteBuffer size: [${byteBuffer.remaining}]")
- val attemptNumber = byteBuffer.getInt // updates the position by 4 bytes
- val serializedTask = byteBuffer.slice() // subsequence starting at the current position
- MesosTaskLaunchData(serializedTask, attemptNumber)
- }
-}
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 1d7a86f4b0..4ee85b9183 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
import java.util.Arrays
import java.util.Collection
import java.util.Collections
+import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -246,7 +247,16 @@ class MesosFineGrainedSchedulerBackendSuite
mesosOffers.get(2).getHostname,
(minCpu - backend.mesosExecutorCores).toInt
)
- val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
+ val taskDesc = new TaskDescription(
+ taskId = 1L,
+ attemptNumber = 0,
+ executorId = "s1",
+ name = "n1",
+ index = 0,
+ addedFiles = mutable.Map.empty[String, Long],
+ addedJars = mutable.Map.empty[String, Long],
+ properties = new Properties(),
+ ByteBuffer.wrap(new Array[Byte](0)))
when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
@@ -345,7 +355,16 @@ class MesosFineGrainedSchedulerBackendSuite
2 // Deducting 1 for executor
)
- val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
+ val taskDesc = new TaskDescription(
+ taskId = 1L,
+ attemptNumber = 0,
+ executorId = "s1",
+ name = "n1",
+ index = 0,
+ addedFiles = mutable.Map.empty[String, Long],
+ addedJars = mutable.Map.empty[String, Long],
+ properties = new Properties(),
+ ByteBuffer.wrap(new Array[Byte](0)))
when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
deleted file mode 100644
index 5a81bb335f..0000000000
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.nio.ByteBuffer
-
-import org.apache.spark.SparkFunSuite
-
-class MesosTaskLaunchDataSuite extends SparkFunSuite {
- test("serialize and deserialize data must be same") {
- val serializedTask = ByteBuffer.allocate(40)
- (Range(100, 110).map(serializedTask.putInt(_)))
- serializedTask.rewind
- val attemptNumber = 100
- val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString
- serializedTask.rewind
- val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString)
- assert(mesosTaskLaunchData.attemptNumber == attemptNumber)
- assert(mesosTaskLaunchData.serializedTask.equals(serializedTask))
- }
-}