aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2017-01-06 10:48:00 -0600
committerImran Rashid <irashid@cloudera.com>2017-01-06 10:48:08 -0600
commit2e139eed3194c7b8814ff6cf007d4e8a874c1e4d (patch)
treee175b38ba8df154564e74eb32fe44ee1ae783ea5 /core
parent4a4c3dc9ca10e52f7981b225ec44e97247986905 (diff)
downloadspark-2e139eed3194c7b8814ff6cf007d4e8a874c1e4d.tar.gz
spark-2e139eed3194c7b8814ff6cf007d4e8a874c1e4d.tar.bz2
spark-2e139eed3194c7b8814ff6cf007d4e8a874c1e4d.zip
[SPARK-17931] Eliminate unnecessary task (de) serialization
In the existing code, there are three layers of serialization involved in sending a task from the scheduler to an executor: - A Task object is serialized - The Task object is copied to a byte buffer that also contains serialized information about any additional JARs, files, and Properties needed for the task to execute. This byte buffer is stored as the member variable serializedTask in the TaskDescription class. - The TaskDescription is serialized (in addition to the serialized task + JARs, the TaskDescription class contains the task ID and other metadata) and sent in a LaunchTask message. While it *is* necessary to have two layers of serialization, so that the JAR, file, and Property info can be deserialized prior to deserializing the Task object, the third layer of deserialization is unnecessary. This commit eliminates a layer of serialization by moving the JARs, files, and Properties into the TaskDescription class. This commit also serializes the Properties manually (by traversing the map), as is done with the JARs and files, which reduces the final serialized size. Unit tests This is a simpler alternative to the approach proposed in #15505. shivaram and I did some benchmarking of this and #15505 on a 20-machine m2.4xlarge EC2 machines (160 cores). We ran ~30 trials of code [1] (a very simple job with 10K tasks per stage) and measured the average time per stage: Before this change: 2490ms With this change: 2345 ms (so ~6% improvement over the baseline) With witgo's approach in #15505: 2046 ms (~18% improvement over baseline) The reason that #15505 has a more significant improvement is that it also moves the serialization from the TaskSchedulerImpl thread to the CoarseGrainedSchedulerBackend thread. I added that functionality on top of this change, and got almost the same improvement [1] as #15505 (average of 2103ms). I think we should decouple these two changes, both so we have some record of the improvement form each individual improvement, and because this change is more about simplifying the code base (the improvement is negligible) while the other is about performance improvement. The plan, currently, is to merge this PR and then merge the remaining part of #15505 that moves serialization. [1] The reason the improvement wasn't quite as good as with #15505 when we ran the benchmarks is almost certainly because, at the point when we ran the benchmarks, I hadn't updated the code to manually serialize the Properties (instead the code was using Java's default serialization for the Properties object, whereas #15505 manually serialized the Properties). This PR has since been updated to manually serialize the Properties, just like the other maps. Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #16053 from kayousterhout/SPARK-17931.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala39
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Task.scala86
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala107
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala26
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala69
9 files changed, 215 insertions, 142 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 92a27902c6..4a38560d8d 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -92,10 +92,9 @@ private[spark] class CoarseGrainedExecutorBackend(
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
- val taskDesc = ser.deserialize[TaskDescription](data.value)
+ val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
- executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
- taskDesc.name, taskDesc.serializedTask)
+ executor.launchTask(this, taskDesc)
}
case KillTask(taskId, _, interruptThread) =>
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 3346f6dd1f..789198f52c 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.control.NonFatal
import org.apache.spark._
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rpc.RpcTimeout
-import org.apache.spark.scheduler.{AccumulableInfo, DirectTaskResult, IndirectTaskResult, Task}
+import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util._
@@ -148,15 +148,9 @@ private[spark] class Executor(
startDriverHeartbeater()
- def launchTask(
- context: ExecutorBackend,
- taskId: Long,
- attemptNumber: Int,
- taskName: String,
- serializedTask: ByteBuffer): Unit = {
- val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
- serializedTask)
- runningTasks.put(taskId, tr)
+ def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
+ val tr = new TaskRunner(context, taskDescription)
+ runningTasks.put(taskDescription.taskId, tr)
threadPool.execute(tr)
}
@@ -212,13 +206,12 @@ private[spark] class Executor(
class TaskRunner(
execBackend: ExecutorBackend,
- val taskId: Long,
- val attemptNumber: Int,
- taskName: String,
- serializedTask: ByteBuffer)
+ private val taskDescription: TaskDescription)
extends Runnable {
+ val taskId = taskDescription.taskId
val threadName = s"Executor task launch worker for task $taskId"
+ private val taskName = taskDescription.name
/** Whether this task has been killed. */
@volatile private var killed = false
@@ -287,16 +280,14 @@ private[spark] class Executor(
startGCTime = computeTotalGcTime()
try {
- val (taskFiles, taskJars, taskProps, taskBytes) =
- Task.deserializeWithDependencies(serializedTask)
-
// Must be set before updateDependencies() is called, in case fetching dependencies
// requires access to properties contained within (e.g. for access control).
- Executor.taskDeserializationProps.set(taskProps)
+ Executor.taskDeserializationProps.set(taskDescription.properties)
- updateDependencies(taskFiles, taskJars)
- task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
- task.localProperties = taskProps
+ updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
+ task = ser.deserialize[Task[Any]](
+ taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
+ task.localProperties = taskDescription.properties
task.setTaskMemoryManager(taskMemoryManager)
// If this task has been killed before we deserialized it, let's quit now. Otherwise,
@@ -321,7 +312,7 @@ private[spark] class Executor(
val value = try {
val res = task.run(
taskAttemptId = taskId,
- attemptNumber = attemptNumber,
+ attemptNumber = taskDescription.attemptNumber,
metricsSystem = env.metricsSystem)
threwException = false
res
@@ -637,7 +628,7 @@ private[spark] class Executor(
* Download any missing dependencies if we receive a new set of files and JARs from the
* SparkContext. Also adds any new JARs we fetched to the class loader.
*/
- private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
+ private def updateDependencies(newFiles: Map[String, Long], newJars: Map[String, Long]) {
lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
synchronized {
// Fetch missing dependencies
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 5becca6c06..51976f666d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -215,89 +215,3 @@ private[spark] abstract class Task[T](
}
}
}
-
-/**
- * Handles transmission of tasks and their dependencies, because this can be slightly tricky. We
- * need to send the list of JARs and files added to the SparkContext with each task to ensure that
- * worker nodes find out about it, but we can't make it part of the Task because the user's code in
- * the task might depend on one of the JARs. Thus we serialize each task as multiple objects, by
- * first writing out its dependencies.
- */
-private[spark] object Task {
- /**
- * Serialize a task and the current app dependencies (files and JARs added to the SparkContext)
- */
- def serializeWithDependencies(
- task: Task[_],
- currentFiles: mutable.Map[String, Long],
- currentJars: mutable.Map[String, Long],
- serializer: SerializerInstance)
- : ByteBuffer = {
-
- val out = new ByteBufferOutputStream(4096)
- val dataOut = new DataOutputStream(out)
-
- // Write currentFiles
- dataOut.writeInt(currentFiles.size)
- for ((name, timestamp) <- currentFiles) {
- dataOut.writeUTF(name)
- dataOut.writeLong(timestamp)
- }
-
- // Write currentJars
- dataOut.writeInt(currentJars.size)
- for ((name, timestamp) <- currentJars) {
- dataOut.writeUTF(name)
- dataOut.writeLong(timestamp)
- }
-
- // Write the task properties separately so it is available before full task deserialization.
- val propBytes = Utils.serialize(task.localProperties)
- dataOut.writeInt(propBytes.length)
- dataOut.write(propBytes)
-
- // Write the task itself and finish
- dataOut.flush()
- val taskBytes = serializer.serialize(task)
- Utils.writeByteBuffer(taskBytes, out)
- out.close()
- out.toByteBuffer
- }
-
- /**
- * Deserialize the list of dependencies in a task serialized with serializeWithDependencies,
- * and return the task itself as a serialized ByteBuffer. The caller can then update its
- * ClassLoaders and deserialize the task.
- *
- * @return (taskFiles, taskJars, taskProps, taskBytes)
- */
- def deserializeWithDependencies(serializedTask: ByteBuffer)
- : (HashMap[String, Long], HashMap[String, Long], Properties, ByteBuffer) = {
-
- val in = new ByteBufferInputStream(serializedTask)
- val dataIn = new DataInputStream(in)
-
- // Read task's files
- val taskFiles = new HashMap[String, Long]()
- val numFiles = dataIn.readInt()
- for (i <- 0 until numFiles) {
- taskFiles(dataIn.readUTF()) = dataIn.readLong()
- }
-
- // Read task's JARs
- val taskJars = new HashMap[String, Long]()
- val numJars = dataIn.readInt()
- for (i <- 0 until numJars) {
- taskJars(dataIn.readUTF()) = dataIn.readLong()
- }
-
- val propLength = dataIn.readInt()
- val propBytes = new Array[Byte](propLength)
- dataIn.readFully(propBytes, 0, propLength)
- val taskProps = Utils.deserialize[Properties](propBytes)
-
- // Create a sub-buffer for the rest of the data, which is the serialized Task object
- val subBuffer = serializedTask.slice() // ByteBufferInputStream will have read just up to task
- (taskFiles, taskJars, taskProps, subBuffer)
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
index 45c742cbff..78aa5c4001 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
@@ -17,13 +17,31 @@
package org.apache.spark.scheduler
+import java.io.{DataInputStream, DataOutputStream}
import java.nio.ByteBuffer
+import java.util.Properties
-import org.apache.spark.util.SerializableBuffer
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap, Map}
+
+import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
/**
* Description of a task that gets passed onto executors to be executed, usually created by
* `TaskSetManager.resourceOffer`.
+ *
+ * TaskDescriptions and the associated Task need to be serialized carefully for two reasons:
+ *
+ * (1) When a TaskDescription is received by an Executor, the Executor needs to first get the
+ * list of JARs and files and add these to the classpath, and set the properties, before
+ * deserializing the Task object (serializedTask). This is why the Properties are included
+ * in the TaskDescription, even though they're also in the serialized task.
+ * (2) Because a TaskDescription is serialized and sent to an executor for each task, efficient
+ * serialization (both in terms of serialization time and serialized buffer size) is
+ * important. For this reason, we serialize TaskDescriptions ourselves with the
+ * TaskDescription.encode and TaskDescription.decode methods. This results in a smaller
+ * serialized size because it avoids serializing unnecessary fields in the Map objects
+ * (which can introduce significant overhead when the maps are small).
*/
private[spark] class TaskDescription(
val taskId: Long,
@@ -31,13 +49,88 @@ private[spark] class TaskDescription(
val executorId: String,
val name: String,
val index: Int, // Index within this task's TaskSet
- _serializedTask: ByteBuffer)
- extends Serializable {
+ val addedFiles: Map[String, Long],
+ val addedJars: Map[String, Long],
+ val properties: Properties,
+ val serializedTask: ByteBuffer) {
+
+ override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
+}
- // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
- private val buffer = new SerializableBuffer(_serializedTask)
+private[spark] object TaskDescription {
+ private def serializeStringLongMap(map: Map[String, Long], dataOut: DataOutputStream): Unit = {
+ dataOut.writeInt(map.size)
+ for ((key, value) <- map) {
+ dataOut.writeUTF(key)
+ dataOut.writeLong(value)
+ }
+ }
- def serializedTask: ByteBuffer = buffer.value
+ def encode(taskDescription: TaskDescription): ByteBuffer = {
+ val bytesOut = new ByteBufferOutputStream(4096)
+ val dataOut = new DataOutputStream(bytesOut)
- override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
+ dataOut.writeLong(taskDescription.taskId)
+ dataOut.writeInt(taskDescription.attemptNumber)
+ dataOut.writeUTF(taskDescription.executorId)
+ dataOut.writeUTF(taskDescription.name)
+ dataOut.writeInt(taskDescription.index)
+
+ // Write files.
+ serializeStringLongMap(taskDescription.addedFiles, dataOut)
+
+ // Write jars.
+ serializeStringLongMap(taskDescription.addedJars, dataOut)
+
+ // Write properties.
+ dataOut.writeInt(taskDescription.properties.size())
+ taskDescription.properties.asScala.foreach { case (key, value) =>
+ dataOut.writeUTF(key)
+ dataOut.writeUTF(value)
+ }
+
+ // Write the task. The task is already serialized, so write it directly to the byte buffer.
+ Utils.writeByteBuffer(taskDescription.serializedTask, bytesOut)
+
+ dataOut.close()
+ bytesOut.close()
+ bytesOut.toByteBuffer
+ }
+
+ private def deserializeStringLongMap(dataIn: DataInputStream): HashMap[String, Long] = {
+ val map = new HashMap[String, Long]()
+ val mapSize = dataIn.readInt()
+ for (i <- 0 until mapSize) {
+ map(dataIn.readUTF()) = dataIn.readLong()
+ }
+ map
+ }
+
+ def decode(byteBuffer: ByteBuffer): TaskDescription = {
+ val dataIn = new DataInputStream(new ByteBufferInputStream(byteBuffer))
+ val taskId = dataIn.readLong()
+ val attemptNumber = dataIn.readInt()
+ val executorId = dataIn.readUTF()
+ val name = dataIn.readUTF()
+ val index = dataIn.readInt()
+
+ // Read files.
+ val taskFiles = deserializeStringLongMap(dataIn)
+
+ // Read jars.
+ val taskJars = deserializeStringLongMap(dataIn)
+
+ // Read properties.
+ val properties = new Properties()
+ val numProperties = dataIn.readInt()
+ for (i <- 0 until numProperties) {
+ properties.setProperty(dataIn.readUTF(), dataIn.readUTF())
+ }
+
+ // Create a sub-buffer for the serialized task into its own buffer (to be deserialized later).
+ val serializedTask = byteBuffer.slice()
+
+ new TaskDescription(taskId, attemptNumber, executorId, name, index, taskFiles, taskJars,
+ properties, serializedTask)
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3756c216f5..c7ff13cebf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -446,9 +446,8 @@ private[spark] class TaskSetManager(
lastLaunchTime = curTime
}
// Serialize and return the task
- val startTime = clock.getTimeMillis()
val serializedTask: ByteBuffer = try {
- Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+ ser.serialize(task)
} catch {
// If the task cannot be serialized, then there's no point to re-attempt the task,
// as it will always fail. So just abort the whole task-set.
@@ -475,8 +474,16 @@ private[spark] class TaskSetManager(
s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")
sched.dagScheduler.taskStarted(task, info)
- new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
- taskName, index, serializedTask)
+ new TaskDescription(
+ taskId,
+ attemptNum,
+ execId,
+ taskName,
+ index,
+ sched.sc.addedFiles,
+ sched.sc.addedJars,
+ task.localProperties,
+ serializedTask)
}
} else {
None
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 3452487e72..31575c0ca0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -98,11 +98,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Executors that have been lost, but for which we don't yet know the real exit reason.
protected val executorsPendingLossReason = new HashSet[String]
- // If this DriverEndpoint is changed to support multiple threads,
- // then this may need to be changed so that we don't share the serializer
- // instance across threads
- private val ser = SparkEnv.get.closureSerializer.newInstance()
-
protected val addressToExecutorId = new HashMap[RpcAddress, String]
private val reviveThread =
@@ -249,7 +244,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
- val serializedTask = ser.serialize(task)
+ val serializedTask = TaskDescription.encode(task)
if (serializedTask.limit >= maxRpcMessageSize) {
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
try {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
index 7a73e8ed8a..625f998cd4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
@@ -84,8 +84,7 @@ private[spark] class LocalEndpoint(
val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
for (task <- scheduler.resourceOffers(offers).flatten) {
freeCores -= scheduler.CPUS_PER_TASK
- executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
- task.name, task.serializedTask)
+ executor.launchTask(executorBackend, task)
}
}
}
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 742500d87d..f94baaa30d 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -18,9 +18,10 @@
package org.apache.spark.executor
import java.nio.ByteBuffer
+import java.util.Properties
import java.util.concurrent.CountDownLatch
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
import org.mockito.Matchers._
import org.mockito.Mockito.{mock, when}
@@ -32,7 +33,7 @@ import org.apache.spark.TaskState.TaskState
import org.apache.spark.memory.MemoryManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.scheduler.{FakeTask, Task}
+import org.apache.spark.scheduler.{FakeTask, TaskDescription}
import org.apache.spark.serializer.JavaSerializer
class ExecutorSuite extends SparkFunSuite {
@@ -52,13 +53,18 @@ class ExecutorSuite extends SparkFunSuite {
when(mockEnv.memoryManager).thenReturn(mockMemoryManager)
when(mockEnv.closureSerializer).thenReturn(serializer)
val fakeTaskMetrics = serializer.newInstance().serialize(TaskMetrics.registered).array()
-
- val serializedTask =
- Task.serializeWithDependencies(
- new FakeTask(0, 0, Nil, fakeTaskMetrics),
- HashMap[String, Long](),
- HashMap[String, Long](),
- serializer.newInstance())
+ val serializedTask = serializer.newInstance().serialize(
+ new FakeTask(0, 0, Nil, fakeTaskMetrics))
+ val taskDescription = new TaskDescription(
+ taskId = 0,
+ attemptNumber = 0,
+ executorId = "",
+ name = "",
+ index = 0,
+ addedFiles = Map[String, Long](),
+ addedJars = Map[String, Long](),
+ properties = new Properties,
+ serializedTask)
// we use latches to force the program to run in this order:
// +-----------------------------+---------------------------------------+
@@ -108,7 +114,7 @@ class ExecutorSuite extends SparkFunSuite {
try {
executor = new Executor("id", "localhost", mockEnv, userClassPath = Nil, isLocal = true)
// the task will be launched in a dedicated worker thread
- executor.launchTask(mockExecutorBackend, 0, 0, "", serializedTask)
+ executor.launchTask(mockExecutorBackend, taskDescription)
executorSuiteHelper.latch1.await()
// we know the task will be started, but not yet deserialized, because of the latches we
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
new file mode 100644
index 0000000000..9f1fe05157
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.nio.ByteBuffer
+import java.util.Properties
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkFunSuite
+
+class TaskDescriptionSuite extends SparkFunSuite {
+ test("encoding and then decoding a TaskDescription results in the same TaskDescription") {
+ val originalFiles = new HashMap[String, Long]()
+ originalFiles.put("fileUrl1", 1824)
+ originalFiles.put("fileUrl2", 2)
+
+ val originalJars = new HashMap[String, Long]()
+ originalJars.put("jar1", 3)
+
+ val originalProperties = new Properties()
+ originalProperties.put("property1", "18")
+ originalProperties.put("property2", "test value")
+
+ // Create a dummy byte buffer for the task.
+ val taskBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
+
+ val originalTaskDescription = new TaskDescription(
+ taskId = 1520589,
+ attemptNumber = 2,
+ executorId = "testExecutor",
+ name = "task for test",
+ index = 19,
+ originalFiles,
+ originalJars,
+ originalProperties,
+ taskBuffer
+ )
+
+ val serializedTaskDescription = TaskDescription.encode(originalTaskDescription)
+ val decodedTaskDescription = TaskDescription.decode(serializedTaskDescription)
+
+ // Make sure that all of the fields in the decoded task description match the original.
+ assert(decodedTaskDescription.taskId === originalTaskDescription.taskId)
+ assert(decodedTaskDescription.attemptNumber === originalTaskDescription.attemptNumber)
+ assert(decodedTaskDescription.executorId === originalTaskDescription.executorId)
+ assert(decodedTaskDescription.name === originalTaskDescription.name)
+ assert(decodedTaskDescription.index === originalTaskDescription.index)
+ assert(decodedTaskDescription.addedFiles.equals(originalFiles))
+ assert(decodedTaskDescription.addedJars.equals(originalJars))
+ assert(decodedTaskDescription.properties.equals(originalTaskDescription.properties))
+ assert(decodedTaskDescription.serializedTask.equals(taskBuffer))
+ }
+}