diff options
author | Eric Liang <ekl@databricks.com> | 2016-04-11 18:33:54 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-11 18:33:54 -0700 |
commit | 6f27027d96ada29d8bb1d626f2cc7c856df3d597 (patch) | |
tree | 4a6c72e6dea1db7d4cdede7b966a68dd69b30b22 /streaming/src/test | |
parent | 94de63053ecd709f44213d09bb43a8b2c5a8b4bb (diff) | |
download | spark-6f27027d96ada29d8bb1d626f2cc7c856df3d597.tar.gz spark-6f27027d96ada29d8bb1d626f2cc7c856df3d597.tar.bz2 spark-6f27027d96ada29d8bb1d626f2cc7c856df3d597.zip |
[SPARK-14475] Propagate user-defined context from driver to executors
## What changes were proposed in this pull request?
This adds a new API call `TaskContext.getLocalProperty` for getting properties set in the driver from executors. These local properties are automatically propagated from the driver to executors. For streaming, the context for streaming tasks will be the initial driver context when ssc.start() is called.
## How was this patch tested?
Unit tests.
cc JoshRosen
Author: Eric Liang <ekl@databricks.com>
Closes #12248 from ericl/sc-2813.
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 10 |
1 files changed, 9 insertions, 1 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index a80154e2fc..806e181f61 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -182,7 +182,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo assert(ssc.scheduler.isStarted === false) } - test("start should set job group and description of streaming jobs correctly") { + test("start should set local properties of streaming jobs correctly") { ssc = new StreamingContext(conf, batchDuration) ssc.sc.setJobGroup("non-streaming", "non-streaming", true) val sc = ssc.sc @@ -190,16 +190,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo @volatile var jobGroupFound: String = "" @volatile var jobDescFound: String = "" @volatile var jobInterruptFound: String = "" + @volatile var customPropFound: String = "" @volatile var allFound: Boolean = false addInputStream(ssc).foreachRDD { rdd => jobGroupFound = sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) jobDescFound = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) jobInterruptFound = sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) + customPropFound = sc.getLocalProperty("customPropKey") allFound = true } + ssc.sc.setLocalProperty("customPropKey", "value1") ssc.start() + // Local props set after start should be ignored + ssc.sc.setLocalProperty("customPropKey", "value2") + eventually(timeout(10 seconds), interval(10 milliseconds)) { assert(allFound === true) } @@ -208,11 +214,13 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo assert(jobGroupFound === null) assert(jobDescFound.contains("Streaming job from")) assert(jobInterruptFound === "false") + assert(customPropFound === "value1") // Verify current thread's thread-local properties have not changed assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "non-streaming") assert(sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) === "non-streaming") assert(sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) === "true") + assert(sc.getLocalProperty("customPropKey") === "value2") } test("start multiple times") { |