path: root/repl/scala-2.10/src
author	Marcelo Vanzin <vanzin@cloudera.com>	2015-12-10 13:26:30 -0800
committer	Marcelo Vanzin <vanzin@cloudera.com>	2015-12-10 13:26:30 -0800
commit	4a46b8859d3314b5b45a67cdc5c81fecb6e9e78c (patch)
tree	e675378fe850f9fbcf2cafea7cae589876f147a8 /repl/scala-2.10/src
parent	2ecbe02d5b28ee562d10c1735244b90a08532c9e (diff)
[SPARK-11563][CORE][REPL] Use RpcEnv to transfer REPL-generated classes.
This avoids bringing up yet another HTTP server on the driver, and instead reuses the file server already managed by the driver's RpcEnv. As a bonus, the repl now inherits the security features of the network library.

There's also a small change to create the directory for storing classes under the root temp dir for the application (instead of directly under java.io.tmpdir).

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #9923 from vanzin/SPARK-11563.
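At a glance, the mechanism the message describes looks roughly like the sketch below. It is purely illustrative: the object and method names are invented, and the file:// URI stands in for whatever URI the driver's RpcEnv file server would actually hand out; only the overall flow (publish the REPL's class output directory, point executors at the resulting URI) reflects the change.

import java.io.File
import java.net.{URL, URLClassLoader}

object ReplClassTransferSketch {
  // Driver side: the REPL compiles user input into .class files under outputDir
  // and publishes that directory. In Spark the directory is registered with the
  // RpcEnv's file server (inheriting the network library's auth/encryption);
  // here a plain file:// URI stands in for the served URI.
  def publishReplClasses(outputDir: File): String = outputDir.toURI.toString

  // Executor side: try the published URI for REPL-generated classes first, then
  // fall back to the normal application class loader.
  def replClassLoader(classUri: String, parent: ClassLoader): ClassLoader =
    new URLClassLoader(Array(new URL(classUri)), parent)
}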
Diffstat (limited to 'repl/scala-2.10/src')
-rw-r--r--	repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala	17
-rw-r--r--	repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala	28
2 files changed, 15 insertions, 30 deletions
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 304b1e8cdb..22749c4609 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -253,7 +253,7 @@ class SparkILoop(
case xs => xs find (_.name == cmd)
}
}
- private var fallbackMode = false
+ private var fallbackMode = false
private def toggleFallbackMode() {
val old = fallbackMode
@@ -261,9 +261,9 @@ class SparkILoop(
System.setProperty("spark.repl.fallback", fallbackMode.toString)
echo(s"""
|Switched ${if (old) "off" else "on"} fallback mode without restarting.
- | If you have defined classes in the repl, it would
+ | If you have defined classes in the repl, it would
|be good to redefine them incase you plan to use them. If you still run
- |into issues it would be good to restart the repl and turn on `:fallback`
+ |into issues it would be good to restart the repl and turn on `:fallback`
|mode as first command.
""".stripMargin)
}
@@ -350,7 +350,7 @@ class SparkILoop(
shCommand,
nullary("silent", "disable/enable automatic printing of results", verbosity),
nullary("fallback", """
- |disable/enable advanced repl changes, these fix some issues but may introduce others.
+ |disable/enable advanced repl changes, these fix some issues but may introduce others.
|This mode will be removed once these fixes stablize""".stripMargin, toggleFallbackMode),
cmd("type", "[-v] <expr>", "display the type of an expression without evaluating it", typeCommand),
nullary("warnings", "show the suppressed warnings from the most recent line which had any", warningsCommand)
@@ -1009,8 +1009,13 @@ class SparkILoop(
val conf = new SparkConf()
.setMaster(getMaster())
.setJars(jars)
- .set("spark.repl.class.uri", intp.classServerUri)
.setIfMissing("spark.app.name", "Spark shell")
+ // SparkContext will detect this configuration and register it with the RpcEnv's
+ // file server, setting spark.repl.class.uri to the actual URI for executors to
+ // use. This is sort of ugly but since executors are started as part of SparkContext
+ // initialization in certain cases, there's an initialization order issue that prevents
+ // this from being set after SparkContext is instantiated.
+ .set("spark.repl.class.outputDir", intp.outputDir.getAbsolutePath())
if (execUri != null) {
conf.set("spark.executor.uri", execUri)
}
@@ -1025,7 +1030,7 @@ class SparkILoop(
val loader = Utils.getContextOrSparkClassLoader
try {
sqlContext = loader.loadClass(name).getConstructor(classOf[SparkContext])
- .newInstance(sparkContext).asInstanceOf[SQLContext]
+ .newInstance(sparkContext).asInstanceOf[SQLContext]
logInfo("Created sql context (with Hive support)..")
}
catch {
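The comment added in the hunk above describes a small handshake: the REPL records its class output directory under spark.repl.class.outputDir, and SparkContext later registers that directory with the RpcEnv's file server and writes the served URI back under spark.repl.class.uri for executors to use. A hedged sketch of that flow, where the function argument is a hypothetical stand-in for the file server's registration call (only the two configuration keys come from the diff):

import java.io.File
import org.apache.spark.SparkConf

object ReplClassUriWiring {
  // `registerDirectory` is a placeholder for the RpcEnv file server call that
  // returns a URI executors can fetch from; it is not a real Spark API.
  def wireReplClassUri(conf: SparkConf, registerDirectory: File => String): Unit = {
    conf.getOption("spark.repl.class.outputDir").foreach { dirPath =>
      val uri = registerDirectory(new File(dirPath))
      conf.set("spark.repl.class.uri", uri)
    }
  }
}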
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index 829b12269f..7fcb423575 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -37,7 +37,7 @@ import scala.reflect.{ ClassTag, classTag }
import scala.tools.reflect.StdRuntimeTags._
import scala.util.control.ControlThrowable
-import org.apache.spark.{Logging, HttpServer, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.util.Utils
import org.apache.spark.annotation.DeveloperApi
@@ -96,10 +96,9 @@ import org.apache.spark.annotation.DeveloperApi
private val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1")
/** Local directory to save .class files too */
- private lazy val outputDir = {
- val tmp = System.getProperty("java.io.tmpdir")
- val rootDir = conf.get("spark.repl.classdir", tmp)
- Utils.createTempDir(rootDir)
+ private[repl] val outputDir = {
+ val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
+ Utils.createTempDir(root = rootDir, namePrefix = "repl")
}
if (SPARK_DEBUG_REPL) {
echo("Output directory: " + outputDir)
@@ -114,8 +113,6 @@ import org.apache.spark.annotation.DeveloperApi
private val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles
/** Jetty server that will serve our classes to worker nodes */
- private val classServerPort = conf.getInt("spark.replClassServer.port", 0)
- private val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf), classServerPort, "HTTP class server")
private var currentSettings: Settings = initialSettings
private var printResults = true // whether to print result lines
private var totalSilence = false // whether to print anything
@@ -124,22 +121,6 @@ import org.apache.spark.annotation.DeveloperApi
private var bindExceptions = true // whether to bind the lastException variable
private var _executionWrapper = "" // code to be wrapped around all lines
-
- // Start the classServer and store its URI in a spark system property
- // (which will be passed to executors so that they can connect to it)
- classServer.start()
- if (SPARK_DEBUG_REPL) {
- echo("Class server started, URI = " + classServer.uri)
- }
-
- /**
- * URI of the class server used to feed REPL compiled classes.
- *
- * @return The string representing the class server uri
- */
- @DeveloperApi
- def classServerUri = classServer.uri
-
/** We're going to go to some trouble to initialize the compiler asynchronously.
* It's critical that nothing call into it until it's been initialized or we will
* run into unrecoverable issues, but the perceived repl startup time goes
@@ -994,7 +975,6 @@ import org.apache.spark.annotation.DeveloperApi
@DeveloperApi
def close() {
reporter.flush()
- classServer.stop()
}
/**
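The SparkIMain side of the change amounts to two things: the embedded HTTP class server (and its @DeveloperApi classServerUri) is removed, and the class output directory is now created under the application's local directory (or spark.repl.classdir if set) with a "repl" prefix instead of directly under java.io.tmpdir. Utils.getLocalDir and Utils.createTempDir are Spark-internal helpers, so the sketch below approximates the directory selection with JDK calls only:

import java.io.File
import java.nio.file.Files

object ReplOutputDirSketch {
  // Approximate, JDK-only version of the new outputDir logic: prefer an
  // explicitly configured root, otherwise the application's local dir, and
  // create a unique "repl..." subdirectory there for generated .class files.
  def chooseReplOutputDir(configuredRoot: Option[String], appLocalDir: String): File = {
    val rootDir = new File(configuredRoot.getOrElse(appLocalDir))
    rootDir.mkdirs() // Spark's Utils.createTempDir also ensures the root exists
    Files.createTempDirectory(rootDir.toPath, "repl").toFile
  }
}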