about summary refs log tree commit diff
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-09-11 19:36:11 -0700
committerPatrick Wendell <pwendell@gmail.com>2013-09-11 19:36:11 -0700
commita310de69e05c30fc86a7ea4bab503b7fe59cada9 (patch)
tree6cc8b41648ce81807cbd6e10b4fe7c24ffadc20f
parent58c7d8b13875536f1b091a8fdc462ab52d075e36 (diff)
parent93c425327560e386aa23b42387cea24eae228e79 (diff)
downloadspark-a310de69e05c30fc86a7ea4bab503b7fe59cada9.tar.gz
spark-a310de69e05c30fc86a7ea4bab503b7fe59cada9.tar.bz2
spark-a310de69e05c30fc86a7ea4bab503b7fe59cada9.zip
Merge pull request #926 from kayousterhout/dynamic
Changed localProperties to use ThreadLocal (not DynamicVariable).
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkContext.scala | 18
1 file changed, 9 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 29407bcd30..72540c712a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -27,7 +27,6 @@ import scala.collection.generic.Growable
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import scala.util.DynamicVariable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -257,20 +256,20 @@ class SparkContext(
private[spark] var checkpointDir: Option[String] = None
// Thread Local variable that can be used by users to pass information down the stack
- private val localProperties = new DynamicVariable[Properties](null)
+ private val localProperties = new ThreadLocal[Properties]
def initLocalProperties() {
- localProperties.value = new Properties()
+ localProperties.set(new Properties())
}
def setLocalProperty(key: String, value: String) {
- if (localProperties.value == null) {
- localProperties.value = new Properties()
+ if (localProperties.get() == null) {
+ localProperties.set(new Properties())
}
if (value == null) {
- localProperties.value.remove(key)
+ localProperties.get.remove(key)
} else {
- localProperties.value.setProperty(key, value)
+ localProperties.get.setProperty(key, value)
}
}
@@ -724,7 +723,7 @@ class SparkContext(
logInfo("Starting job: " + callSite)
val start = System.nanoTime
val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler,
- localProperties.value)
+ localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
result
@@ -807,7 +806,8 @@ class SparkContext(
val callSite = Utils.formatSparkCallSite
logInfo("Starting job: " + callSite)
val start = System.nanoTime
- val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, localProperties.value)
+ val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
+ localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
result
}