author    Marcelo Vanzin <vanzin@cloudera.com>  2015-12-10 13:26:30 -0800
committer Marcelo Vanzin <vanzin@cloudera.com>  2015-12-10 13:26:30 -0800
commit    4a46b8859d3314b5b45a67cdc5c81fecb6e9e78c (patch)
tree      e675378fe850f9fbcf2cafea7cae589876f147a8 /core/src/main
parent    2ecbe02d5b28ee562d10c1735244b90a08532c9e (diff)
[SPARK-11563][CORE][REPL] Use RpcEnv to transfer REPL-generated classes.
This avoids bringing up yet another HTTP server on the driver, and instead reuses the file server already managed by the driver's RpcEnv. As a bonus, the repl now inherits the security features of the network library.

There's also a small change to create the directory for storing classes under the root temp dir for the application (instead of directly under java.io.tmpdir).

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #9923 from vanzin/SPARK-11563.
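In short, the driver registers the REPL's class output directory with the RpcEnv file server and publishes the resulting URI for executors. A minimal sketch of that flow, assuming a SparkEnv named env and a SparkConf named conf are in scope (the directory path below is illustrative; in the patch it comes from "spark.repl.class.outputDir"):

    import java.io.File

    // Driver side: mount the REPL output dir on the RpcEnv file server.
    val outputDir = new File("/tmp/spark-app/repl-classes")  // illustrative path
    val replUri = env.rpcEnv.fileServer.addDirectory("/classes", outputDir)

    // replUri looks like "spark://driver-host:port/classes"; executors
    // resolve generated class files relative to it.
    conf.set("spark.repl.class.uri", replUri)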
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/org/apache/spark/HttpFileServer.scala                 5
-rw-r--r--  core/src/main/scala/org/apache/spark/HttpServer.scala                    26
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                   6
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala              6
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala                    18
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala            5
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala  22
7 files changed, 75 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index 7cf7bc0dc6..77d8ec9bb1 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -71,6 +71,11 @@ private[spark] class HttpFileServer(
serverUri + "/jars/" + file.getName
}
+ def addDirectory(path: String, resourceBase: String): String = {
+ httpServer.addDirectory(path, resourceBase)
+ serverUri + path
+ }
+
def addFileToDir(file: File, dir: File) : String = {
// Check whether the file is a directory. If it is, throw a more meaningful exception.
// If we don't catch this, Guava throws a very confusing error message:
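For the legacy HTTP code path, the new addDirectory simply mounts a local directory on the embedded server and returns its public URI. A hypothetical call, assuming an already-started HttpFileServer instance named fileServer:

    import java.io.File

    // Serve everything under the given local directory at <serverUri>/classes.
    val dir = new File("/tmp/repl-classes")  // illustrative path
    val uri = fileServer.addDirectory("/classes", dir.getAbsolutePath)
    // uri == serverUri + "/classes", e.g. "http://driver-host:port/classes"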
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index 8de3a6c04d..faa3ef3d75 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -23,10 +23,9 @@ import org.eclipse.jetty.server.ssl.SslSocketConnector
import org.eclipse.jetty.util.security.{Constraint, Password}
import org.eclipse.jetty.security.authentication.DigestAuthenticator
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
-
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.bio.SocketConnector
-import org.eclipse.jetty.server.handler.{DefaultHandler, HandlerList, ResourceHandler}
+import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder}
import org.eclipse.jetty.util.thread.QueuedThreadPool
import org.apache.spark.util.Utils
@@ -52,6 +51,11 @@ private[spark] class HttpServer(
private var server: Server = null
private var port: Int = requestedPort
+ private val servlets = {
+ val handler = new ServletContextHandler()
+ handler.setContextPath("/")
+ handler
+ }
def start() {
if (server != null) {
@@ -65,6 +69,14 @@ private[spark] class HttpServer(
}
}
+ def addDirectory(contextPath: String, resourceBase: String): Unit = {
+ val holder = new ServletHolder()
+ holder.setInitParameter("resourceBase", resourceBase)
+ holder.setInitParameter("pathInfoOnly", "true")
+ holder.setServlet(new DefaultServlet())
+ servlets.addServlet(holder, contextPath.stripSuffix("/") + "/*")
+ }
+
/**
* Actually start the HTTP server on the given port.
*
@@ -85,21 +97,17 @@ private[spark] class HttpServer(
val threadPool = new QueuedThreadPool
threadPool.setDaemon(true)
server.setThreadPool(threadPool)
- val resHandler = new ResourceHandler
- resHandler.setResourceBase(resourceBase.getAbsolutePath)
-
- val handlerList = new HandlerList
- handlerList.setHandlers(Array(resHandler, new DefaultHandler))
+ addDirectory("/", resourceBase.getAbsolutePath)
if (securityManager.isAuthenticationEnabled()) {
logDebug("HttpServer is using security")
val sh = setupSecurityHandler(securityManager)
// make sure we go through security handler to get resources
- sh.setHandler(handlerList)
+ sh.setHandler(servlets)
server.setHandler(sh)
} else {
logDebug("HttpServer is not using security")
- server.setHandler(handlerList)
+ server.setHandler(servlets)
}
server.start()
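The Jetty pattern used here (one ServletContextHandler, one DefaultServlet per served directory) can be reproduced standalone. A minimal sketch against the Jetty 8-era API this file imports; the port and paths are illustrative:

    import org.eclipse.jetty.server.Server
    import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder}

    val server = new Server(0)  // 0 picks an ephemeral port
    val context = new ServletContextHandler()
    context.setContextPath("/")

    val holder = new ServletHolder(new DefaultServlet())
    holder.setInitParameter("resourceBase", "/tmp/classes")  // directory to serve
    holder.setInitParameter("pathInfoOnly", "true")          // resolve paths after the servlet mapping
    context.addServlet(holder, "/classes/*")

    server.setHandler(context)
    server.start()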
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8a62b71c3f..194ecc0a04 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -457,6 +457,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
+ // If running the REPL, register the repl's output dir with the file server.
+ _conf.getOption("spark.repl.class.outputDir").foreach { path =>
+ val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
+ _conf.set("spark.repl.class.uri", replUri)
+ }
+
_metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)
_statusTracker = new SparkStatusTracker(this)
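On the executor side, a class loader turns a class name into a path relative to the published URI. A rough sketch of that lookup (the helper below is hypothetical; the real logic lives in the repl module's ExecutorClassLoader):

    // Hypothetical helper: map a class name onto the registered directory.
    def classUriFor(baseUri: String, className: String): String =
      baseUri.stripSuffix("/") + "/" + className.replace('.', '/') + ".class"

    // classUriFor("spark://driver-host:1234/classes", "$line3.$read")
    //   ==> "spark://driver-host:1234/classes/$line3/$read.class"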
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 7b68dfe5ad..552b644d13 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -364,9 +364,9 @@ private[spark] class Executor(
val _userClassPathFirst: java.lang.Boolean = userClassPathFirst
val klass = Utils.classForName("org.apache.spark.repl.ExecutorClassLoader")
.asInstanceOf[Class[_ <: ClassLoader]]
- val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],
- classOf[ClassLoader], classOf[Boolean])
- constructor.newInstance(conf, classUri, parent, _userClassPathFirst)
+ val constructor = klass.getConstructor(classOf[SparkConf], classOf[SparkEnv],
+ classOf[String], classOf[ClassLoader], classOf[Boolean])
+ constructor.newInstance(conf, env, classUri, parent, _userClassPathFirst)
} catch {
case _: ClassNotFoundException =>
logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 3d7d281b0d..64a4a8bf7c 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -179,6 +179,24 @@ private[spark] trait RpcEnvFileServer {
*/
def addJar(file: File): String
+ /**
+ * Adds a local directory to be served via this file server.
+ *
+ * @param baseUri Leading URI path (files can be retrieved by appending their relative
+ * path to this base URI). This cannot be "files" nor "jars".
+ * @param path Path to the local directory.
+ * @return URI for the root of the directory in the file server.
+ */
+ def addDirectory(baseUri: String, path: File): String
+
+ /** Validates and normalizes the base URI for directories. */
+ protected def validateDirectoryUri(baseUri: String): String = {
+ val fixedBaseUri = "/" + baseUri.stripPrefix("/").stripSuffix("/")
+ require(fixedBaseUri != "/files" && fixedBaseUri != "/jars",
+ "Directory URI cannot be /files nor /jars.")
+ fixedBaseUri
+ }
+
}
private[spark] case class RpcEnvConfig(
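validateDirectoryUri both normalizes slashes and rejects the two reserved roots. A standalone copy of that rule with a few checks (for illustration only; the method above is protected on the trait):

    def normalize(baseUri: String): String = {
      val fixed = "/" + baseUri.stripPrefix("/").stripSuffix("/")
      require(fixed != "/files" && fixed != "/jars",
        "Directory URI cannot be /files nor /jars.")
      fixed
    }

    assert(normalize("classes") == "/classes")    // leading slash added
    assert(normalize("/classes/") == "/classes")  // trailing slash stripped
    // normalize("/files") and normalize("jars/") both fail the require()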
diff --git a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
index 94dbec593c..9d098154f7 100644
--- a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
@@ -273,6 +273,11 @@ private[akka] class AkkaFileServer(
getFileServer().addJar(file)
}
+ override def addDirectory(baseUri: String, path: File): String = {
+ val fixedBaseUri = validateDirectoryUri(baseUri)
+ getFileServer().addDirectory(fixedBaseUri, path.getAbsolutePath())
+ }
+
def shutdown(): Unit = {
if (httpFileServer != null) {
httpFileServer.stop()
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index a2768b4252..ecd9697245 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -25,12 +25,22 @@ import org.apache.spark.rpc.RpcEnvFileServer
/**
* StreamManager implementation for serving files from a NettyRpcEnv.
+ *
+ * Three kinds of resources can be registered in this manager, all backed by actual files:
+ *
+ * - "/files": a flat list of files; used as the backend for [[SparkContext.addFile]].
+ * - "/jars": a flat list of files; used as the backend for [[SparkContext.addJar]].
+ * - arbitrary directories; all files under the directory become available through the manager,
+ * respecting the directory's hierarchy.
+ *
+ * Only streaming (openStream) is supported.
*/
private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
extends StreamManager with RpcEnvFileServer {
private val files = new ConcurrentHashMap[String, File]()
private val jars = new ConcurrentHashMap[String, File]()
+ private val dirs = new ConcurrentHashMap[String, File]()
override def getChunk(streamId: Long, chunkIndex: Int): ManagedBuffer = {
throw new UnsupportedOperationException()
@@ -41,7 +51,10 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
val file = ftype match {
case "files" => files.get(fname)
case "jars" => jars.get(fname)
- case _ => throw new IllegalArgumentException(s"Invalid file type: $ftype")
+ case other =>
+ val dir = dirs.get(ftype)
+ require(dir != null, s"Invalid stream URI: $ftype not found.")
+ new File(dir, fname)
}
require(file != null && file.isFile(), s"File not found: $streamId")
@@ -60,4 +73,11 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
s"${rpcEnv.address.toSparkURL}/jars/${file.getName()}"
}
+ override def addDirectory(baseUri: String, path: File): String = {
+ val fixedBaseUri = validateDirectoryUri(baseUri)
+ require(dirs.putIfAbsent(fixedBaseUri.stripPrefix("/"), path) == null,
+ s"URI '$fixedBaseUri' already registered.")
+ s"${rpcEnv.address.toSparkURL}$fixedBaseUri"
+ }
+
}
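Putting the pieces together: a directory registered under "/classes" makes every file below it addressable as spark://host:port/classes/<relative path>, and openStream resolves that relative path against the stored File. A sketch of the round trip, assuming a NettyStreamManager instance named manager and an illustrative directory:

    import java.io.File

    // Driver: register the REPL output directory.
    val base = manager.addDirectory("/classes", new File("/tmp/repl-classes"))
    // base == "spark://driver-host:port/classes"

    // Executor: a generated class file is then fetched by streaming
    // "classes/$line3/$read.class", which openStream resolves to
    // new File("/tmp/repl-classes", "$line3/$read.class").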