From e068f21e0108b89dc9ecf84d98f2ba619d6435e2 Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Tue, 28 Sep 2010 22:32:38 -0700
Subject: More work on HTTP class loading

---
 Makefile                                       |  2 ++
 run                                            |  2 ++
 src/scala/spark/Executor.scala                 |  8 +++---
 src/scala/spark/repl/ExecutorClassLoader.scala | 40 ++++++++++++++++++++------
 src/scala/spark/repl/SparkInterpreter.scala    | 33 +++++++++++++--------
 5 files changed, 61 insertions(+), 24 deletions(-)

diff --git a/Makefile b/Makefile
index 9c9ebb6a82..1a3e97a7be 100644
--- a/Makefile
+++ b/Makefile
@@ -10,6 +10,8 @@ JARS += third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
 JARS += third_party/hadoop-0.20.0/lib/commons-logging-1.0.4.jar
 JARS += third_party/scalatest-1.2/scalatest-1.2.jar
 JARS += third_party/scalacheck_2.8.0-1.7.jar
+JARS += third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
+JARS += third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
 CLASSPATH = $(subst $(SPACE),:,$(JARS))
 
 SCALA_SOURCES = src/examples/*.scala src/scala/spark/*.scala src/scala/spark/repl/*.scala
diff --git a/run b/run
index 7e044eb021..33eec9fe1b 100755
--- a/run
+++ b/run
@@ -36,6 +36,8 @@ SPARK_CLASSPATH+=:$FWDIR/third_party/guava-r06/guava-r06.jar
 SPARK_CLASSPATH+=:$FWDIR/third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
 SPARK_CLASSPATH+=:third_party/scalatest-1.2/scalatest-1.2.jar
 SPARK_CLASSPATH+=:third_party/scalacheck_2.8.0-1.7.jar
+SPARK_CLASSPATH+=:third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
+SPARK_CLASSPATH+=:third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
 for jar in $FWDIR/third_party/hadoop-0.20.0/lib/*.jar; do
   SPARK_CLASSPATH+=:$jar
 done
diff --git a/src/scala/spark/Executor.scala b/src/scala/spark/Executor.scala
index 68f4cb8aae..58b20b41dc 100644
--- a/src/scala/spark/Executor.scala
+++ b/src/scala/spark/Executor.scala
@@ -25,10 +25,10 @@
     // If the REPL is in use, create a ClassLoader that will be able to
     // read new classes defined by the REPL as the user types code
     classLoader = this.getClass.getClassLoader
-    val classDir = System.getProperty("spark.repl.current.classdir")
-    if (classDir != null) {
-      println("Using REPL classdir: " + classDir)
-      classLoader = new repl.ExecutorClassLoader(classDir, classLoader)
+    val classUri = System.getProperty("spark.repl.class.uri")
+    if (classUri != null) {
+      println("Using REPL class URI: " + classUri)
+      classLoader = new repl.ExecutorClassLoader(classUri, classLoader)
     }
     Thread.currentThread.setContextClassLoader(classLoader)
 
diff --git a/src/scala/spark/repl/ExecutorClassLoader.scala b/src/scala/spark/repl/ExecutorClassLoader.scala
index 7d91b20e79..0833b319f7 100644
--- a/src/scala/spark/repl/ExecutorClassLoader.scala
+++ b/src/scala/spark/repl/ExecutorClassLoader.scala
@@ -1,7 +1,7 @@
 package spark.repl
 
 import java.io.{ByteArrayOutputStream, InputStream}
-import java.net.{URI, URL, URLClassLoader}
+import java.net.{URI, URL, URLClassLoader, URLEncoder}
 import java.util.concurrent.{Executors, ExecutorService}
 
 import org.apache.hadoop.conf.Configuration
@@ -12,18 +12,35 @@
 import org.objectweb.asm.commons.EmptyVisitor
 import org.objectweb.asm.Opcodes._
 
-// A ClassLoader that reads classes from a Hadoop FileSystem URL, used to load
-// classes defined by the interpreter when the REPL is in use
-class ExecutorClassLoader(classDir: String, parent: ClassLoader)
+/**
+ * A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI,
+ * used to load classes defined by the interpreter when the REPL is in use
+ */
+class ExecutorClassLoader(classUri: String, parent: ClassLoader)
 extends ClassLoader(parent) {
-  val fileSystem = FileSystem.get(new URI(classDir), new Configuration())
-  val directory = new URI(classDir).getPath
+  val uri = new URI(classUri)
+  val directory = uri.getPath
+
+  // Hadoop FileSystem for our URI, or null if the URI uses HTTP
+  val fileSystem: FileSystem = {
+    if (uri.getScheme() == "http")
+      null
+    else
+      FileSystem.get(uri, new Configuration())
+  }
 
   override def findClass(name: String): Class[_] = {
     try {
       //println("repl.ExecutorClassLoader resolving " + name)
-      val path = new Path(directory, name.replace('.', '/') + ".class")
-      val bytes = readAndTransformClass(name, fileSystem.open(path))
+      val pathInDirectory = name.replace('.', '/') + ".class"
+      val inputStream = {
+        if (fileSystem != null)
+          fileSystem.open(new Path(directory, pathInDirectory))
+        else
+          new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream()
+      }
+      val bytes = readAndTransformClass(name, inputStream)
+      inputStream.close()
       return defineClass(name, bytes, 0, bytes.length)
     } catch {
       case e: Exception => throw new ClassNotFoundException(name, e)
@@ -57,6 +74,13 @@ extends ClassLoader(parent) {
       return bos.toByteArray
     }
   }
+
+  /**
+   * URL-encode a string, leaving the slashes between path components intact
+   */
+  def urlEncode(str: String): String = {
+    str.split('/').map(part => URLEncoder.encode(part, "UTF-8")).mkString("/")
+  }
 }
 
 class ConstructorCleaner(className: String, cv: ClassVisitor)
diff --git a/src/scala/spark/repl/SparkInterpreter.scala b/src/scala/spark/repl/SparkInterpreter.scala
index 2bfaaf7342..29a4420950 100644
--- a/src/scala/spark/repl/SparkInterpreter.scala
+++ b/src/scala/spark/repl/SparkInterpreter.scala
@@ -90,32 +90,40 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) {
   val SPARK_DEBUG_REPL: Boolean =
     (System.getenv("SPARK_DEBUG_REPL") == "1")
 
-  /** directory to save .class files to */
-  //val virtualDirectory = new VirtualDirectory("(memory)", None)
-  val virtualDirectory = {
+  val outputDir = {
     val rootDir = new File(System.getProperty("spark.repl.classdir",
                            System.getProperty("java.io.tmpdir")))
     var attempts = 0
     val maxAttempts = 10
-    var outputDir: File = null
-    while (outputDir == null) {
+    var dir: File = null
+    while (dir == null) {
       attempts += 1
       if (attempts > maxAttempts) {
         throw new IOException("Failed to create a temp directory " +
                               "after " + maxAttempts + " attempts!")
       }
       try {
-        outputDir = new File(rootDir, "spark-" + UUID.randomUUID.toString)
-        if (outputDir.exists() || !outputDir.mkdirs())
-          outputDir = null
+        dir = new File(rootDir, "spark-" + UUID.randomUUID.toString)
+        if (dir.exists() || !dir.mkdirs())
+          dir = null
       } catch { case e: IOException => ; }
     }
-    System.setProperty("spark.repl.current.classdir",
-      "file://" + outputDir.getAbsolutePath + "/")
     if (SPARK_DEBUG_REPL)
-      println("Output directory: " + outputDir)
-    new PlainFile(outputDir)
+      println("Output directory: " + dir)
+    dir
   }
+
+  /** directory to save .class files to */
+  //val virtualDirectory = new VirtualDirectory("(memory)", None)
+  val virtualDirectory = new PlainFile(outputDir)
+
+  /** Jetty server that will serve our classes to worker nodes */
+  val classServer = new ClassServer(outputDir)
+
+  // Start the classServer and remember its URI in a Spark system property
+  classServer.start()
+  println("ClassServer started, URI = " + classServer.uri)
+  System.setProperty("spark.repl.class.uri", classServer.uri)
 
   /** reporter */
   object reporter extends ConsoleReporter(settings, null, out) {
@@ -714,6 +722,7 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) {
    */
   def close() {
     reporter.flush
+    classServer.stop()
   }
 
   /** A traverser that finds all mentioned identifiers, i.e. things
-- 
cgit v1.2.3
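
Note on the urlEncode helper: REPL-generated class names contain characters
such as '$' that must be escaped in a URL path, but a single
java.net.URLEncoder.encode call over the whole path would also escape the
'/' separators (as %2F), so the class server would no longer see a path it
can resolve. Encoding each component separately sidesteps that. A standalone
illustration, using a made-up REPL class file name:

    import java.net.URLEncoder

    object UrlEncodeDemo {
      def main(args: Array[String]) {
        val path = "line1/$read$$iw$$iw.class"  // hypothetical REPL output

        // Encoding the whole path escapes the separators too:
        // prints "line1%2F%24read%24%24iw%24%24iw.class"
        println(URLEncoder.encode(path, "UTF-8"))

        // Per-component encoding keeps the path structure:
        // prints "line1/%24read%24%24iw%24%24iw.class"
        println(path.split('/').map(part => URLEncoder.encode(part, "UTF-8")).mkString("/"))
      }
    }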
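
The interpreter now constructs a ClassServer over outputDir, but
ClassServer.scala itself is not among the five files touched by this patch.
Below is a minimal sketch of what such a class might look like on the Jetty 7
jars added above; the ephemeral-port choice, the field names, and the uri
format are assumptions for illustration, not the actual implementation:

    package spark.repl

    import java.io.File
    import java.net.InetAddress

    import org.eclipse.jetty.server.Server
    import org.eclipse.jetty.server.handler.ResourceHandler

    // Sketch: an HTTP server that serves the REPL's output directory so
    // that executors can fetch generated .class files on demand.
    class ClassServer(classDir: File) {
      // Port 0 asks Jetty to bind any free (ephemeral) port
      private val server = new Server(0)

      def start() {
        val handler = new ResourceHandler
        handler.setResourceBase(classDir.getAbsolutePath)
        server.setHandler(handler)
        server.start()
      }

      def stop() {
        server.stop()
      }

      // URI that workers should use; only meaningful once start() has run,
      // since the ephemeral port is unknown before binding
      def uri: String = {
        val port = server.getConnectors()(0).getLocalPort
        "http://" + InetAddress.getLocalHost.getHostAddress + ":" + port
      }
    }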
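
Putting the pieces together: the interpreter writes each line's generated
classes under outputDir, serves that directory over HTTP, and advertises the
server's address in the spark.repl.class.uri system property; each executor
reads that property on startup and installs an ExecutorClassLoader that
fetches class bytes on demand (over HTTP here, or from a Hadoop FileSystem
when the URI has a non-http scheme). A smoke test of that round trip,
assuming the hypothetical ClassServer sketched above:

    object ClassServingSmokeTest {
      def main(args: Array[String]) {
        val dir = new java.io.File(System.getProperty("java.io.tmpdir"))
        val server = new spark.repl.ClassServer(dir)
        server.start()
        try {
          // Driver side: publish the server's URI
          System.setProperty("spark.repl.class.uri", server.uri)

          // Executor side: install a loader that reads from that URI
          val uri = System.getProperty("spark.repl.class.uri")
          val loader = new spark.repl.ExecutorClassLoader(uri, getClass.getClassLoader)

          // Any .class file under `dir` is now loadable by dotted name, e.g.
          // loader.loadClass("line1.$read$$iw$$iw")
        } finally {
          server.stop()
        }
      }
    }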