author     GuoQiang Li <witgo@qq.com>         2014-10-29 23:02:58 -0700
committer  Andrew Or <andrew@databricks.com>  2014-10-29 23:02:58 -0700
commit     cd739bd756875bd52e9bd8ae801e0ae10a1f6937 (patch)
tree       e64c462e3491eab7b127894f0c97197ba493b427
parent     1234258077b1f4050845e9fb73066b37f981c72a (diff)
[SPARK-1720][SPARK-1719] use LD_LIBRARY_PATH instead of -Djava.library.path
- [X] Standalone
- [X] YARN
- [X] Mesos
- [X] Mac OS X
- [X] Linux
- [ ] Windows

This is an alternative implementation of #1031.

Author: GuoQiang Li <witgo@qq.com>

Closes #2711 from witgo/SPARK-1719 and squashes the following commits:

c7b26f6 [GuoQiang Li] review commits
4488e41 [GuoQiang Li] Refactoring CommandUtils
a444094 [GuoQiang Li] review commits
40c0b4a [GuoQiang Li] Add buildLocalCommand method
c1a0ddd [GuoQiang Li] fix comments
156ce88 [GuoQiang Li] review commit
38aa377 [GuoQiang Li] Refactor CommandUtils.scala
4269e00 [GuoQiang Li] Refactor SparkSubmitDriverBootstrapper.scala
7a1d634 [GuoQiang Li] use LD_LIBRARY_PATH instead of -Djava.library.path
-rwxr-xr-x  bin/spark-class                                                                                6
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala                                          13
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala               17
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala                         68
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala                         23
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala                       26
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala 22
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala      18
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                                         42
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/CommandUtilsSuite.scala                           37
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala                    5
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala                      14
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala            11
13 files changed, 221 insertions, 81 deletions
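At a high level, the patch stops injecting native library directories through -Djava.library.path and instead exports them through the platform's dynamic-linker search-path variable (LD_LIBRARY_PATH on Linux, DYLD_LIBRARY_PATH on Mac OS X, PATH on Windows). A minimal standalone Scala sketch of that dispatch, mirroring the Utils.libraryPathEnvName helper added further down (not Spark's code verbatim):

    // Standalone sketch: choose the library-path environment variable for the current OS.
    object LibraryPathSketch {
      def libraryPathEnvName: String = {
        val os = System.getProperty("os.name").toLowerCase
        if (os.contains("windows")) "PATH"                 // Windows resolves DLLs via PATH
        else if (os.contains("mac")) "DYLD_LIBRARY_PATH"   // Mac OS X dynamic linker
        else "LD_LIBRARY_PATH"                             // Linux and other Unix systems
      }

      def main(args: Array[String]): Unit =
        println(s"Native libraries are exported via: $libraryPathEnvName")
    }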
diff --git a/bin/spark-class b/bin/spark-class
index 91d858bc06..925367b0dd 100755
--- a/bin/spark-class
+++ b/bin/spark-class
@@ -81,7 +81,11 @@ case "$1" in
OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS"
OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
if [ -n "$SPARK_SUBMIT_LIBRARY_PATH" ]; then
- OUR_JAVA_OPTS="$OUR_JAVA_OPTS -Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH"
+ if [[ $OSTYPE == darwin* ]]; then
+ export DYLD_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:$DYLD_LIBRARY_PATH"
+ else
+ export LD_LIBRARY_PATH="$SPARK_SUBMIT_LIBRARY_PATH:$LD_LIBRARY_PATH"
+ fi
fi
if [ -n "$SPARK_SUBMIT_DRIVER_MEMORY" ]; then
OUR_JAVA_MEM="$SPARK_SUBMIT_DRIVER_MEMORY"
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index dbbcc23305..ad0a9017af 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -244,6 +244,19 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
val executorClasspathKey = "spark.executor.extraClassPath"
val driverOptsKey = "spark.driver.extraJavaOptions"
val driverClassPathKey = "spark.driver.extraClassPath"
+ val driverLibraryPathKey = "spark.driver.extraLibraryPath"
+
+ // Used by Yarn in 1.1 and before
+ sys.props.get("spark.driver.libraryPath").foreach { value =>
+ val warning =
+ s"""
+ |spark.driver.libraryPath was detected (set to '$value').
+ |This is deprecated in Spark 1.2+.
+ |
+ |Please instead use: $driverLibraryPathKey
+ """.stripMargin
+ logWarning(warning)
+ }
// Validate spark.executor.extraJavaOptions
settings.get(executorOptsKey).map { javaOpts =>
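The old spark.driver.libraryPath key now only triggers a deprecation warning; the supported keys are spark.driver.extraLibraryPath and, on the executor side, spark.executor.extraLibraryPath. A hedged example of setting the new keys programmatically, with an illustrative path:

    import org.apache.spark.SparkConf

    // /opt/native/lib is a placeholder; point it at the directory holding your .so/.dylib files.
    val conf = new SparkConf()
      .setAppName("native-libs-example")
      .set("spark.driver.extraLibraryPath", "/opt/native/lib")
      .set("spark.executor.extraLibraryPath", "/opt/native/lib")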
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 0125330589..2b894a796c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -82,17 +82,8 @@ private[spark] object SparkSubmitDriverBootstrapper {
.orElse(confDriverMemory)
.getOrElse(defaultDriverMemory)
- val newLibraryPath =
- if (submitLibraryPath.isDefined) {
- // SPARK_SUBMIT_LIBRARY_PATH is already captured in JAVA_OPTS
- ""
- } else {
- confLibraryPath.map("-Djava.library.path=" + _).getOrElse("")
- }
-
val newClasspath =
if (submitClasspath.isDefined) {
- // SPARK_SUBMIT_CLASSPATH is already captured in CLASSPATH
classpath
} else {
classpath + confClasspath.map(sys.props("path.separator") + _).getOrElse("")
@@ -114,7 +105,6 @@ private[spark] object SparkSubmitDriverBootstrapper {
val command: Seq[String] =
Seq(runner) ++
Seq("-cp", newClasspath) ++
- Seq(newLibraryPath) ++
filteredJavaOpts ++
Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++
Seq("org.apache.spark.deploy.SparkSubmit") ++
@@ -130,6 +120,13 @@ private[spark] object SparkSubmitDriverBootstrapper {
// Start the driver JVM
val filteredCommand = command.filter(_.nonEmpty)
val builder = new ProcessBuilder(filteredCommand)
+ val env = builder.environment()
+
+ if (submitLibraryPath.isEmpty && confLibraryPath.nonEmpty) {
+ val libraryPaths = confLibraryPath ++ sys.env.get(Utils.libraryPathEnvName)
+ env.put(Utils.libraryPathEnvName, libraryPaths.mkString(sys.props("path.separator")))
+ }
+
val process = builder.start()
// Redirect stdout and stderr from the child JVM
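With the -Djava.library.path flag gone, the bootstrapper hands the configured library path to the driver JVM through the ProcessBuilder environment, appending any value already present in the parent process. A minimal standalone sketch of that pattern (the variable name assumes Linux and the path is illustrative):

    import java.io.File

    // Launch a child JVM with extra native library directories on LD_LIBRARY_PATH,
    // merged with whatever the parent process already has.
    val envName = "LD_LIBRARY_PATH"                        // platform-specific in the real code
    val extraLibs = Seq("/opt/native/lib")                 // illustrative directory
    val builder = new ProcessBuilder("java", "-version")   // stand-in for the real driver command
    val env = builder.environment()
    val merged = (extraLibs ++ Option(env.get(envName))).mkString(File.pathSeparator)
    env.put(envName, merged)
    val process = builder.start()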
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 2e9be2a180..aba2e20118 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -20,6 +20,8 @@ package org.apache.spark.deploy.worker
import java.io.{File, FileOutputStream, InputStream, IOException}
import java.lang.System._
+import scala.collection.Map
+
import org.apache.spark.Logging
import org.apache.spark.deploy.Command
import org.apache.spark.util.Utils
@@ -29,7 +31,29 @@ import org.apache.spark.util.Utils
*/
private[spark]
object CommandUtils extends Logging {
- def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
+
+ /**
+ * Build a ProcessBuilder based on the given parameters.
+ * The `env` argument is exposed for testing.
+ */
+ def buildProcessBuilder(
+ command: Command,
+ memory: Int,
+ sparkHome: String,
+ substituteArguments: String => String,
+ classPaths: Seq[String] = Seq[String](),
+ env: Map[String, String] = sys.env): ProcessBuilder = {
+ val localCommand = buildLocalCommand(command, substituteArguments, classPaths, env)
+ val commandSeq = buildCommandSeq(localCommand, memory, sparkHome)
+ val builder = new ProcessBuilder(commandSeq: _*)
+ val environment = builder.environment()
+ for ((key, value) <- localCommand.environment) {
+ environment.put(key, value)
+ }
+ builder
+ }
+
+ private def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
val runner = sys.env.get("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
// SPARK-698: do not call the run.cmd script, as process.destroy()
@@ -39,10 +63,40 @@ object CommandUtils extends Logging {
}
/**
+ * Build a command based on the given one, taking into account the local environment
+ * of where this command is expected to run, substitute any placeholders, and append
+ * any extra class paths.
+ */
+ private def buildLocalCommand(
+ command: Command,
+ substituteArguments: String => String,
+ classPath: Seq[String] = Seq[String](),
+ env: Map[String, String]): Command = {
+ val libraryPathName = Utils.libraryPathEnvName
+ val libraryPathEntries = command.libraryPathEntries
+ val cmdLibraryPath = command.environment.get(libraryPathName)
+
+ val newEnvironment = if (libraryPathEntries.nonEmpty && libraryPathName.nonEmpty) {
+ val libraryPaths = libraryPathEntries ++ cmdLibraryPath ++ env.get(libraryPathName)
+ command.environment + ((libraryPathName, libraryPaths.mkString(File.pathSeparator)))
+ } else {
+ command.environment
+ }
+
+ Command(
+ command.mainClass,
+ command.arguments.map(substituteArguments),
+ newEnvironment,
+ command.classPathEntries ++ classPath,
+ Seq[String](), // library path already captured in environment variable
+ command.javaOpts)
+ }
+
+ /**
* Attention: this must always be aligned with the environment variables in the run scripts and
* the way the JAVA_OPTS are assembled there.
*/
- def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
+ private def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
// Exists for backwards compatibility with older Spark versions
@@ -53,14 +107,6 @@ object CommandUtils extends Logging {
logWarning("Set SPARK_LOCAL_DIRS for node-specific storage locations.")
}
- val libraryOpts =
- if (command.libraryPathEntries.size > 0) {
- val joined = command.libraryPathEntries.mkString(File.pathSeparator)
- Seq(s"-Djava.library.path=$joined")
- } else {
- Seq()
- }
-
// Figure out our classpath with the external compute-classpath script
val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
val classPath = Utils.executeAndGetOutput(
@@ -71,7 +117,7 @@ object CommandUtils extends Logging {
val javaVersion = System.getProperty("java.version")
val permGenOpt = if (!javaVersion.startsWith("1.8")) Some("-XX:MaxPermSize=128m") else None
Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++
- permGenOpt ++ libraryOpts ++ workerLocalOpts ++ command.javaOpts ++ memoryOpts
+ permGenOpt ++ workerLocalOpts ++ command.javaOpts ++ memoryOpts
}
/** Spawn a thread that will redirect a given stream to a file */
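buildProcessBuilder is now the single entry point that turns a Command into a ready-to-start ProcessBuilder, folding library path entries into the environment rather than into JVM flags. A hedged usage sketch modeled on the new test below; the values are illustrative, spark-core must be on the classpath, the caller has to sit under org.apache.spark because the helper is private[spark], and sparkHome must point at a real Spark installation since the builder shells out to bin/compute-classpath:

    package org.apache.spark.deploy

    import org.apache.spark.deploy.worker.CommandUtils

    object BuildProcessBuilderSketch {
      def main(args: Array[String]): Unit = {
        // Library path entries end up in LD_LIBRARY_PATH (or the platform equivalent),
        // not in -Djava.library.path.
        val cmd = new Command("my.Main", Seq("{{WORKER_URL}}"), Map(), Seq(), Seq("/opt/native/lib"), Seq())
        val builder = CommandUtils.buildProcessBuilder(
          cmd, 512, "/opt/spark",                          // illustrative memory and sparkHome
          substituteArguments = _.replace("{{WORKER_URL}}", "spark://worker:7078"))
        println(builder.command())      // the full java command line
        println(builder.environment())  // contains the merged library path variable
      }
    }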
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 3bf0b9492d..28cab36c7b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -76,17 +76,9 @@ private[spark] class DriverRunner(
// Make sure user application jar is on the classpath
// TODO: If we add ability to submit multiple jars they should also be added here
- val classPath = driverDesc.command.classPathEntries ++ Seq(s"$localJarFilename")
- val newCommand = Command(
- driverDesc.command.mainClass,
- driverDesc.command.arguments.map(substituteVariables),
- driverDesc.command.environment,
- classPath,
- driverDesc.command.libraryPathEntries,
- driverDesc.command.javaOpts)
- val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem,
- sparkHome.getAbsolutePath)
- launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise)
+ val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
+ sparkHome.getAbsolutePath, substituteVariables, Seq(localJarFilename))
+ launchDriver(builder, driverDir, driverDesc.supervise)
}
catch {
case e: Exception => finalException = Some(e)
@@ -165,11 +157,8 @@ private[spark] class DriverRunner(
localJarFilename
}
- private def launchDriver(command: Seq[String], envVars: Map[String, String], baseDir: File,
- supervise: Boolean) {
- val builder = new ProcessBuilder(command: _*).directory(baseDir)
- envVars.map{ case(k,v) => builder.environment().put(k, v) }
-
+ private def launchDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean) {
+ builder.directory(baseDir)
def initialize(process: Process) = {
// Redirect stdout and stderr to files
val stdout = new File(baseDir, "stdout")
@@ -177,7 +166,7 @@ private[spark] class DriverRunner(
val stderr = new File(baseDir, "stderr")
val header = "Launch Command: %s\n%s\n\n".format(
- command.mkString("\"", "\" \"", "\""), "=" * 40)
+ builder.command.mkString("\"", "\" \"", "\""), "=" * 40)
Files.append(header, stderr, UTF_8)
CommandUtils.redirectStream(process.getErrorStream, stderr)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 030a651469..8ba6a01bbc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -19,6 +19,8 @@ package org.apache.spark.deploy.worker
import java.io._
+import scala.collection.JavaConversions._
+
import akka.actor.ActorRef
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
@@ -115,33 +117,21 @@ private[spark] class ExecutorRunner(
case other => other
}
- def getCommandSeq = {
- val command = Command(
- appDesc.command.mainClass,
- appDesc.command.arguments.map(substituteVariables),
- appDesc.command.environment,
- appDesc.command.classPathEntries,
- appDesc.command.libraryPathEntries,
- appDesc.command.javaOpts)
- CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath)
- }
-
/**
* Download and run the executor described in our ApplicationDescription
*/
def fetchAndRunExecutor() {
try {
// Launch the process
- val command = getCommandSeq
+ val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
+ sparkHome.getAbsolutePath, substituteVariables)
+ val command = builder.command()
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
- val builder = new ProcessBuilder(command: _*).directory(executorDir)
- val env = builder.environment()
- for ((key, value) <- appDesc.command.environment) {
- env.put(key, value)
- }
+
+ builder.directory(executorDir)
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
- env.put("SPARK_LAUNCH_WITH_SCALA", "0")
+ builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index d7f88de4b4..d8c0e2f66d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -31,6 +31,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTas
import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -120,16 +121,18 @@ private[spark] class CoarseMesosSchedulerBackend(
environment.addVariables(
Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
}
- val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions")
+ val extraJavaOpts = conf.get("spark.executor.extraJavaOptions", "")
- val libraryPathOption = "spark.executor.extraLibraryPath"
- val extraLibraryPath = conf.getOption(libraryPathOption).map(p => s"-Djava.library.path=$p")
- val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")
+ // Set the environment variable through a command prefix
+ // to append to the existing value of the variable
+ val prefixEnv = conf.getOption("spark.executor.extraLibraryPath").map { p =>
+ Utils.libraryPathEnvPrefix(Seq(p))
+ }.getOrElse("")
environment.addVariables(
Environment.Variable.newBuilder()
.setName("SPARK_EXECUTOR_OPTS")
- .setValue(extraOpts)
+ .setValue(extraJavaOpts)
.build())
sc.executorEnvs.foreach { case (key, value) =>
@@ -150,16 +153,17 @@ private[spark] class CoarseMesosSchedulerBackend(
if (uri == null) {
val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
command.setValue(
- "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format(
- runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores, appId))
+ "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format(
+ prefixEnv, runScript, driverUrl, offer.getSlaveId.getValue,
+ offer.getHostname, numCores, appId))
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue(
- ("cd %s*; " +
+ ("cd %s*; %s " +
"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s")
- .format(basename, driverUrl, offer.getSlaveId.getValue,
+ .format(basename, prefixEnv, driverUrl, offer.getSlaveId.getValue,
offer.getHostname, numCores, appId))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index e0f2fd622f..8e2faff90f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -98,15 +98,16 @@ private[spark] class MesosSchedulerBackend(
environment.addVariables(
Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
}
- val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
- val extraLibraryPath = sc.conf.getOption("spark.executor.extraLibraryPath").map { lp =>
- s"-Djava.library.path=$lp"
- }
- val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")
+ val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").getOrElse("")
+
+ val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map { p =>
+ Utils.libraryPathEnvPrefix(Seq(p))
+ }.getOrElse("")
+
environment.addVariables(
Environment.Variable.newBuilder()
.setName("SPARK_EXECUTOR_OPTS")
- .setValue(extraOpts)
+ .setValue(extraJavaOpts)
.build())
sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
@@ -118,12 +119,13 @@ private[spark] class MesosSchedulerBackend(
.setEnvironment(environment)
val uri = sc.conf.get("spark.executor.uri", null)
if (uri == null) {
- command.setValue(new File(executorSparkHome, "/sbin/spark-executor").getCanonicalPath)
+ val executorPath = new File(executorSparkHome, "/sbin/spark-executor").getCanonicalPath
+ command.setValue("%s %s".format(prefixEnv, executorPath))
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
- command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
+ command.setValue("cd %s*; %s ./sbin/spark-executor".format(basename, prefixEnv))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
val cpus = Resource.newBuilder()
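For both Mesos backends the library path can no longer travel inside SPARK_EXECUTOR_OPTS, because the linker variable has to be set before the executor JVM starts; instead Utils.libraryPathEnvPrefix produces a shell prefix that is prepended to the launch command. A sketch of the resulting command string, assuming Linux and purely illustrative placeholder values:

    // Roughly what the prefixed coarse-grained launch command looks like.
    val prefixEnv = "LD_LIBRARY_PATH=\"/opt/native/lib:$LD_LIBRARY_PATH\""  // from Utils.libraryPathEnvPrefix
    val runScript = "/opt/spark/bin/spark-class"                            // illustrative executorSparkHome
    val launch = "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format(
      prefixEnv, runScript, "<driverUrl>", "<slaveId>", "<hostname>", 4, "<appId>")
    println(launch)
    // LD_LIBRARY_PATH="/opt/native/lib:$LD_LIBRARY_PATH" "/opt/spark/bin/spark-class" org.apache.spark.executor...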
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0daab91143..063895d3c5 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -23,8 +23,6 @@ import java.nio.ByteBuffer
import java.util.{Properties, Locale, Random, UUID}
import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
-import org.eclipse.jetty.util.MultiException
-
import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
@@ -39,6 +37,7 @@ import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
import org.apache.log4j.PropertyConfigurator
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.eclipse.jetty.util.MultiException
import org.json4s._
import tachyon.client.{TachyonFile,TachyonFS}
@@ -1382,6 +1381,11 @@ private[spark] object Utils extends Logging {
val isWindows = SystemUtils.IS_OS_WINDOWS
/**
+ * Whether the underlying operating system is Mac OS X.
+ */
+ val isMac = SystemUtils.IS_OS_MAC_OSX
+
+ /**
* Pattern for matching a Windows drive, which contains only a single alphabet character.
*/
val windowsDrive = "([a-zA-Z])".r
@@ -1714,6 +1718,40 @@ private[spark] object Utils extends Logging {
method.invoke(obj, values.toSeq: _*)
}
+ /**
+ * Return the current system LD_LIBRARY_PATH name
+ */
+ def libraryPathEnvName: String = {
+ if (isWindows) {
+ "PATH"
+ } else if (isMac) {
+ "DYLD_LIBRARY_PATH"
+ } else {
+ "LD_LIBRARY_PATH"
+ }
+ }
+
+ /**
+ * Return the prefix of a command that appends the given library paths to the
+ * system-specific library path environment variable. On Unix, for instance,
+ * this returns the string LD_LIBRARY_PATH="path1:path2:$LD_LIBRARY_PATH".
+ */
+ def libraryPathEnvPrefix(libraryPaths: Seq[String]): String = {
+ val libraryPathScriptVar = if (isWindows) {
+ s"%${libraryPathEnvName}%"
+ } else {
+ "$" + libraryPathEnvName
+ }
+ val libraryPath = (libraryPaths :+ libraryPathScriptVar).mkString("\"",
+ File.pathSeparator, "\"")
+ val ampersand = if (Utils.isWindows) {
+ " &"
+ } else {
+ ""
+ }
+ s"$libraryPathEnvName=$libraryPath$ampersand"
+ }
+
}
/**
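libraryPathEnvName and libraryPathEnvPrefix are the two helpers the rest of the patch builds on. A standalone sketch that reproduces the shape of the prefix for illustration (the real helper is private[spark], so this mirrors its documented behaviour rather than calling it):

    import java.io.File

    // Mirrors the documented behaviour of Utils.libraryPathEnvPrefix.
    def prefixFor(envName: String, paths: Seq[String], windows: Boolean): String = {
      val scriptVar = if (windows) s"%$envName%" else "$" + envName
      val joined = (paths :+ scriptVar).mkString("\"", File.pathSeparator, "\"")
      val ampersand = if (windows) " &" else ""
      s"$envName=$joined$ampersand"
    }

    println(prefixFor("LD_LIBRARY_PATH", Seq("/opt/native/lib"), windows = false))
    // LD_LIBRARY_PATH="/opt/native/lib:$LD_LIBRARY_PATH"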
diff --git a/core/src/test/scala/org/apache/spark/deploy/CommandUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/CommandUtilsSuite.scala
new file mode 100644
index 0000000000..7915ee75d8
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/CommandUtilsSuite.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.apache.spark.deploy.worker.CommandUtils
+import org.apache.spark.util.Utils
+
+import org.scalatest.{FunSuite, Matchers}
+
+class CommandUtilsSuite extends FunSuite with Matchers {
+
+ test("set libraryPath correctly") {
+ val appId = "12345-worker321-9876"
+ val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
+ val cmd = new Command("mainClass", Seq(), Map(), Seq(), Seq("libraryPathToB"), Seq())
+ val builder = CommandUtils.buildProcessBuilder(cmd, 512, sparkHome, t => t)
+ val libraryPath = Utils.libraryPathEnvName
+ val env = builder.environment
+ env.keySet should contain(libraryPath)
+ assert(env.get(libraryPath).startsWith("libraryPathToB"))
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 5e2592e8d2..1962170629 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -19,6 +19,8 @@ package org.apache.spark.deploy.worker
import java.io.File
+import scala.collection.JavaConversions._
+
import org.scalatest.FunSuite
import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
@@ -32,6 +34,7 @@ class ExecutorRunnerTest extends FunSuite {
Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
new File(sparkHome), new File("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
- assert(er.getCommandSeq.last === appId)
+ val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
+ assert(builder.command().last === appId)
}
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 8ea0e7cf40..f95d723791 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
+import org.apache.spark.util.Utils
/**
* The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN.
@@ -312,6 +313,10 @@ private[spark] trait ClientBase extends Logging {
val javaOpts = ListBuffer[String]()
+ // Set the environment variable through a command prefix
+ // to append to the existing value of the variable
+ var prefixEnv: Option[String] = None
+
// Add Xmx for AM memory
javaOpts += "-Xmx" + args.amMemory + "m"
@@ -348,8 +353,11 @@ private[spark] trait ClientBase extends Logging {
sparkConf.getOption("spark.driver.extraJavaOptions")
.orElse(sys.env.get("SPARK_JAVA_OPTS"))
.foreach(opts => javaOpts += opts)
- sparkConf.getOption("spark.driver.libraryPath")
- .foreach(p => javaOpts += s"-Djava.library.path=$p")
+ val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"),
+ sys.props.get("spark.driver.libraryPath")).flatten
+ if (libraryPaths.nonEmpty) {
+ prefixEnv = Some(Utils.libraryPathEnvPrefix(libraryPaths))
+ }
}
// For log4j configuration to reference
@@ -384,7 +392,7 @@ private[spark] trait ClientBase extends Logging {
"--num-executors ", args.numExecutors.toString)
// Command for the ApplicationMaster
- val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
+ val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
javaOpts ++ amArgs ++
Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 5cb4753de2..88dad0febd 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.util.Utils
trait ExecutorRunnableUtil extends Logging {
@@ -47,6 +48,11 @@ trait ExecutorRunnableUtil extends Logging {
localResources: HashMap[String, LocalResource]): List[String] = {
// Extra options for the JVM
val javaOpts = ListBuffer[String]()
+
+ // Set the environment variable through a command prefix
+ // to append to the existing value of the variable
+ var prefixEnv: Option[String] = None
+
// Set the JVM memory
val executorMemoryString = executorMemory + "m"
javaOpts += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
@@ -58,6 +64,9 @@ trait ExecutorRunnableUtil extends Logging {
sys.env.get("SPARK_JAVA_OPTS").foreach { opts =>
javaOpts += opts
}
+ sys.props.get("spark.executor.extraLibraryPath").foreach { p =>
+ prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(p)))
+ }
javaOpts += "-Djava.io.tmpdir=" +
new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
@@ -101,7 +110,7 @@ trait ExecutorRunnableUtil extends Logging {
// For log4j configuration to reference
javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
- val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java",
+ val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$() + "/bin/java",
"-server",
// Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
// Not killing the task leaves various aspects of the executor and (to some extent) the jvm in