aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile2
-rwxr-xr-xrun2
-rw-r--r--src/scala/spark/Executor.scala8
-rw-r--r--src/scala/spark/repl/ClassServer.scala74
-rw-r--r--src/scala/spark/repl/ExecutorClassLoader.scala40
-rw-r--r--src/scala/spark/repl/SparkInterpreter.scala33
-rw-r--r--third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jarbin0 -> 647178 bytes
-rw-r--r--third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jarbin0 -> 105112 bytes
8 files changed, 135 insertions, 24 deletions
diff --git a/Makefile b/Makefile
index 9c9ebb6a82..1a3e97a7be 100644
--- a/Makefile
+++ b/Makefile
@@ -10,6 +10,8 @@ JARS += third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
JARS += third_party/hadoop-0.20.0/lib/commons-logging-1.0.4.jar
JARS += third_party/scalatest-1.2/scalatest-1.2.jar
JARS += third_party/scalacheck_2.8.0-1.7.jar
+JARS += third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
+JARS += third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
CLASSPATH = $(subst $(SPACE),:,$(JARS))
SCALA_SOURCES = src/examples/*.scala src/scala/spark/*.scala src/scala/spark/repl/*.scala
diff --git a/run b/run
index 7e044eb021..33eec9fe1b 100755
--- a/run
+++ b/run
@@ -36,6 +36,8 @@ SPARK_CLASSPATH+=:$FWDIR/third_party/guava-r06/guava-r06.jar
SPARK_CLASSPATH+=:$FWDIR/third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
SPARK_CLASSPATH+=:third_party/scalatest-1.2/scalatest-1.2.jar
SPARK_CLASSPATH+=:third_party/scalacheck_2.8.0-1.7.jar
+SPARK_CLASSPATH+=:third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
+SPARK_CLASSPATH+=:third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
for jar in $FWDIR/third_party/hadoop-0.20.0/lib/*.jar; do
SPARK_CLASSPATH+=:$jar
done
diff --git a/src/scala/spark/Executor.scala b/src/scala/spark/Executor.scala
index 68f4cb8aae..58b20b41dc 100644
--- a/src/scala/spark/Executor.scala
+++ b/src/scala/spark/Executor.scala
@@ -25,10 +25,10 @@ object Executor {
// If the REPL is in use, create a ClassLoader that will be able to
// read new classes defined by the REPL as the user types code
classLoader = this.getClass.getClassLoader
- val classDir = System.getProperty("spark.repl.current.classdir")
- if (classDir != null) {
- println("Using REPL classdir: " + classDir)
- classLoader = new repl.ExecutorClassLoader(classDir, classLoader)
+ val classUri = System.getProperty("spark.repl.class.uri")
+ if (classUri != null) {
+ println("Using REPL class URI: " + classUri)
+ classLoader = new repl.ExecutorClassLoader(classUri, classLoader)
}
Thread.currentThread.setContextClassLoader(classLoader)
diff --git a/src/scala/spark/repl/ClassServer.scala b/src/scala/spark/repl/ClassServer.scala
new file mode 100644
index 0000000000..14ab2fe2a3
--- /dev/null
+++ b/src/scala/spark/repl/ClassServer.scala
@@ -0,0 +1,74 @@
+package spark.repl
+
+import java.io.File
+import java.net.InetAddress
+
+import org.eclipse.jetty.server.Server
+import org.eclipse.jetty.server.handler.DefaultHandler
+import org.eclipse.jetty.server.handler.HandlerList
+import org.eclipse.jetty.server.handler.ResourceHandler
+
+
+/**
+ * Exception type thrown by ClassServer when it is in the wrong state
+ * for an operation.
+ */
+class ServerStateException(message: String) extends Exception(message)
+
+
+/**
+ * An HTTP server used by the interpreter to allow worker nodes to access
+ * class files created as the user types in lines of code. This is just a
+ * wrapper around a Jetty embedded HTTP server.
+ */
+class ClassServer(classDir: File) {
+ private var server: Server = null
+ private var port: Int = -1
+
+ def start() {
+ if (server != null) {
+ throw new ServerStateException("Server is already started")
+ } else {
+ server = new Server(0)
+ val resHandler = new ResourceHandler
+ resHandler.setResourceBase(classDir.getAbsolutePath)
+ val handlerList = new HandlerList
+ handlerList.setHandlers(Array(resHandler, new DefaultHandler))
+ server.setHandler(handlerList)
+ server.start()
+ port = server.getConnectors()(0).getLocalPort()
+ }
+ }
+
+ def stop() {
+ if (server == null) {
+ throw new ServerStateException("Server is already stopped")
+ } else {
+ server.stop()
+ port = -1
+ server = null
+ }
+ }
+
+ /**
+ * Get the URI of this HTTP server (http://host:port)
+ */
+ def uri: String = {
+ if (server == null) {
+ throw new ServerStateException("Server is not started")
+ } else {
+ return "http://" + getLocalIpAddress + ":" + port
+ }
+ }
+
+ /**
+ * Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4)
+ */
+ private def getLocalIpAddress: String = {
+ // Get local IP as an array of four bytes
+ val bytes = InetAddress.getLocalHost().getAddress()
+ // Convert the bytes to ints (keeping in mind that they may be negative)
+ // and join them into a string
+ return bytes.map(b => (b.toInt + 256) % 256).mkString(".")
+ }
+}
diff --git a/src/scala/spark/repl/ExecutorClassLoader.scala b/src/scala/spark/repl/ExecutorClassLoader.scala
index 7d91b20e79..0833b319f7 100644
--- a/src/scala/spark/repl/ExecutorClassLoader.scala
+++ b/src/scala/spark/repl/ExecutorClassLoader.scala
@@ -1,7 +1,7 @@
package spark.repl
import java.io.{ByteArrayOutputStream, InputStream}
-import java.net.{URI, URL, URLClassLoader}
+import java.net.{URI, URL, URLClassLoader, URLEncoder}
import java.util.concurrent.{Executors, ExecutorService}
import org.apache.hadoop.conf.Configuration
@@ -12,18 +12,35 @@ import org.objectweb.asm.commons.EmptyVisitor
import org.objectweb.asm.Opcodes._
-// A ClassLoader that reads classes from a Hadoop FileSystem URL, used to load
-// classes defined by the interpreter when the REPL is in use
-class ExecutorClassLoader(classDir: String, parent: ClassLoader)
+/**
+ * A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI,
+ * used to load classes defined by the interpreter when the REPL is used
+ */
+class ExecutorClassLoader(classUri: String, parent: ClassLoader)
extends ClassLoader(parent) {
- val fileSystem = FileSystem.get(new URI(classDir), new Configuration())
- val directory = new URI(classDir).getPath
+ val uri = new URI(classUri)
+ val directory = uri.getPath
+
+ // Hadoop FileSystem object for our URI, if it isn't using HTTP
+ var fileSystem: FileSystem = {
+ if (uri.getScheme() == "http")
+ null
+ else
+ FileSystem.get(uri, new Configuration())
+ }
override def findClass(name: String): Class[_] = {
try {
//println("repl.ExecutorClassLoader resolving " + name)
- val path = new Path(directory, name.replace('.', '/') + ".class")
- val bytes = readAndTransformClass(name, fileSystem.open(path))
+ val pathInDirectory = name.replace('.', '/') + ".class"
+ val inputStream = {
+ if (fileSystem != null)
+ fileSystem.open(new Path(directory, pathInDirectory))
+ else
+ new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream()
+ }
+ val bytes = readAndTransformClass(name, inputStream)
+ inputStream.close()
return defineClass(name, bytes, 0, bytes.length)
} catch {
case e: Exception => throw new ClassNotFoundException(name, e)
@@ -57,6 +74,13 @@ extends ClassLoader(parent) {
return bos.toByteArray
}
}
+
+ /**
+ * URL-encode a string, preserving only slashes
+ */
+ def urlEncode(str: String): String = {
+ str.split('/').map(part => URLEncoder.encode(part, "UTF-8")).mkString("/")
+ }
}
class ConstructorCleaner(className: String, cv: ClassVisitor)
diff --git a/src/scala/spark/repl/SparkInterpreter.scala b/src/scala/spark/repl/SparkInterpreter.scala
index 2bfaaf7342..29a4420950 100644
--- a/src/scala/spark/repl/SparkInterpreter.scala
+++ b/src/scala/spark/repl/SparkInterpreter.scala
@@ -90,32 +90,40 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) {
val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1")
- /** directory to save .class files to */
- //val virtualDirectory = new VirtualDirectory("(memory)", None)
- val virtualDirectory = {
+ val outputDir = {
val rootDir = new File(System.getProperty("spark.repl.classdir",
System.getProperty("java.io.tmpdir")))
var attempts = 0
val maxAttempts = 10
- var outputDir: File = null
- while (outputDir == null) {
+ var dir: File = null
+ while (dir == null) {
attempts += 1
if (attempts > maxAttempts) {
throw new IOException("Failed to create a temp directory " +
"after " + maxAttempts + " attempts!")
}
try {
- outputDir = new File(rootDir, "spark-" + UUID.randomUUID.toString)
- if (outputDir.exists() || !outputDir.mkdirs())
- outputDir = null
+ dir = new File(rootDir, "spark-" + UUID.randomUUID.toString)
+ if (dir.exists() || !dir.mkdirs())
+ dir = null
} catch { case e: IOException => ; }
}
- System.setProperty("spark.repl.current.classdir",
- "file://" + outputDir.getAbsolutePath + "/")
if (SPARK_DEBUG_REPL)
- println("Output directory: " + outputDir)
- new PlainFile(outputDir)
+ println("Output directory: " + dir)
+ dir
}
+
+ /** directory to save .class files to */
+ //val virtualDirectory = new VirtualDirectory("(memory)", None)
+ val virtualDirectory = new PlainFile(outputDir)
+
+ /** Jetty server that will serve our classes to worker nodes */
+ val classServer = new ClassServer(outputDir)
+
+  // Start the classServer and remember its URI in a Spark system property
+ classServer.start()
+ println("ClassServer started, URI = " + classServer.uri)
+ System.setProperty("spark.repl.class.uri", classServer.uri)
/** reporter */
object reporter extends ConsoleReporter(settings, null, out) {
@@ -714,6 +722,7 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) {
*/
def close() {
reporter.flush
+ classServer.stop()
}
/** A traverser that finds all mentioned identifiers, i.e. things
diff --git a/third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar b/third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
new file mode 100644
index 0000000000..d9ef50be6d
--- /dev/null
+++ b/third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
Binary files differ
diff --git a/third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar b/third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
new file mode 100644
index 0000000000..fb52493468
--- /dev/null
+++ b/third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
Binary files differ