aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <reynoldx@gmail.com>2013-09-21 23:04:42 -0700
committerReynold Xin <reynoldx@gmail.com>2013-09-21 23:04:42 -0700
commita2ea069a5f2ed83268109deade456dc0fc9b79ee (patch)
tree1a6f428681ba1b20d86045f3c831e98eb6f4a9fd
parentf06f2da2cb2fd6bd25b57e5f5fd6f8e5d37ab1a3 (diff)
parentaa0c29f74779bc5af70250c7481dbd7052ee39cf (diff)
downloadspark-a2ea069a5f2ed83268109deade456dc0fc9b79ee.tar.gz
spark-a2ea069a5f2ed83268109deade456dc0fc9b79ee.tar.bz2
spark-a2ea069a5f2ed83268109deade456dc0fc9b79ee.zip
Merge pull request #937 from jerryshao/localProperties-fix
Fix PR926 local properties issues in Spark Streaming like scenarios
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/ThreadingSuite.scala45
2 files changed, 50 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 72540c712a..6bab1f31d0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -256,7 +256,9 @@ class SparkContext(
private[spark] var checkpointDir: Option[String] = None
// Thread Local variable that can be used by users to pass information down the stack
- private val localProperties = new ThreadLocal[Properties]
+ private val localProperties = new InheritableThreadLocal[Properties] {
+ override protected def childValue(parent: Properties): Properties = new Properties(parent)
+ }
def initLocalProperties() {
localProperties.set(new Properties())
@@ -273,6 +275,9 @@ class SparkContext(
}
}
+ def getLocalProperty(key: String): String =
+ Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
+
/** Set a human readable description of the current job. */
def setJobDescription(value: String) {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
index 69383ddfb8..75d6493e33 100644
--- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -40,7 +40,7 @@ object ThreadingSuiteState {
}
class ThreadingSuite extends FunSuite with LocalSparkContext {
-
+
test("accessing SparkContext form a different thread") {
sc = new SparkContext("local", "test")
val nums = sc.parallelize(1 to 10, 2)
@@ -149,4 +149,47 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
fail("One or more threads didn't see runningThreads = 4")
}
}
+
+ test("set local properties in different thread") {
+ sc = new SparkContext("local", "test")
+ val sem = new Semaphore(0)
+
+ val threads = (1 to 5).map { i =>
+ new Thread() {
+ override def run() {
+ sc.setLocalProperty("test", i.toString)
+ assert(sc.getLocalProperty("test") === i.toString)
+ sem.release()
+ }
+ }
+ }
+
+ threads.foreach(_.start())
+
+ sem.acquire(5)
+ assert(sc.getLocalProperty("test") === null)
+ }
+
+ test("set and get local properties in parent-children thread") {
+ sc = new SparkContext("local", "test")
+ sc.setLocalProperty("test", "parent")
+ val sem = new Semaphore(0)
+
+ val threads = (1 to 5).map { i =>
+ new Thread() {
+ override def run() {
+ assert(sc.getLocalProperty("test") === "parent")
+ sc.setLocalProperty("test", i.toString)
+ assert(sc.getLocalProperty("test") === i.toString)
+ sem.release()
+ }
+ }
+ }
+
+ threads.foreach(_.start())
+
+ sem.acquire(5)
+ assert(sc.getLocalProperty("test") === "parent")
+ assert(sc.getLocalProperty("Foo") === null)
+ }
}