aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala26
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala69
2 files changed, 85 insertions, 10 deletions
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 742500d87d..f94baaa30d 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -18,9 +18,10 @@
package org.apache.spark.executor
import java.nio.ByteBuffer
+import java.util.Properties
import java.util.concurrent.CountDownLatch
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
import org.mockito.Matchers._
import org.mockito.Mockito.{mock, when}
@@ -32,7 +33,7 @@ import org.apache.spark.TaskState.TaskState
import org.apache.spark.memory.MemoryManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.scheduler.{FakeTask, Task}
+import org.apache.spark.scheduler.{FakeTask, TaskDescription}
import org.apache.spark.serializer.JavaSerializer
class ExecutorSuite extends SparkFunSuite {
@@ -52,13 +53,18 @@ class ExecutorSuite extends SparkFunSuite {
when(mockEnv.memoryManager).thenReturn(mockMemoryManager)
when(mockEnv.closureSerializer).thenReturn(serializer)
val fakeTaskMetrics = serializer.newInstance().serialize(TaskMetrics.registered).array()
-
- val serializedTask =
- Task.serializeWithDependencies(
- new FakeTask(0, 0, Nil, fakeTaskMetrics),
- HashMap[String, Long](),
- HashMap[String, Long](),
- serializer.newInstance())
+ val serializedTask = serializer.newInstance().serialize(
+ new FakeTask(0, 0, Nil, fakeTaskMetrics))
+ val taskDescription = new TaskDescription(
+ taskId = 0,
+ attemptNumber = 0,
+ executorId = "",
+ name = "",
+ index = 0,
+ addedFiles = Map[String, Long](),
+ addedJars = Map[String, Long](),
+ properties = new Properties,
+ serializedTask)
// we use latches to force the program to run in this order:
// +-----------------------------+---------------------------------------+
@@ -108,7 +114,7 @@ class ExecutorSuite extends SparkFunSuite {
try {
executor = new Executor("id", "localhost", mockEnv, userClassPath = Nil, isLocal = true)
// the task will be launched in a dedicated worker thread
- executor.launchTask(mockExecutorBackend, 0, 0, "", serializedTask)
+ executor.launchTask(mockExecutorBackend, taskDescription)
executorSuiteHelper.latch1.await()
// we know the task will be started, but not yet deserialized, because of the latches we
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
new file mode 100644
index 0000000000..9f1fe05157
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.nio.ByteBuffer
+import java.util.Properties
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkFunSuite
+
+class TaskDescriptionSuite extends SparkFunSuite {
+ test("encoding and then decoding a TaskDescription results in the same TaskDescription") {
+ val originalFiles = new HashMap[String, Long]()
+ originalFiles.put("fileUrl1", 1824)
+ originalFiles.put("fileUrl2", 2)
+
+ val originalJars = new HashMap[String, Long]()
+ originalJars.put("jar1", 3)
+
+ val originalProperties = new Properties()
+ originalProperties.put("property1", "18")
+ originalProperties.put("property2", "test value")
+
+ // Create a dummy byte buffer for the task.
+ val taskBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
+
+ val originalTaskDescription = new TaskDescription(
+ taskId = 1520589,
+ attemptNumber = 2,
+ executorId = "testExecutor",
+ name = "task for test",
+ index = 19,
+ originalFiles,
+ originalJars,
+ originalProperties,
+ taskBuffer
+ )
+
+ val serializedTaskDescription = TaskDescription.encode(originalTaskDescription)
+ val decodedTaskDescription = TaskDescription.decode(serializedTaskDescription)
+
+ // Make sure that all of the fields in the decoded task description match the original.
+ assert(decodedTaskDescription.taskId === originalTaskDescription.taskId)
+ assert(decodedTaskDescription.attemptNumber === originalTaskDescription.attemptNumber)
+ assert(decodedTaskDescription.executorId === originalTaskDescription.executorId)
+ assert(decodedTaskDescription.name === originalTaskDescription.name)
+ assert(decodedTaskDescription.index === originalTaskDescription.index)
+ assert(decodedTaskDescription.addedFiles.equals(originalFiles))
+ assert(decodedTaskDescription.addedJars.equals(originalJars))
+ assert(decodedTaskDescription.properties.equals(originalTaskDescription.properties))
+ assert(decodedTaskDescription.serializedTask.equals(taskBuffer))
+ }
+}