diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-06-25 18:21:00 -0400 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-06-25 18:21:00 -0400 |
commit | 6c8d1b2ca618a1a17566ede46821c0807a1b11f5 (patch) | |
tree | c726a4023ac97d520ed959f45239b1c932db0d5b /core/src | |
parent | 15b00914c53f1f4f00a3313968f68a8f032e7cb7 (diff) | |
download | spark-6c8d1b2ca618a1a17566ede46821c0807a1b11f5.tar.gz spark-6c8d1b2ca618a1a17566ede46821c0807a1b11f5.tar.bz2 spark-6c8d1b2ca618a1a17566ede46821c0807a1b11f5.zip |
Fix computation of classpath when we launch java directly
The previous version assumed that a CLASSPATH environment variable was
set by the "run" script when launching the process that starts the
ExecutorRunner, but unfortunately this is not true in tests. Instead, we
factor the classpath calculation into an extenral script and call that.
NOTE: This includes a Windows version but hasn't yet been tested there.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/spark/Utils.scala | 31 | ||||
-rw-r--r-- | core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala | 19 |
2 files changed, 34 insertions, 16 deletions
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index bdc1494cc9..f41efa9d29 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -522,6 +522,37 @@ private object Utils extends Logging { execute(command, new File(".")) } + /** + * Execute a command and get its output, throwing an exception if it yields a code other than 0. + */ + def executeAndGetOutput(command: Seq[String], workingDir: File = new File(".")): String = { + val process = new ProcessBuilder(command: _*) + .directory(workingDir) + .start() + new Thread("read stderr for " + command(0)) { + override def run() { + for (line <- Source.fromInputStream(process.getErrorStream).getLines) { + System.err.println(line) + } + } + }.start() + val output = new StringBuffer + val stdoutThread = new Thread("read stdout for " + command(0)) { + override def run() { + for (line <- Source.fromInputStream(process.getInputStream).getLines) { + output.append(line) + } + } + } + stdoutThread.start() + val exitCode = process.waitFor() + stdoutThread.join() // Wait for it to finish reading output + if (exitCode != 0) { + throw new SparkException("Process " + command + " exited with code " + exitCode) + } + output.toString + } + private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String, val firstUserLine: Int, val firstUserClass: String) /** diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index db580e39ab..4f8e1dcb26 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -107,21 +107,9 @@ private[spark] class ExecutorRunner( val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M") - var classPath = System.getenv("CLASSPATH") - if (System.getenv("SPARK_LAUNCH_WITH_SCALA") == "1") { - // Add the Scala library JARs to the classpath; this is needed when the ExecutorRunner - // was launched with "scala" as the runner (e.g. in spark-shell in local-cluster mode) - // and the Scala libraries won't be in the CLASSPATH environment variable by defalt. - if (System.getenv("SCALA_LIBRARY_PATH") == null && System.getenv("SCALA_HOME") == null) { - logError("Cloud not launch executors: neither SCALA_LIBRARY_PATH nor SCALA_HOME are set") - System.exit(1) - } - val scalaLib = Option(System.getenv("SCALA_LIBRARY_PATH")).getOrElse( - System.getenv("SCALA_HOME") + "/lib") - classPath += ":" + scalaLib + "/scala-library.jar" + - ":" + scalaLib + "/scala-compiler.jar" + - ":" + scalaLib + "/jline.jar" - } + // Figure out our classpath with the external compute-classpath script + val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh" + val classPath = Utils.executeAndGetOutput(Seq(sparkHome + "/bin/compute-classpath" + ext)) Seq("-cp", classPath) ++ libraryOpts ++ userOpts ++ memoryOpts } @@ -154,7 +142,6 @@ private[spark] class ExecutorRunner( // Launch the process val command = buildCommandSeq() - println("COMMAND: " + command.mkString(" ")) val builder = new ProcessBuilder(command: _*).directory(executorDir) val env = builder.environment() for ((key, value) <- appDesc.command.environment) { |