diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2010-09-28 22:43:04 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2010-09-28 22:43:04 -0700 |
commit | e5e9edeeb3612663b885643cccbc4aaeae8c00be (patch) | |
tree | d8a1915b6e37277eb2c2804db31e48d75164c1d4 /src | |
parent | b749f0e20961ae0517d3aa4f54c4f6391a5737a9 (diff) | |
parent | e068f21e0108b89dc9ecf84d98f2ba619d6435e2 (diff) | |
download | spark-e5e9edeeb3612663b885643cccbc4aaeae8c00be.tar.gz spark-e5e9edeeb3612663b885643cccbc4aaeae8c00be.tar.bz2 spark-e5e9edeeb3612663b885643cccbc4aaeae8c00be.zip |
Merge branch 'http-repl-class-serving'
Diffstat (limited to 'src')
-rw-r--r-- | src/scala/spark/Executor.scala | 8 | ||||
-rw-r--r-- | src/scala/spark/repl/ClassServer.scala | 74 | ||||
-rw-r--r-- | src/scala/spark/repl/ExecutorClassLoader.scala | 40 | ||||
-rw-r--r-- | src/scala/spark/repl/SparkInterpreter.scala | 33 |
4 files changed, 131 insertions, 24 deletions
diff --git a/src/scala/spark/Executor.scala b/src/scala/spark/Executor.scala index 68f4cb8aae..58b20b41dc 100644 --- a/src/scala/spark/Executor.scala +++ b/src/scala/spark/Executor.scala @@ -25,10 +25,10 @@ object Executor { // If the REPL is in use, create a ClassLoader that will be able to // read new classes defined by the REPL as the user types code classLoader = this.getClass.getClassLoader - val classDir = System.getProperty("spark.repl.current.classdir") - if (classDir != null) { - println("Using REPL classdir: " + classDir) - classLoader = new repl.ExecutorClassLoader(classDir, classLoader) + val classUri = System.getProperty("spark.repl.class.uri") + if (classUri != null) { + println("Using REPL class URI: " + classUri) + classLoader = new repl.ExecutorClassLoader(classUri, classLoader) } Thread.currentThread.setContextClassLoader(classLoader) diff --git a/src/scala/spark/repl/ClassServer.scala b/src/scala/spark/repl/ClassServer.scala new file mode 100644 index 0000000000..14ab2fe2a3 --- /dev/null +++ b/src/scala/spark/repl/ClassServer.scala @@ -0,0 +1,74 @@ +package spark.repl + +import java.io.File +import java.net.InetAddress + +import org.eclipse.jetty.server.Server +import org.eclipse.jetty.server.handler.DefaultHandler +import org.eclipse.jetty.server.handler.HandlerList +import org.eclipse.jetty.server.handler.ResourceHandler + + +/** + * Exception type thrown by ClassServer when it is in the wrong state + * for an operation. + */ +class ServerStateException(message: String) extends Exception(message) + + +/** + * An HTTP server used by the interpreter to allow worker nodes to access + * class files created as the user types in lines of code. This is just a + * wrapper around a Jetty embedded HTTP server. + */ +class ClassServer(classDir: File) { + private var server: Server = null + private var port: Int = -1 + + def start() { + if (server != null) { + throw new ServerStateException("Server is already started") + } else { + server = new Server(0) + val resHandler = new ResourceHandler + resHandler.setResourceBase(classDir.getAbsolutePath) + val handlerList = new HandlerList + handlerList.setHandlers(Array(resHandler, new DefaultHandler)) + server.setHandler(handlerList) + server.start() + port = server.getConnectors()(0).getLocalPort() + } + } + + def stop() { + if (server == null) { + throw new ServerStateException("Server is already stopped") + } else { + server.stop() + port = -1 + server = null + } + } + + /** + * Get the URI of this HTTP server (http://host:port) + */ + def uri: String = { + if (server == null) { + throw new ServerStateException("Server is not started") + } else { + return "http://" + getLocalIpAddress + ":" + port + } + } + + /** + * Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4) + */ + private def getLocalIpAddress: String = { + // Get local IP as an array of four bytes + val bytes = InetAddress.getLocalHost().getAddress() + // Convert the bytes to ints (keeping in mind that they may be negative) + // and join them into a string + return bytes.map(b => (b.toInt + 256) % 256).mkString(".") + } +} diff --git a/src/scala/spark/repl/ExecutorClassLoader.scala b/src/scala/spark/repl/ExecutorClassLoader.scala index 7d91b20e79..0833b319f7 100644 --- a/src/scala/spark/repl/ExecutorClassLoader.scala +++ b/src/scala/spark/repl/ExecutorClassLoader.scala @@ -1,7 +1,7 @@ package spark.repl import java.io.{ByteArrayOutputStream, InputStream} -import java.net.{URI, URL, URLClassLoader} +import java.net.{URI, URL, URLClassLoader, URLEncoder} import java.util.concurrent.{Executors, ExecutorService} import org.apache.hadoop.conf.Configuration @@ -12,18 +12,35 @@ import org.objectweb.asm.commons.EmptyVisitor import org.objectweb.asm.Opcodes._ -// A ClassLoader that reads classes from a Hadoop FileSystem URL, used to load -// classes defined by the interpreter when the REPL is in use -class ExecutorClassLoader(classDir: String, parent: ClassLoader) +/** + * A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI, + * used to load classes defined by the interpreter when the REPL is used + */ +class ExecutorClassLoader(classUri: String, parent: ClassLoader) extends ClassLoader(parent) { - val fileSystem = FileSystem.get(new URI(classDir), new Configuration()) - val directory = new URI(classDir).getPath + val uri = new URI(classUri) + val directory = uri.getPath + + // Hadoop FileSystem object for our URI, if it isn't using HTTP + var fileSystem: FileSystem = { + if (uri.getScheme() == "http") + null + else + FileSystem.get(uri, new Configuration()) + } override def findClass(name: String): Class[_] = { try { //println("repl.ExecutorClassLoader resolving " + name) - val path = new Path(directory, name.replace('.', '/') + ".class") - val bytes = readAndTransformClass(name, fileSystem.open(path)) + val pathInDirectory = name.replace('.', '/') + ".class" + val inputStream = { + if (fileSystem != null) + fileSystem.open(new Path(directory, pathInDirectory)) + else + new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream() + } + val bytes = readAndTransformClass(name, inputStream) + inputStream.close() return defineClass(name, bytes, 0, bytes.length) } catch { case e: Exception => throw new ClassNotFoundException(name, e) @@ -57,6 +74,13 @@ extends ClassLoader(parent) { return bos.toByteArray } } + + /** + * URL-encode a string, preserving only slashes + */ + def urlEncode(str: String): String = { + str.split('/').map(part => URLEncoder.encode(part, "UTF-8")).mkString("/") + } } class ConstructorCleaner(className: String, cv: ClassVisitor) diff --git a/src/scala/spark/repl/SparkInterpreter.scala b/src/scala/spark/repl/SparkInterpreter.scala index 2bfaaf7342..29a4420950 100644 --- a/src/scala/spark/repl/SparkInterpreter.scala +++ b/src/scala/spark/repl/SparkInterpreter.scala @@ -90,32 +90,40 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) { val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1") - /** directory to save .class files to */ - //val virtualDirectory = new VirtualDirectory("(memory)", None) - val virtualDirectory = { + val outputDir = { val rootDir = new File(System.getProperty("spark.repl.classdir", System.getProperty("java.io.tmpdir"))) var attempts = 0 val maxAttempts = 10 - var outputDir: File = null - while (outputDir == null) { + var dir: File = null + while (dir == null) { attempts += 1 if (attempts > maxAttempts) { throw new IOException("Failed to create a temp directory " + "after " + maxAttempts + " attempts!") } try { - outputDir = new File(rootDir, "spark-" + UUID.randomUUID.toString) - if (outputDir.exists() || !outputDir.mkdirs()) - outputDir = null + dir = new File(rootDir, "spark-" + UUID.randomUUID.toString) + if (dir.exists() || !dir.mkdirs()) + dir = null } catch { case e: IOException => ; } } - System.setProperty("spark.repl.current.classdir", - "file://" + outputDir.getAbsolutePath + "/") if (SPARK_DEBUG_REPL) - println("Output directory: " + outputDir) - new PlainFile(outputDir) + println("Output directory: " + dir) + dir } + + /** directory to save .class files to */ + //val virtualDirectory = new VirtualDirectory("(memory)", None) + val virtualDirectory = new PlainFile(outputDir) + + /** Jetty server that will serve our classes to worker nodes */ + val classServer = new ClassServer(outputDir) + + // Start the classServer and remember its URI in a spark system property */ + classServer.start() + println("ClassServer started, URI = " + classServer.uri) + System.setProperty("spark.repl.class.uri", classServer.uri) /** reporter */ object reporter extends ConsoleReporter(settings, null, out) { @@ -714,6 +722,7 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) { */ def close() { reporter.flush + classServer.stop() } /** A traverser that finds all mentioned identifiers, i.e. things |