aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2017-01-06 10:48:00 -0600
committerImran Rashid <irashid@cloudera.com>2017-01-06 10:48:08 -0600
commit2e139eed3194c7b8814ff6cf007d4e8a874c1e4d (patch)
treee175b38ba8df154564e74eb32fe44ee1ae783ea5 /core/src/test/scala
parent4a4c3dc9ca10e52f7981b225ec44e97247986905 (diff)
downloadspark-2e139eed3194c7b8814ff6cf007d4e8a874c1e4d.tar.gz
spark-2e139eed3194c7b8814ff6cf007d4e8a874c1e4d.tar.bz2
spark-2e139eed3194c7b8814ff6cf007d4e8a874c1e4d.zip
[SPARK-17931] Eliminate unnecessary task (de) serialization
In the existing code, there are three layers of serialization involved in sending a task from the scheduler to an executor: - A Task object is serialized - The Task object is copied to a byte buffer that also contains serialized information about any additional JARs, files, and Properties needed for the task to execute. This byte buffer is stored as the member variable serializedTask in the TaskDescription class. - The TaskDescription is serialized (in addition to the serialized task + JARs, the TaskDescription class contains the task ID and other metadata) and sent in a LaunchTask message. While it *is* necessary to have two layers of serialization, so that the JAR, file, and Property info can be deserialized prior to deserializing the Task object, the third layer of deserialization is unnecessary. This commit eliminates a layer of serialization by moving the JARs, files, and Properties into the TaskDescription class. This commit also serializes the Properties manually (by traversing the map), as is done with the JARs and files, which reduces the final serialized size. Unit tests This is a simpler alternative to the approach proposed in #15505. shivaram and I did some benchmarking of this and #15505 on a 20-machine m2.4xlarge EC2 machines (160 cores). We ran ~30 trials of code [1] (a very simple job with 10K tasks per stage) and measured the average time per stage: Before this change: 2490ms With this change: 2345 ms (so ~6% improvement over the baseline) With witgo's approach in #15505: 2046 ms (~18% improvement over baseline) The reason that #15505 has a more significant improvement is that it also moves the serialization from the TaskSchedulerImpl thread to the CoarseGrainedSchedulerBackend thread. I added that functionality on top of this change, and got almost the same improvement [1] as #15505 (average of 2103ms). I think we should decouple these two changes, both so we have some record of the improvement form each individual improvement, and because this change is more about simplifying the code base (the improvement is negligible) while the other is about performance improvement. The plan, currently, is to merge this PR and then merge the remaining part of #15505 that moves serialization. [1] The reason the improvement wasn't quite as good as with #15505 when we ran the benchmarks is almost certainly because, at the point when we ran the benchmarks, I hadn't updated the code to manually serialize the Properties (instead the code was using Java's default serialization for the Properties object, whereas #15505 manually serialized the Properties). This PR has since been updated to manually serialize the Properties, just like the other maps. Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #16053 from kayousterhout/SPARK-17931.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala26
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala69
2 files changed, 85 insertions, 10 deletions
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 742500d87d..f94baaa30d 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -18,9 +18,10 @@
package org.apache.spark.executor
import java.nio.ByteBuffer
+import java.util.Properties
import java.util.concurrent.CountDownLatch
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
import org.mockito.Matchers._
import org.mockito.Mockito.{mock, when}
@@ -32,7 +33,7 @@ import org.apache.spark.TaskState.TaskState
import org.apache.spark.memory.MemoryManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.scheduler.{FakeTask, Task}
+import org.apache.spark.scheduler.{FakeTask, TaskDescription}
import org.apache.spark.serializer.JavaSerializer
class ExecutorSuite extends SparkFunSuite {
@@ -52,13 +53,18 @@ class ExecutorSuite extends SparkFunSuite {
when(mockEnv.memoryManager).thenReturn(mockMemoryManager)
when(mockEnv.closureSerializer).thenReturn(serializer)
val fakeTaskMetrics = serializer.newInstance().serialize(TaskMetrics.registered).array()
-
- val serializedTask =
- Task.serializeWithDependencies(
- new FakeTask(0, 0, Nil, fakeTaskMetrics),
- HashMap[String, Long](),
- HashMap[String, Long](),
- serializer.newInstance())
+ val serializedTask = serializer.newInstance().serialize(
+ new FakeTask(0, 0, Nil, fakeTaskMetrics))
+ val taskDescription = new TaskDescription(
+ taskId = 0,
+ attemptNumber = 0,
+ executorId = "",
+ name = "",
+ index = 0,
+ addedFiles = Map[String, Long](),
+ addedJars = Map[String, Long](),
+ properties = new Properties,
+ serializedTask)
// we use latches to force the program to run in this order:
// +-----------------------------+---------------------------------------+
@@ -108,7 +114,7 @@ class ExecutorSuite extends SparkFunSuite {
try {
executor = new Executor("id", "localhost", mockEnv, userClassPath = Nil, isLocal = true)
// the task will be launched in a dedicated worker thread
- executor.launchTask(mockExecutorBackend, 0, 0, "", serializedTask)
+ executor.launchTask(mockExecutorBackend, taskDescription)
executorSuiteHelper.latch1.await()
// we know the task will be started, but not yet deserialized, because of the latches we
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
new file mode 100644
index 0000000000..9f1fe05157
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.nio.ByteBuffer
+import java.util.Properties
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkFunSuite
+
+class TaskDescriptionSuite extends SparkFunSuite {
+ test("encoding and then decoding a TaskDescription results in the same TaskDescription") {
+ val originalFiles = new HashMap[String, Long]()
+ originalFiles.put("fileUrl1", 1824)
+ originalFiles.put("fileUrl2", 2)
+
+ val originalJars = new HashMap[String, Long]()
+ originalJars.put("jar1", 3)
+
+ val originalProperties = new Properties()
+ originalProperties.put("property1", "18")
+ originalProperties.put("property2", "test value")
+
+ // Create a dummy byte buffer for the task.
+ val taskBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
+
+ val originalTaskDescription = new TaskDescription(
+ taskId = 1520589,
+ attemptNumber = 2,
+ executorId = "testExecutor",
+ name = "task for test",
+ index = 19,
+ originalFiles,
+ originalJars,
+ originalProperties,
+ taskBuffer
+ )
+
+ val serializedTaskDescription = TaskDescription.encode(originalTaskDescription)
+ val decodedTaskDescription = TaskDescription.decode(serializedTaskDescription)
+
+ // Make sure that all of the fields in the decoded task description match the original.
+ assert(decodedTaskDescription.taskId === originalTaskDescription.taskId)
+ assert(decodedTaskDescription.attemptNumber === originalTaskDescription.attemptNumber)
+ assert(decodedTaskDescription.executorId === originalTaskDescription.executorId)
+ assert(decodedTaskDescription.name === originalTaskDescription.name)
+ assert(decodedTaskDescription.index === originalTaskDescription.index)
+ assert(decodedTaskDescription.addedFiles.equals(originalFiles))
+ assert(decodedTaskDescription.addedJars.equals(originalJars))
+ assert(decodedTaskDescription.properties.equals(originalTaskDescription.properties))
+ assert(decodedTaskDescription.serializedTask.equals(taskBuffer))
+ }
+}