Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- | core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala | 26
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala | 69
2 files changed, 85 insertions, 10 deletions
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 742500d87d..f94baaa30d 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -18,9 +18,10 @@ package org.apache.spark.executor
 import java.nio.ByteBuffer
+import java.util.Properties
 import java.util.concurrent.CountDownLatch
 
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
 
 import org.mockito.Matchers._
 import org.mockito.Mockito.{mock, when}
@@ -32,7 +33,7 @@ import org.apache.spark.TaskState.TaskState
 import org.apache.spark.memory.MemoryManager
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.scheduler.{FakeTask, Task}
+import org.apache.spark.scheduler.{FakeTask, TaskDescription}
 import org.apache.spark.serializer.JavaSerializer
 
 class ExecutorSuite extends SparkFunSuite {
@@ -52,13 +53,18 @@ class ExecutorSuite extends SparkFunSuite {
     when(mockEnv.memoryManager).thenReturn(mockMemoryManager)
     when(mockEnv.closureSerializer).thenReturn(serializer)
     val fakeTaskMetrics = serializer.newInstance().serialize(TaskMetrics.registered).array()
-
-    val serializedTask =
-      Task.serializeWithDependencies(
-        new FakeTask(0, 0, Nil, fakeTaskMetrics),
-        HashMap[String, Long](),
-        HashMap[String, Long](),
-        serializer.newInstance())
+    val serializedTask = serializer.newInstance().serialize(
+      new FakeTask(0, 0, Nil, fakeTaskMetrics))
+    val taskDescription = new TaskDescription(
+      taskId = 0,
+      attemptNumber = 0,
+      executorId = "",
+      name = "",
+      index = 0,
+      addedFiles = Map[String, Long](),
+      addedJars = Map[String, Long](),
+      properties = new Properties,
+      serializedTask)
 
     // we use latches to force the program to run in this order:
     // +-----------------------------+---------------------------------------+
@@ -108,7 +114,7 @@ class ExecutorSuite extends SparkFunSuite {
     try {
       executor = new Executor("id", "localhost", mockEnv, userClassPath = Nil, isLocal = true)
       // the task will be launched in a dedicated worker thread
-      executor.launchTask(mockExecutorBackend, 0, 0, "", serializedTask)
+      executor.launchTask(mockExecutorBackend, taskDescription)
       executorSuiteHelper.latch1.await()
 
       // we know the task will be started, but not yet deserialized, because of the latches we
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
new file mode 100644
index 0000000000..9f1fe05157
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.nio.ByteBuffer
+import java.util.Properties
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkFunSuite
+
+class TaskDescriptionSuite extends SparkFunSuite {
+  test("encoding and then decoding a TaskDescription results in the same TaskDescription") {
+    val originalFiles = new HashMap[String, Long]()
+    originalFiles.put("fileUrl1", 1824)
+    originalFiles.put("fileUrl2", 2)
+
+    val originalJars = new HashMap[String, Long]()
+    originalJars.put("jar1", 3)
+
+    val originalProperties = new Properties()
+    originalProperties.put("property1", "18")
+    originalProperties.put("property2", "test value")
+
+    // Create a dummy byte buffer for the task.
+    val taskBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
+
+    val originalTaskDescription = new TaskDescription(
+      taskId = 1520589,
+      attemptNumber = 2,
+      executorId = "testExecutor",
+      name = "task for test",
+      index = 19,
+      originalFiles,
+      originalJars,
+      originalProperties,
+      taskBuffer
+    )
+
+    val serializedTaskDescription = TaskDescription.encode(originalTaskDescription)
+    val decodedTaskDescription = TaskDescription.decode(serializedTaskDescription)
+
+    // Make sure that all of the fields in the decoded task description match the original.
+    assert(decodedTaskDescription.taskId === originalTaskDescription.taskId)
+    assert(decodedTaskDescription.attemptNumber === originalTaskDescription.attemptNumber)
+    assert(decodedTaskDescription.executorId === originalTaskDescription.executorId)
+    assert(decodedTaskDescription.name === originalTaskDescription.name)
+    assert(decodedTaskDescription.index === originalTaskDescription.index)
+    assert(decodedTaskDescription.addedFiles.equals(originalFiles))
+    assert(decodedTaskDescription.addedJars.equals(originalJars))
+    assert(decodedTaskDescription.properties.equals(originalTaskDescription.properties))
+    assert(decodedTaskDescription.serializedTask.equals(taskBuffer))
+  }
+}
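For context, a minimal sketch (not part of the diff above) of how the encode/decode round trip verified by TaskDescriptionSuite is meant to pair with the new Executor.launchTask(backend, taskDescription) signature: the scheduler side encodes a TaskDescription into a ByteBuffer for transport, and the executor side decodes it before launching the task. Only the TaskDescription constructor, encode, decode, and the field names shown in the diff are taken from the source; the object name, executor id, and dummy task bytes below are illustrative assumptions.

// Sketch only, assuming a Spark build containing the TaskDescription API
// exercised by the tests above.
import java.nio.ByteBuffer
import java.util.Properties

import scala.collection.mutable.HashMap

import org.apache.spark.scheduler.TaskDescription

object TaskDescriptionRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Scheduler side: build the TaskDescription and encode it into a ByteBuffer,
    // with files, jars, and properties carried as explicit fields rather than
    // bundled into the task bytes (as the removed Task.serializeWithDependencies did).
    val description = new TaskDescription(
      taskId = 42L,
      attemptNumber = 0,
      executorId = "exec-1",            // illustrative value
      name = "sketch task",             // illustrative value
      index = 0,
      addedFiles = new HashMap[String, Long](),
      addedJars = new HashMap[String, Long](),
      properties = new Properties,
      serializedTask = ByteBuffer.wrap(Array[Byte](1, 2, 3)))  // dummy task bytes
    val wireBuffer: ByteBuffer = TaskDescription.encode(description)

    // Executor side: decode the buffer back into a TaskDescription and pass it
    // to launchTask, mirroring the updated call in ExecutorSuite:
    //   executor.launchTask(mockExecutorBackend, taskDescription)
    val decoded: TaskDescription = TaskDescription.decode(wireBuffer)
    assert(decoded.taskId == description.taskId)
  }
}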