aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-06-25 18:21:00 -0400
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-06-25 18:21:00 -0400
commit6c8d1b2ca618a1a17566ede46821c0807a1b11f5 (patch)
treec726a4023ac97d520ed959f45239b1c932db0d5b /core/src
parent15b00914c53f1f4f00a3313968f68a8f032e7cb7 (diff)
downloadspark-6c8d1b2ca618a1a17566ede46821c0807a1b11f5.tar.gz
spark-6c8d1b2ca618a1a17566ede46821c0807a1b11f5.tar.bz2
spark-6c8d1b2ca618a1a17566ede46821c0807a1b11f5.zip
Fix computation of classpath when we launch java directly
The previous version assumed that a CLASSPATH environment variable was set by the "run" script when launching the process that starts the ExecutorRunner, but unfortunately this is not true in tests. Instead, we factor the classpath calculation into an external script and call that. NOTE: This includes a Windows version but hasn't yet been tested there.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/spark/Utils.scala31
-rw-r--r--core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala19
2 files changed, 34 insertions, 16 deletions
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index bdc1494cc9..f41efa9d29 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -522,6 +522,37 @@ private object Utils extends Logging {
execute(command, new File("."))
}
+ /**
+ * Execute a command and get its output, throwing an exception if it yields a code other than 0.
+ */
+ def executeAndGetOutput(command: Seq[String], workingDir: File = new File(".")): String = {
+ val process = new ProcessBuilder(command: _*)
+ .directory(workingDir)
+ .start()
+ new Thread("read stderr for " + command(0)) {
+ override def run() {
+ for (line <- Source.fromInputStream(process.getErrorStream).getLines) {
+ System.err.println(line)
+ }
+ }
+ }.start()
+ val output = new StringBuffer
+ val stdoutThread = new Thread("read stdout for " + command(0)) {
+ override def run() {
+ for (line <- Source.fromInputStream(process.getInputStream).getLines) {
+ output.append(line)
+ }
+ }
+ }
+ stdoutThread.start()
+ val exitCode = process.waitFor()
+ stdoutThread.join() // Wait for it to finish reading output
+ if (exitCode != 0) {
+ throw new SparkException("Process " + command + " exited with code " + exitCode)
+ }
+ output.toString
+ }
+
private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
val firstUserLine: Int, val firstUserClass: String)
/**
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index db580e39ab..4f8e1dcb26 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -107,21 +107,9 @@ private[spark] class ExecutorRunner(
val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M")
- var classPath = System.getenv("CLASSPATH")
- if (System.getenv("SPARK_LAUNCH_WITH_SCALA") == "1") {
- // Add the Scala library JARs to the classpath; this is needed when the ExecutorRunner
- // was launched with "scala" as the runner (e.g. in spark-shell in local-cluster mode)
- // and the Scala libraries won't be in the CLASSPATH environment variable by defalt.
- if (System.getenv("SCALA_LIBRARY_PATH") == null && System.getenv("SCALA_HOME") == null) {
- logError("Cloud not launch executors: neither SCALA_LIBRARY_PATH nor SCALA_HOME are set")
- System.exit(1)
- }
- val scalaLib = Option(System.getenv("SCALA_LIBRARY_PATH")).getOrElse(
- System.getenv("SCALA_HOME") + "/lib")
- classPath += ":" + scalaLib + "/scala-library.jar" +
- ":" + scalaLib + "/scala-compiler.jar" +
- ":" + scalaLib + "/jline.jar"
- }
+ // Figure out our classpath with the external compute-classpath script
+ val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
+ val classPath = Utils.executeAndGetOutput(Seq(sparkHome + "/bin/compute-classpath" + ext))
Seq("-cp", classPath) ++ libraryOpts ++ userOpts ++ memoryOpts
}
@@ -154,7 +142,6 @@ private[spark] class ExecutorRunner(
// Launch the process
val command = buildCommandSeq()
- println("COMMAND: " + command.mkString(" "))
val builder = new ProcessBuilder(command: _*).directory(executorDir)
val env = builder.environment()
for ((key, value) <- appDesc.command.environment) {