aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMarcin Tustin <marcin.tustin@gmail.com>2016-05-02 19:37:57 -0700
committerReynold Xin <rxin@databricks.com>2016-05-02 19:37:57 -0700
commit8028f3a0b4003af15ed44d9ef4727b56f4b10534 (patch)
tree3a5b789bb091db723d73c97b9afef3164d7e3404 /core
parent4e3685ae5e5826e63bfcd7c3729e3b9cbab484b5 (diff)
downloadspark-8028f3a0b4003af15ed44d9ef4727b56f4b10534.tar.gz
spark-8028f3a0b4003af15ed44d9ef4727b56f4b10534.tar.bz2
spark-8028f3a0b4003af15ed44d9ef4727b56f4b10534.zip
[SPARK-14685][CORE] Document heritability of localProperties
## What changes were proposed in this pull request?

This updates the Java/Scala doc for setLocalProperty to document the heritability of localProperties, and adds tests for that behaviour.

## How was this patch tested?

Existing tests pass; new tests were added.

Author: Marcin Tustin <marcin.tustin@gmail.com>

Closes #12455 from marcintustin/SPARK-14685.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala9
-rw-r--r--core/src/test/scala/org/apache/spark/SparkContextSuite.scala28
3 files changed, 40 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 302dec25c6..58618b4192 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -608,6 +608,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* scheduler pool. User-defined properties may also be set here. These properties are propagated
* through to worker tasks and can be accessed there via
* [[org.apache.spark.TaskContext#getLocalProperty]].
+ *
+ * These properties are inherited by child threads spawned from this thread. This
+ * may have unexpected consequences when working with thread pools. The standard Java
+ * implementations of thread pools have worker threads spawn other worker threads.
+ * As a result, local properties may propagate unpredictably.
*/
def setLocalProperty(key: String, value: String) {
if (value == null) {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index dfd91ae338..fb6323413e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -712,8 +712,13 @@ class JavaSparkContext(val sc: SparkContext)
}
/**
- * Set a local property that affects jobs submitted from this thread, such as the
- * Spark fair scheduler pool.
+ * Set a local property that affects jobs submitted from this thread, and all child
+ * threads, such as the Spark fair scheduler pool.
+ *
+ * These properties are inherited by child threads spawned from this thread. This
+ * may have unexpected consequences when working with thread pools. The standard Java
+ * implementations of thread pools have worker threads spawn other worker threads.
+ * As a result, local properties may propagate unpredictably.
*/
def setLocalProperty(key: String, value: String): Unit = sc.setLocalProperty(key, value)
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index a759f364fe..63987084ff 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -323,4 +323,32 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
assert(sc.getConf.getInt("spark.executor.instances", 0) === 6)
}
}
+
+
+ test("localProperties are inherited by spawned threads.") {
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+ sc.setLocalProperty("testProperty", "testValue")
+ var result = "unset";
+ val thread = new Thread() { override def run() = {result = sc.getLocalProperty("testProperty")}}
+ thread.start()
+ thread.join()
+ sc.stop()
+ assert(result == "testValue")
+ }
+
+ test("localProperties do not cross-talk between threads.") {
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+ var result = "unset";
+ val thread1 = new Thread() {
+ override def run() = {sc.setLocalProperty("testProperty", "testValue")}}
+ // testProperty should be unset and thus return null
+ val thread2 = new Thread() {
+ override def run() = {result = sc.getLocalProperty("testProperty")}}
+ thread1.start()
+ thread1.join()
+ thread2.start()
+ thread2.join()
+ sc.stop()
+ assert(result == null)
+ }
}