diff options
author | Kay Ousterhout <kayousterhout@gmail.com> | 2013-09-11 12:12:20 -0700 |
---|---|---|
committer | Kay Ousterhout <kayousterhout@gmail.com> | 2013-09-11 13:01:39 -0700 |
commit | 93c425327560e386aa23b42387cea24eae228e79 (patch) | |
tree | 7f774fa2de9868097e709223b20d403e5f0b46d9 | |
parent | 91a59e6b104523112d52d05a90a8fe6bc988161a (diff) | |
download | spark-93c425327560e386aa23b42387cea24eae228e79.tar.gz spark-93c425327560e386aa23b42387cea24eae228e79.tar.bz2 spark-93c425327560e386aa23b42387cea24eae228e79.zip |
Changed localProperties to use ThreadLocal (not DynamicVariable).
The fact that DynamicVariable uses an InheritableThreadLocal
can cause problems where the properties end up being shared
across threads in certain circumstances.
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 18 |
1 file changed, 9 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 29407bcd30..72540c712a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -27,7 +27,6 @@ import scala.collection.generic.Growable
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
-import scala.util.DynamicVariable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -257,20 +256,20 @@ class SparkContext(
   private[spark] var checkpointDir: Option[String] = None
 
   // Thread Local variable that can be used by users to pass information down the stack
-  private val localProperties = new DynamicVariable[Properties](null)
+  private val localProperties = new ThreadLocal[Properties]
 
   def initLocalProperties() {
-    localProperties.value = new Properties()
+    localProperties.set(new Properties())
   }
 
   def setLocalProperty(key: String, value: String) {
-    if (localProperties.value == null) {
-      localProperties.value = new Properties()
+    if (localProperties.get() == null) {
+      localProperties.set(new Properties())
     }
     if (value == null) {
-      localProperties.value.remove(key)
+      localProperties.get.remove(key)
     } else {
-      localProperties.value.setProperty(key, value)
+      localProperties.get.setProperty(key, value)
     }
   }
@@ -724,7 +723,7 @@ class SparkContext(
     logInfo("Starting job: " + callSite)
     val start = System.nanoTime
     val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler,
-      localProperties.value)
+      localProperties.get)
     logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
     rdd.doCheckpoint()
     result
@@ -807,7 +806,8 @@ class SparkContext(
     val callSite = Utils.formatSparkCallSite
     logInfo("Starting job: " + callSite)
     val start = System.nanoTime
-    val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, localProperties.value)
+    val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
+      localProperties.get)
    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
    result
  }