aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2013-09-11 12:12:20 -0700
committerKay Ousterhout <kayousterhout@gmail.com>2013-09-11 13:01:39 -0700
commit93c425327560e386aa23b42387cea24eae228e79 (patch)
tree7f774fa2de9868097e709223b20d403e5f0b46d9
parent91a59e6b104523112d52d05a90a8fe6bc988161a (diff)
downloadspark-93c425327560e386aa23b42387cea24eae228e79.tar.gz
spark-93c425327560e386aa23b42387cea24eae228e79.tar.bz2
spark-93c425327560e386aa23b42387cea24eae228e79.zip
Changed localProperties to use ThreadLocal (not DynamicVariable).
The fact that DynamicVariable uses an InheritableThreadLocal can cause problems where the properties end up being shared across threads in certain circumstances.
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala18
1 files changed, 9 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 29407bcd30..72540c712a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -27,7 +27,6 @@ import scala.collection.generic.Growable
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import scala.util.DynamicVariable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -257,20 +256,20 @@ class SparkContext(
private[spark] var checkpointDir: Option[String] = None
// Thread Local variable that can be used by users to pass information down the stack
- private val localProperties = new DynamicVariable[Properties](null)
+ private val localProperties = new ThreadLocal[Properties]
def initLocalProperties() {
- localProperties.value = new Properties()
+ localProperties.set(new Properties())
}
def setLocalProperty(key: String, value: String) {
- if (localProperties.value == null) {
- localProperties.value = new Properties()
+ if (localProperties.get() == null) {
+ localProperties.set(new Properties())
}
if (value == null) {
- localProperties.value.remove(key)
+ localProperties.get.remove(key)
} else {
- localProperties.value.setProperty(key, value)
+ localProperties.get.setProperty(key, value)
}
}
@@ -724,7 +723,7 @@ class SparkContext(
logInfo("Starting job: " + callSite)
val start = System.nanoTime
val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler,
- localProperties.value)
+ localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
result
@@ -807,7 +806,8 @@ class SparkContext(
val callSite = Utils.formatSparkCallSite
logInfo("Starting job: " + callSite)
val start = System.nanoTime
- val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, localProperties.value)
+ val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
+ localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
result
}