aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
author Matei Zaharia <matei@eecs.berkeley.edu> 2013-06-25 17:17:27 -0400
committer Matei Zaharia <matei@eecs.berkeley.edu> 2013-06-25 17:17:27 -0400
commit 15b00914c53f1f4f00a3313968f68a8f032e7cb7 (patch)
tree f389a33f88d9388e84bb97b9267e0c6151bba56e /core/src
parent 7680ce0bd65fc44716c5bc03d5909a3ddbd43501 (diff)
download spark-15b00914c53f1f4f00a3313968f68a8f032e7cb7.tar.gz
spark-15b00914c53f1f4f00a3313968f68a8f032e7cb7.tar.bz2
spark-15b00914c53f1f4f00a3313968f68a8f032e7cb7.zip
Some fixes to the launch-java-directly change:
- Split SPARK_JAVA_OPTS into multiple command-line arguments if it contains spaces; this splitting follows quoting rules in bash
- Add the Scala JARs to the classpath if they're not in the CLASSPATH variable because the ExecutorRunner is launched with "scala" (this can happen when using local-cluster URLs in spark-shell)
Diffstat (limited to 'core/src')
-rw-r--r-- core/src/main/scala/spark/Utils.scala 65
-rw-r--r-- core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala 51
-rw-r--r-- core/src/test/scala/spark/UtilsSuite.scala 53
3 files changed, 138 insertions, 31 deletions
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index f3621c6bee..bdc1494cc9 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -522,7 +522,7 @@ private object Utils extends Logging {
execute(command, new File("."))
}
- private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
+ private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
val firstUserLine: Int, val firstUserClass: String)
/**
* When called inside a class in the spark package, returns the name of the user code class
@@ -610,4 +610,67 @@ private object Utils extends Logging {
}
return false
}
+
+ def isSpace(c: Char): Boolean = {
+ " \t\r\n".indexOf(c) != -1
+ }
+
+ /**
+ * Split a string of potentially quoted arguments from the command line the way that a shell
+ * would do it to determine arguments to a command. For example, if the string is 'a "b c" d',
+ * then it would be parsed as three arguments: 'a', 'b c' and 'd'.
+ */
+ def splitCommandString(s: String): Seq[String] = {
+ val buf = new ArrayBuffer[String]
+ var inWord = false
+ var inSingleQuote = false
+ var inDoubleQuote = false
+ var curWord = new StringBuilder
+ def endWord() {
+ buf += curWord.toString
+ curWord.clear()
+ }
+ var i = 0
+ while (i < s.length) {
+ var nextChar = s.charAt(i)
+ if (inDoubleQuote) {
+ if (nextChar == '"') {
+ inDoubleQuote = false
+ } else if (nextChar == '\\') {
+ if (i < s.length - 1) {
+ // Append the next character directly, because only " and \ may be escaped in
+ // double quotes after the shell's own expansion
+ curWord.append(s.charAt(i + 1))
+ i += 1
+ }
+ } else {
+ curWord.append(nextChar)
+ }
+ } else if (inSingleQuote) {
+ if (nextChar == '\'') {
+ inSingleQuote = false
+ } else {
+ curWord.append(nextChar)
+ }
+ // Backslashes are not treated specially in single quotes
+ } else if (nextChar == '"') {
+ inWord = true
+ inDoubleQuote = true
+ } else if (nextChar == '\'') {
+ inWord = true
+ inSingleQuote = true
+ } else if (!isSpace(nextChar)) {
+ curWord.append(nextChar)
+ inWord = true
+ } else if (inWord && isSpace(nextChar)) {
+ endWord()
+ inWord = false
+ }
+ i += 1
+ }
+ if (inWord || inDoubleQuote || inSingleQuote) {
+ endWord()
+ }
+ return buf
+ }
}
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 4d31657d9e..db580e39ab 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -40,7 +40,7 @@ private[spark] class ExecutorRunner(
workerThread.start()
// Shutdown hook that kills actors on shutdown.
- shutdownHook = new Thread() {
+ shutdownHook = new Thread() {
override def run() {
if (process != null) {
logInfo("Shutdown hook killing child process.")
@@ -87,25 +87,43 @@ private[spark] class ExecutorRunner(
Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++
command.arguments.map(substituteVariables)
}
-
- /*
- * Attention: this must always be aligned with the environment variables in the run scripts and the
- * way the JAVA_OPTS are assembled there.
+
+ /**
+ * Attention: this must always be aligned with the environment variables in the run scripts and
+ * the way the JAVA_OPTS are assembled there.
*/
def buildJavaOpts(): Seq[String] = {
- val _javaLibPath = if (System.getenv("SPARK_LIBRARY_PATH") == null) {
- ""
+ val libraryOpts = if (System.getenv("SPARK_LIBRARY_PATH") == null) {
+ Nil
+ } else {
+ List("-Djava.library.path=" + System.getenv("SPARK_LIBRARY_PATH"))
+ }
+
+ val userOpts = if (System.getenv("SPARK_JAVA_OPTS") == null) {
+ Nil
} else {
- "-Djava.library.path=" + System.getenv("SPARK_LIBRARY_PATH")
+ Utils.splitCommandString(System.getenv("SPARK_JAVA_OPTS"))
}
-
- Seq("-cp",
- System.getenv("CLASSPATH"),
- System.getenv("SPARK_JAVA_OPTS"),
- _javaLibPath,
- "-Xms" + memory.toString + "M",
- "-Xmx" + memory.toString + "M")
- .filter(_ != null)
+
+ val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M")
+
+ var classPath = System.getenv("CLASSPATH")
+ if (System.getenv("SPARK_LAUNCH_WITH_SCALA") == "1") {
+ // Add the Scala library JARs to the classpath; this is needed when the ExecutorRunner
+ // was launched with "scala" as the runner (e.g. in spark-shell in local-cluster mode)
+ // and the Scala libraries won't be in the CLASSPATH environment variable by default.
+ if (System.getenv("SCALA_LIBRARY_PATH") == null && System.getenv("SCALA_HOME") == null) {
+ logError("Cloud not launch executors: neither SCALA_LIBRARY_PATH nor SCALA_HOME are set")
+ System.exit(1)
+ }
+ val scalaLib = Option(System.getenv("SCALA_LIBRARY_PATH")).getOrElse(
+ System.getenv("SCALA_HOME") + "/lib")
+ classPath += ":" + scalaLib + "/scala-library.jar" +
+ ":" + scalaLib + "/scala-compiler.jar" +
+ ":" + scalaLib + "/jline.jar"
+ }
+
+ Seq("-cp", classPath) ++ libraryOpts ++ userOpts ++ memoryOpts
}
/** Spawn a thread that will redirect a given stream to a file */
@@ -136,6 +154,7 @@ private[spark] class ExecutorRunner(
// Launch the process
val command = buildCommandSeq()
+ println("COMMAND: " + command.mkString(" "))
val builder = new ProcessBuilder(command: _*).directory(executorDir)
val env = builder.environment()
for ((key, value) <- appDesc.command.environment) {
diff --git a/core/src/test/scala/spark/UtilsSuite.scala b/core/src/test/scala/spark/UtilsSuite.scala
index ed4701574f..4a113e16bf 100644
--- a/core/src/test/scala/spark/UtilsSuite.scala
+++ b/core/src/test/scala/spark/UtilsSuite.scala
@@ -27,24 +27,49 @@ class UtilsSuite extends FunSuite {
assert(os.toByteArray.toList.equals(bytes.toList))
}
- test("memoryStringToMb"){
- assert(Utils.memoryStringToMb("1") == 0)
- assert(Utils.memoryStringToMb("1048575") == 0)
- assert(Utils.memoryStringToMb("3145728") == 3)
+ test("memoryStringToMb") {
+ assert(Utils.memoryStringToMb("1") === 0)
+ assert(Utils.memoryStringToMb("1048575") === 0)
+ assert(Utils.memoryStringToMb("3145728") === 3)
- assert(Utils.memoryStringToMb("1024k") == 1)
- assert(Utils.memoryStringToMb("5000k") == 4)
- assert(Utils.memoryStringToMb("4024k") == Utils.memoryStringToMb("4024K"))
+ assert(Utils.memoryStringToMb("1024k") === 1)
+ assert(Utils.memoryStringToMb("5000k") === 4)
+ assert(Utils.memoryStringToMb("4024k") === Utils.memoryStringToMb("4024K"))
- assert(Utils.memoryStringToMb("1024m") == 1024)
- assert(Utils.memoryStringToMb("5000m") == 5000)
- assert(Utils.memoryStringToMb("4024m") == Utils.memoryStringToMb("4024M"))
+ assert(Utils.memoryStringToMb("1024m") === 1024)
+ assert(Utils.memoryStringToMb("5000m") === 5000)
+ assert(Utils.memoryStringToMb("4024m") === Utils.memoryStringToMb("4024M"))
- assert(Utils.memoryStringToMb("2g") == 2048)
- assert(Utils.memoryStringToMb("3g") == Utils.memoryStringToMb("3G"))
+ assert(Utils.memoryStringToMb("2g") === 2048)
+ assert(Utils.memoryStringToMb("3g") === Utils.memoryStringToMb("3G"))
- assert(Utils.memoryStringToMb("2t") == 2097152)
- assert(Utils.memoryStringToMb("3t") == Utils.memoryStringToMb("3T"))
+ assert(Utils.memoryStringToMb("2t") === 2097152)
+ assert(Utils.memoryStringToMb("3t") === Utils.memoryStringToMb("3T"))
+ }
+
+ test("splitCommandString") {
+ assert(Utils.splitCommandString("") === Seq())
+ assert(Utils.splitCommandString("a") === Seq("a"))
+ assert(Utils.splitCommandString("aaa") === Seq("aaa"))
+ assert(Utils.splitCommandString("a b c") === Seq("a", "b", "c"))
+ assert(Utils.splitCommandString(" a b\t c ") === Seq("a", "b", "c"))
+ assert(Utils.splitCommandString("a 'b c'") === Seq("a", "b c"))
+ assert(Utils.splitCommandString("a 'b c' d") === Seq("a", "b c", "d"))
+ assert(Utils.splitCommandString("'b c'") === Seq("b c"))
+ assert(Utils.splitCommandString("a \"b c\"") === Seq("a", "b c"))
+ assert(Utils.splitCommandString("a \"b c\" d") === Seq("a", "b c", "d"))
+ assert(Utils.splitCommandString("\"b c\"") === Seq("b c"))
+ assert(Utils.splitCommandString("a 'b\" c' \"d' e\"") === Seq("a", "b\" c", "d' e"))
+ assert(Utils.splitCommandString("a\t'b\nc'\nd") === Seq("a", "b\nc", "d"))
+ assert(Utils.splitCommandString("a \"b\\\\c\"") === Seq("a", "b\\c"))
+ assert(Utils.splitCommandString("a \"b\\\"c\"") === Seq("a", "b\"c"))
+ assert(Utils.splitCommandString("a 'b\\\"c'") === Seq("a", "b\\\"c"))
+ assert(Utils.splitCommandString("'a'b") === Seq("ab"))
+ assert(Utils.splitCommandString("'a''b'") === Seq("ab"))
+ assert(Utils.splitCommandString("\"a\"b") === Seq("ab"))
+ assert(Utils.splitCommandString("\"a\"\"b\"") === Seq("ab"))
+ assert(Utils.splitCommandString("''") === Seq(""))
+ assert(Utils.splitCommandString("\"\"") === Seq(""))
}
}