Diffstat (limited to 'core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala | 46
1 file changed, 4 insertions(+), 42 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 3e013c3209..83f78cf473 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -20,10 +20,12 @@ package org.apache.spark.deploy.worker
 import java.io.{File, FileOutputStream, InputStream, IOException}
 import java.lang.System._
 
+import scala.collection.JavaConversions._
 import scala.collection.Map
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.Command
+import org.apache.spark.launcher.WorkerCommandBuilder
 import org.apache.spark.util.Utils
 
 /**
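
The newly added scala.collection.JavaConversions import supports the cmd.toSeq call introduced in the next hunk: buildCommand() hands back a Java list, and the implicit conversions let Scala code treat it as a Scala collection. A minimal, self-contained sketch of that conversion (the list contents here are placeholders, not values from the real builder):

    import java.util.{ArrayList => JArrayList}
    import scala.collection.JavaConversions._

    val javaList = new JArrayList[String]()
    javaList.add("java")        // placeholder values; the real list comes
    javaList.add("-Xmx1024M")   // from WorkerCommandBuilder.buildCommand()

    // The implicit conversion views the java.util.List as a mutable Buffer,
    // which is what makes .toSeq compile in buildCommandSeq below.
    val scalaSeq: Seq[String] = javaList.toSeq
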
@@ -54,12 +56,10 @@ object CommandUtils extends Logging {
   }
 
   private def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
-    val runner = sys.env.get("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
-
     // SPARK-698: do not call the run.cmd script, as process.destroy()
     // fails to kill a process tree on Windows
-    Seq(runner) ++ buildJavaOpts(command, memory, sparkHome) ++ Seq(command.mainClass) ++
-      command.arguments
+    val cmd = new WorkerCommandBuilder(sparkHome, memory, command).buildCommand()
+    cmd.toSeq ++ Seq(command.mainClass) ++ command.arguments
   }
 
   /**
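
After this change, the JVM-level details (java binary resolution, classpath, memory settings, extra options) come from the launcher-side WorkerCommandBuilder, and the Scala side only appends the main class and its arguments. Purely as a hypothetical illustration of the shape of the sequence buildCommandSeq now returns (every value below is invented for the example, not hard-coded anywhere):

    // Hypothetical example values; at runtime each element is produced by
    // WorkerCommandBuilder.buildCommand() or taken from the Command object.
    val illustration: Seq[String] =
      Seq("/usr/lib/jvm/java/bin/java",                // resolved java binary
          "-cp", "/opt/spark/conf:/opt/spark/lib/*",   // classpath from the builder
          "-Xms1024M", "-Xmx1024M") ++                 // from the memory argument
      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend") ++ // command.mainClass
      Seq("--driver-url", "spark://driver:7077")       // command.arguments
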
@@ -92,44 +92,6 @@
       command.javaOpts)
   }
 
-  /**
-   * Attention: this must always be aligned with the environment variables in the run scripts and
-   * the way the JAVA_OPTS are assembled there.
-   */
-  private def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
-    val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
-
-    // Exists for backwards compatibility with older Spark versions
-    val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString)
-      .getOrElse(Nil)
-    if (workerLocalOpts.length > 0) {
-      logWarning("SPARK_JAVA_OPTS was set on the worker. It is deprecated in Spark 1.0.")
-      logWarning("Set SPARK_LOCAL_DIRS for node-specific storage locations.")
-    }
-
-    // Figure out our classpath with the external compute-classpath script
-    val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
-    val classPath = Utils.executeAndGetOutput(
-      Seq(sparkHome + "/bin/compute-classpath" + ext),
-      extraEnvironment = command.environment)
-    val userClassPath = command.classPathEntries ++ Seq(classPath)
-
-    val javaVersion = System.getProperty("java.version")
-
-    val javaOpts = workerLocalOpts ++ command.javaOpts
-
-    val permGenOpt =
-      if (!javaVersion.startsWith("1.8") && !javaOpts.exists(_.startsWith("-XX:MaxPermSize="))) {
-        // do not specify -XX:MaxPermSize if it was already specified by user
-        Some("-XX:MaxPermSize=128m")
-      } else {
-        None
-      }
-
-    Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++
-      permGenOpt ++ javaOpts ++ memoryOpts
-  }
-
   /** Spawn a thread that will redirect a given stream to a file */
   def redirectStream(in: InputStream, file: File) {
     val out = new FileOutputStream(file, true)
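
The body of redirectStream is cut off by the diff context above. As a rough, hypothetical sketch of the pattern its doc comment describes (a background thread copying a stream to a file; the name and details below are illustrative, not necessarily the exact implementation in this file):

    import java.io.{File, FileOutputStream, InputStream}

    // Hypothetical stand-in for the real method; copies `in` to `file` on a thread.
    def redirectStreamSketch(in: InputStream, file: File): Unit = {
      val out = new FileOutputStream(file, true)  // append mode, as in the context line above
      new Thread("redirect output to " + file) {
        override def run(): Unit = {
          try {
            val buf = new Array[Byte](1024)
            var n = in.read(buf)
            while (n != -1) {
              out.write(buf, 0, n)
              n = in.read(buf)
            }
          } finally {
            out.close()
          }
        }
      }.start()
    }
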