diff options
Diffstat (limited to 'core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala')
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala | 24 |
1 files changed, 20 insertions, 4 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala index 5df541e5a5..5ca0c6419d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala @@ -17,12 +17,14 @@ package org.apache.spark.scheduler +import java.util.Properties + import org.mockito.Matchers.any import org.mockito.Mockito._ import org.scalatest.BeforeAndAfter import org.apache.spark._ -import org.apache.spark.executor.TaskMetricsSuite +import org.apache.spark.executor.{Executor, TaskMetricsSuite} import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.metrics.source.JvmSource import org.apache.spark.network.util.JavaUtils @@ -59,7 +61,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark val closureSerializer = SparkEnv.get.closureSerializer.newInstance() val func = (c: TaskContext, i: Iterator[String]) => i.next() val taskBinary = sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func)))) - val task = new ResultTask[String, String](0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0) + val task = new ResultTask[String, String]( + 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties) intercept[RuntimeException] { task.run(0, 0, null) } @@ -80,7 +83,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark val closureSerializer = SparkEnv.get.closureSerializer.newInstance() val func = (c: TaskContext, i: Iterator[String]) => i.next() val taskBinary = sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func)))) - val task = new ResultTask[String, String](0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0) + val task = new ResultTask[String, String]( + 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties) intercept[RuntimeException] { task.run(0, 0, null) } @@ -171,9 +175,10 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark val initialAccums = InternalAccumulator.createAll() // Create a dummy task. We won't end up running this; we just want to collect // accumulator updates from it. - val task = new Task[Int](0, 0, 0, Seq.empty[Accumulator[_]]) { + val task = new Task[Int](0, 0, 0, Seq.empty[Accumulator[_]], new Properties) { context = new TaskContextImpl(0, 0, 0L, 0, new TaskMemoryManager(SparkEnv.get.memoryManager, 0L), + new Properties, SparkEnv.get.metricsSystem, initialAccums) context.taskMetrics.registerAccumulator(acc1) @@ -190,6 +195,17 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark TaskMetricsSuite.assertUpdatesEquals(accumUpdates3, accumUpdates4) } + test("localProperties are propagated to executors correctly") { + sc = new SparkContext("local", "test") + sc.setLocalProperty("testPropKey", "testPropValue") + val res = sc.parallelize(Array(1), 1).map(i => i).map(i => { + val inTask = TaskContext.get().getLocalProperty("testPropKey") + val inDeser = Executor.taskDeserializationProps.get().getProperty("testPropKey") + s"$inTask,$inDeser" + }).collect() + assert(res === Array("testPropValue,testPropValue")) + } + } private object TaskContextSuite { |