author      Reynold Xin <rxin@apache.org>   2013-11-09 00:32:14 -0800
committer   Reynold Xin <rxin@apache.org>   2013-11-09 00:32:14 -0800
commit      319299941dbf4bfa2aaa8b5078e313ca45cb5207 (patch)
tree        6859a84c7455e741a3455467fef2945ecee02ea0 /repl
parent      3d4ad84b63e440fd3f4b3edb1b120ff7c14a42d1 (diff)
Propagate the SparkContext local property from the thread that calls the spark-repl to the actual execution thread.
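The commit message refers to SparkContext "local properties": per-thread key/value pairs (for example the fair-scheduler pool name) that Spark attaches to jobs submitted from the thread that set them. A minimal sketch of the public API involved, assuming a local SparkContext and using spark.scheduler.pool purely as an illustrative key:

    import org.apache.spark.SparkContext

    object LocalPropertiesDemo {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "local-properties-demo")

        // Local properties are attached to the calling thread; jobs submitted
        // from this thread pick them up (e.g. to route work to a scheduler pool).
        sc.setLocalProperty("spark.scheduler.pool", "production")
        assert(sc.getLocalProperty("spark.scheduler.pool") == "production")

        sc.parallelize(1 to 100).count()  // this job carries the property set above

        sc.stop()
      }
    }

Because these properties are thread-local, a value set by the caller of the spark-repl is not automatically visible to the thread that executes the interpreted line, which is what this patch addresses.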
Diffstat (limited to 'repl')
 -rw-r--r--  repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala   11
 -rw-r--r--  repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala    35
2 files changed, 42 insertions(+), 4 deletions(-)
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index e6e35c9b5d..870e12de34 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -878,14 +878,21 @@ class SparkIMain(val settings: Settings, protected val out: PrintWriter) extends
(message, false)
}
}
+
+ // Get a copy of the local properties from SparkContext, and set it later in the thread
+ // that triggers the execution. This is to make sure the caller of this function can pass
+ // the right thread local (inheritable) properties down into Spark.
+ val sc = org.apache.spark.repl.Main.interp.sparkContext
+ val props = if (sc != null) sc.getLocalProperties() else null
try {
val execution = lineManager.set(originalLine) {
// MATEI: set the right SparkEnv for our SparkContext, because
// this execution will happen in a separate thread
- val sc = org.apache.spark.repl.Main.interp.sparkContext
- if (sc != null && sc.env != null)
+ if (sc != null && sc.env != null) {
SparkEnv.set(sc.env)
+ sc.setLocalProperties(props)
+ }
// Execute the line
lineRep call "$export"
}
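The comment added in the hunk above spells out the fix: grab a copy of the caller's local properties before handing the line off, then re-apply them in the thread that actually runs it. The patch uses SparkContext's internal getLocalProperties/setLocalProperties pair; user code would typically stick to the public per-key setLocalProperty/getLocalProperty, as in this rough sketch of the same capture-and-reapply pattern (hypothetical helper name, spark.scheduler.pool again just an example key):

    import org.apache.spark.SparkContext

    object PropagateLocalProps {
      // Runs `body` on a fresh thread while carrying over a local property
      // from the calling thread, mirroring what the repl patch does.
      def runOnWorkerThread(sc: SparkContext)(body: => Unit) {
        // Capture the value in the thread that submits the work ...
        val pool = sc.getLocalProperty("spark.scheduler.pool")
        val worker = new Thread {
          override def run() {
            // ... and re-apply it before any job is submitted from this thread,
            // so the job sees the same local properties as the caller.
            if (pool != null) {
              sc.setLocalProperty("spark.scheduler.pool", pool)
            }
            body
          }
        }
        worker.start()
        worker.join()
      }
    }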
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 8f9b632c0e..6e4504d4d5 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -21,12 +21,14 @@ import java.io._
import java.net.URLClassLoader
import scala.collection.mutable.ArrayBuffer
-import scala.collection.JavaConversions._
-import org.scalatest.FunSuite
import com.google.common.io.Files
+import org.scalatest.FunSuite
+import org.apache.spark.SparkContext
+
class ReplSuite extends FunSuite {
+
def runInterpreter(master: String, input: String): String = {
val in = new BufferedReader(new StringReader(input + "\n"))
val out = new StringWriter()
@@ -64,6 +66,35 @@ class ReplSuite extends FunSuite {
"Interpreter output contained '" + message + "':\n" + output)
}
+ test("propagation of local properties") {
+ // A mock ILoop that doesn't install the SIGINT handler.
+ class ILoop(out: PrintWriter) extends SparkILoop(None, out, None) {
+ settings = new scala.tools.nsc.Settings
+ settings.usejavacp.value = true
+ org.apache.spark.repl.Main.interp = this
+ override def createInterpreter() {
+ intp = new SparkILoopInterpreter
+ intp.setContextClassLoader()
+ }
+ }
+
+ val out = new StringWriter()
+ val interp = new ILoop(new PrintWriter(out))
+ interp.sparkContext = new SparkContext("local", "repl-test")
+ interp.createInterpreter()
+ interp.intp.initialize()
+ interp.sparkContext.setLocalProperty("someKey", "someValue")
+
+ // Make sure the value we set in the caller to interpret is propagated in the thread that
+ // interprets the command.
+ interp.interpret("org.apache.spark.repl.Main.interp.sparkContext.getLocalProperty(\"someKey\")")
+ assert(out.toString.contains("someValue"))
+
+ interp.sparkContext.stop()
+ System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.hostPort")
+ }
+
test ("simple foreach with accumulator") {
val output = runInterpreter("local", """
val accum = sc.accumulator(0)