aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-04-11 18:33:54 -0700
committerReynold Xin <rxin@databricks.com>2016-04-11 18:33:54 -0700
commit6f27027d96ada29d8bb1d626f2cc7c856df3d597 (patch)
tree4a6c72e6dea1db7d4cdede7b966a68dd69b30b22 /sql/core/src
parent94de63053ecd709f44213d09bb43a8b2c5a8b4bb (diff)
downloadspark-6f27027d96ada29d8bb1d626f2cc7c856df3d597.tar.gz
spark-6f27027d96ada29d8bb1d626f2cc7c856df3d597.tar.bz2
spark-6f27027d96ada29d8bb1d626f2cc7c856df3d597.zip
[SPARK-14475] Propagate user-defined context from driver to executors
## What changes were proposed in this pull request? This adds a new API call `TaskContext.getLocalProperty` for getting properties set in the driver from executors. These local properties are automatically propagated from the driver to executors. For streaming, the context for streaming tasks will be the initial driver context when ssc.start() is called. ## How was this patch tested? Unit tests. cc JoshRosen Author: Eric Liang <ekl@databricks.com> Closes #12248 from ericl/sc-2813.
Diffstat (limited to 'sql/core/src')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala3
3 files changed, 8 insertions, 1 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index 4dc7d3461c..c1555114e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution
+import java.util.Properties
+
import scala.collection.mutable
import scala.util.{Random, Try}
import scala.util.control.NonFatal
@@ -71,6 +73,7 @@ class UnsafeFixedWidthAggregationMapSuite
taskAttemptId = Random.nextInt(10000),
attemptNumber = 0,
taskMemoryManager = taskMemoryManager,
+ localProperties = new Properties,
metricsSystem = null))
try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 476d93fc2a..03d4be8ee5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution
+import java.util.Properties
+
import scala.util.Random
import org.apache.spark._
@@ -117,6 +119,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
taskAttemptId = 98456,
attemptNumber = 0,
taskMemoryManager = taskMemMgr,
+ localProperties = new Properties,
metricsSystem = null))
val sorter = new UnsafeKVExternalSorter(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 1f3779373b..7db1f9654b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
+import java.util.Properties
import org.apache.spark._
import org.apache.spark.memory.TaskMemoryManager
@@ -113,7 +114,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
}
val taskMemoryManager = new TaskMemoryManager(sc.env.memoryManager, 0)
val taskContext = new TaskContextImpl(
- 0, 0, 0, 0, taskMemoryManager, null, InternalAccumulator.create(sc))
+ 0, 0, 0, 0, taskMemoryManager, new Properties, null, InternalAccumulator.create(sc))
val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
taskContext,