Diffstat (limited to 'core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala')
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala | 24 ++++++++++++++++----
1 file changed, 20 insertions(+), 4 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 5df541e5a5..5ca0c6419d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -17,12 +17,14 @@
package org.apache.spark.scheduler
+import java.util.Properties
+
import org.mockito.Matchers.any
import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter
import org.apache.spark._
-import org.apache.spark.executor.TaskMetricsSuite
+import org.apache.spark.executor.{Executor, TaskMetricsSuite}
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.metrics.source.JvmSource
import org.apache.spark.network.util.JavaUtils
@@ -59,7 +61,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
val func = (c: TaskContext, i: Iterator[String]) => i.next()
val taskBinary = sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func))))
- val task = new ResultTask[String, String](0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0)
+ val task = new ResultTask[String, String](
+ 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties)
intercept[RuntimeException] {
task.run(0, 0, null)
}
@@ -80,7 +83,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
val func = (c: TaskContext, i: Iterator[String]) => i.next()
val taskBinary = sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func))))
- val task = new ResultTask[String, String](0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0)
+ val task = new ResultTask[String, String](
+ 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties)
intercept[RuntimeException] {
task.run(0, 0, null)
}
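
Both ResultTask hunks above make the same mechanical change: the constructor gains a trailing java.util.Properties argument carrying the submitting thread's local properties, and these existing tests pass an empty new Properties because they set none. A short standalone sketch of why that default is safe (plain JDK behavior, nothing patch-specific; the key name is just the one the new test uses):

import java.util.Properties

// An empty Properties answers null for every key, which matches a task
// whose driver thread set no local properties.
val empty = new Properties()
assert(empty.getProperty("testPropKey") == null)

// Properties can also chain to a defaults table that lookups fall back to.
val defaults = new Properties()
defaults.setProperty("testPropKey", "fallback")
assert(new Properties(defaults).getProperty("testPropKey") == "fallback")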
@@ -171,9 +175,10 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val initialAccums = InternalAccumulator.createAll()
// Create a dummy task. We won't end up running this; we just want to collect
// accumulator updates from it.
- val task = new Task[Int](0, 0, 0, Seq.empty[Accumulator[_]]) {
+ val task = new Task[Int](0, 0, 0, Seq.empty[Accumulator[_]], new Properties) {
context = new TaskContextImpl(0, 0, 0L, 0,
new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
+ new Properties,
SparkEnv.get.metricsSystem,
initialAccums)
context.taskMetrics.registerAccumulator(acc1)
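
The same Properties parameter is threaded through Task and TaskContextImpl here. This works because java.util.Properties is Serializable, so the driver can ship the submitting thread's local properties along with each serialized task; a minimal round-trip sketch of that mechanism in isolation (not code from the patch):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Properties

// Serialize a Properties bag and read it back, mimicking how local
// properties travel from driver to executor alongside the task bytes.
val props = new Properties()
props.setProperty("testPropKey", "testPropValue")

val buf = new ByteArrayOutputStream()
val out = new ObjectOutputStream(buf)
out.writeObject(props)
out.close()

val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
val restored = in.readObject().asInstanceOf[Properties]
assert(restored.getProperty("testPropKey") == "testPropValue")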
@@ -190,6 +195,17 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
TaskMetricsSuite.assertUpdatesEquals(accumUpdates3, accumUpdates4)
}
+ test("localProperties are propagated to executors correctly") {
+ sc = new SparkContext("local", "test")
+ sc.setLocalProperty("testPropKey", "testPropValue")
+ val res = sc.parallelize(Array(1), 1).map(i => i).map(i => {
+ val inTask = TaskContext.get().getLocalProperty("testPropKey")
+ val inDeser = Executor.taskDeserializationProps.get().getProperty("testPropKey")
+ s"$inTask,$inDeser"
+ }).collect()
+ assert(res === Array("testPropValue,testPropValue"))
+ }
+
}
private object TaskContextSuite {
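
Taken together, the change makes driver-side local properties visible inside tasks, which is what the new test asserts via TaskContext.get().getLocalProperty and Executor.taskDeserializationProps. A minimal end-to-end sketch, assuming a Spark build that includes this patch (the app name and property key are placeholders):

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object LocalPropsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("local-props-demo"))
    // Set on the driver thread; captured when the job is submitted.
    sc.setLocalProperty("job.tag", "demo")
    val tags = sc.parallelize(1 to 4, 2)
      .map(_ => TaskContext.get().getLocalProperty("job.tag")) // read inside the task
      .collect()
    assert(tags.forall(_ == "demo"))
    sc.stop()
  }
}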