aboutsummaryrefslogtreecommitdiff
path: root/repl/src/main
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-12-10 13:26:30 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-12-10 13:26:30 -0800
commit4a46b8859d3314b5b45a67cdc5c81fecb6e9e78c (patch)
treee675378fe850f9fbcf2cafea7cae589876f147a8 /repl/src/main
parent2ecbe02d5b28ee562d10c1735244b90a08532c9e (diff)
downloadspark-4a46b8859d3314b5b45a67cdc5c81fecb6e9e78c.tar.gz
spark-4a46b8859d3314b5b45a67cdc5c81fecb6e9e78c.tar.bz2
spark-4a46b8859d3314b5b45a67cdc5c81fecb6e9e78c.zip
[SPARK-11563][CORE][REPL] Use RpcEnv to transfer REPL-generated classes.
This avoids bringing up yet another HTTP server on the driver, and instead reuses the file server already managed by the driver's RpcEnv. As a bonus, the repl now inherits the security features of the network library. There's also a small change to create the directory for storing classes under the root temp dir for the application (instead of directly under java.io.tmpdir). Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #9923 from vanzin/SPARK-11563.
Diffstat (limited to 'repl/src/main')
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala36
1 file changed, 20 insertions, 16 deletions
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index a8859fcd45..da8f0aa1e3 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -19,6 +19,7 @@ package org.apache.spark.repl
import java.io.{IOException, ByteArrayOutputStream, InputStream}
import java.net.{HttpURLConnection, URI, URL, URLEncoder}
+import java.nio.channels.Channels
import scala.util.control.NonFatal
@@ -38,7 +39,11 @@ import org.apache.spark.util.ParentClassLoader
 * This class loader delegates getting/finding resources to the parent loader,
 * which makes sense because the REPL never provides resources dynamically.
*/
-class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader,
+class ExecutorClassLoader(
+ conf: SparkConf,
+ env: SparkEnv,
+ classUri: String,
+ parent: ClassLoader,
userClassPathFirst: Boolean) extends ClassLoader with Logging {
val uri = new URI(classUri)
val directory = uri.getPath
@@ -48,13 +53,12 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
// Allows HTTP connect and read timeouts to be controlled for testing / debugging purposes
private[repl] var httpUrlConnectionTimeoutMillis: Int = -1
- // Hadoop FileSystem object for our URI, if it isn't using HTTP
- var fileSystem: FileSystem = {
- if (Set("http", "https", "ftp").contains(uri.getScheme)) {
- null
- } else {
- FileSystem.get(uri, SparkHadoopUtil.get.newConfiguration(conf))
- }
+ private val fetchFn: (String) => InputStream = uri.getScheme() match {
+ case "spark" => getClassFileInputStreamFromSparkRPC
+ case "http" | "https" | "ftp" => getClassFileInputStreamFromHttpServer
+ case _ =>
+ val fileSystem = FileSystem.get(uri, SparkHadoopUtil.get.newConfiguration(conf))
+ getClassFileInputStreamFromFileSystem(fileSystem)
}
override def getResource(name: String): URL = {
@@ -90,6 +94,11 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
}
}
+ private def getClassFileInputStreamFromSparkRPC(path: String): InputStream = {
+ val channel = env.rpcEnv.openChannel(s"$classUri/$path")
+ Channels.newInputStream(channel)
+ }
+
private def getClassFileInputStreamFromHttpServer(pathInDirectory: String): InputStream = {
val url = if (SparkEnv.get.securityManager.isAuthenticationEnabled()) {
val uri = new URI(classUri + "/" + urlEncode(pathInDirectory))
@@ -126,7 +135,8 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
}
}
- private def getClassFileInputStreamFromFileSystem(pathInDirectory: String): InputStream = {
+ private def getClassFileInputStreamFromFileSystem(fileSystem: FileSystem)(
+ pathInDirectory: String): InputStream = {
val path = new Path(directory, pathInDirectory)
if (fileSystem.exists(path)) {
fileSystem.open(path)
@@ -139,13 +149,7 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
val pathInDirectory = name.replace('.', '/') + ".class"
var inputStream: InputStream = null
try {
- inputStream = {
- if (fileSystem != null) {
- getClassFileInputStreamFromFileSystem(pathInDirectory)
- } else {
- getClassFileInputStreamFromHttpServer(pathInDirectory)
- }
- }
+ inputStream = fetchFn(pathInDirectory)
val bytes = readAndTransformClass(name, inputStream)
Some(defineClass(name, bytes, 0, bytes.length))
} catch {