diff options
-rw-r--r-- | core/src/main/scala/spark/executor/Executor.scala | 38 | ||||
-rw-r--r-- | repl/src/main/scala/spark/repl/SparkILoop.scala | 4 |
2 files changed, 25 insertions, 17 deletions
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 8bebfafce4..2bf55ea9a9 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -42,7 +42,8 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert // Create our ClassLoader and set it on this thread private val urlClassLoader = createClassLoader() - Thread.currentThread.setContextClassLoader(urlClassLoader) + private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) + Thread.currentThread.setContextClassLoader(replClassLoader) // Make any thread terminations due to uncaught exceptions kill the entire // executor process to avoid surprising stalls. @@ -88,7 +89,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert override def run() { val startTime = System.currentTimeMillis() SparkEnv.set(env) - Thread.currentThread.setContextClassLoader(urlClassLoader) + Thread.currentThread.setContextClassLoader(replClassLoader) val ser = SparkEnv.get.closureSerializer.newInstance() logInfo("Running task ID " + taskId) context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) @@ -153,26 +154,31 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert val urls = currentJars.keySet.map { uri => new File(uri.split("/").last).toURI.toURL }.toArray - loader = new URLClassLoader(urls, loader) + new ExecutorURLClassLoader(urls, loader) + } - // If the REPL is in use, add another ClassLoader that will read - // new classes defined by the REPL as the user types code + /** + * If the REPL is in use, add another ClassLoader that will read + * new classes defined by the REPL as the user types code + */ + private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = { val classUri = System.getProperty("spark.repl.class.uri") if (classUri != null) { logInfo("Using REPL class URI: " + classUri) - loader = { - try { - val klass = Class.forName("spark.repl.ExecutorClassLoader") - .asInstanceOf[Class[_ <: ClassLoader]] - val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader]) - constructor.newInstance(classUri, loader) - } catch { - case _: ClassNotFoundException => loader - } + try { + val klass = Class.forName("spark.repl.ExecutorClassLoader") + .asInstanceOf[Class[_ <: ClassLoader]] + val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader]) + return constructor.newInstance(classUri, parent) + } catch { + case _: ClassNotFoundException => + logError("Could not find spark.repl.ExecutorClassLoader on classpath!") + System.exit(1) + null } + } else { + return parent } - - return new ExecutorURLClassLoader(Array(), loader) } /** diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/spark/repl/SparkILoop.scala index 86eed090d0..59f9d05683 100644 --- a/repl/src/main/scala/spark/repl/SparkILoop.scala +++ b/repl/src/main/scala/spark/repl/SparkILoop.scala @@ -838,7 +838,9 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master: if (prop != null) prop else "local" } } - val jars = Option(System.getenv("ADD_JARS")).map(_.split(',')).getOrElse(new Array[String](0)) + val jars = Option(System.getenv("ADD_JARS")).map(_.split(',')) + .getOrElse(new Array[String](0)) + .map(new java.io.File(_).getAbsolutePath) sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars) sparkContext } |