aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2013-09-11 12:12:20 -0700
committerKay Ousterhout <kayousterhout@gmail.com>2013-09-11 13:01:39 -0700
commit93c425327560e386aa23b42387cea24eae228e79 (patch)
tree7f774fa2de9868097e709223b20d403e5f0b46d9 /core
parent91a59e6b104523112d52d05a90a8fe6bc988161a (diff)
downloadspark-93c425327560e386aa23b42387cea24eae228e79.tar.gz
spark-93c425327560e386aa23b42387cea24eae228e79.tar.bz2
spark-93c425327560e386aa23b42387cea24eae228e79.zip
Changed localProperties to use ThreadLocal (not DynamicVariable).
The fact that DynamicVariable uses an InheritableThreadLocal can cause problems where the properties end up being shared across threads in certain circumstances.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala18
1 files changed, 9 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 29407bcd30..72540c712a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -27,7 +27,6 @@ import scala.collection.generic.Growable
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import scala.util.DynamicVariable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -257,20 +256,20 @@ class SparkContext(
private[spark] var checkpointDir: Option[String] = None
// Thread Local variable that can be used by users to pass information down the stack
- private val localProperties = new DynamicVariable[Properties](null)
+ private val localProperties = new ThreadLocal[Properties]
def initLocalProperties() {
- localProperties.value = new Properties()
+ localProperties.set(new Properties())
}
def setLocalProperty(key: String, value: String) {
- if (localProperties.value == null) {
- localProperties.value = new Properties()
+ if (localProperties.get() == null) {
+ localProperties.set(new Properties())
}
if (value == null) {
- localProperties.value.remove(key)
+ localProperties.get.remove(key)
} else {
- localProperties.value.setProperty(key, value)
+ localProperties.get.setProperty(key, value)
}
}
@@ -724,7 +723,7 @@ class SparkContext(
logInfo("Starting job: " + callSite)
val start = System.nanoTime
val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler,
- localProperties.value)
+ localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
result
@@ -807,7 +806,8 @@ class SparkContext(
val callSite = Utils.formatSparkCallSite
logInfo("Starting job: " + callSite)
val start = System.nanoTime
- val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, localProperties.value)
+ val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
+ localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
result
}