Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala                                        |  76
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                                     |  37
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/Client.scala                                    |  15
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/Command.scala                                   |   5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala                               |  53
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala                      | 190
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala                         |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala                       |  19
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala                       |  16
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala                     |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala                                |   5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala    |  13
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala |  23
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala    |   2
14 files changed, 369 insertions, 93 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b947feb891..bd21fdc5a1 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -208,6 +208,82 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
new SparkConf(false).setAll(settings)
}
+ /** Checks for illegal or deprecated config settings. Throws an exception for the former. Not
+ * idempotent - may mutate this conf object to convert deprecated settings to supported ones. */
+ private[spark] def validateSettings() {
+ if (settings.contains("spark.local.dir")) {
+ val msg = "In Spark 1.0 and later spark.local.dir will be overridden by the value set by " +
+ "the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN)."
+ logWarning(msg)
+ }
+
+ val executorOptsKey = "spark.executor.extraJavaOptions"
+ val executorClasspathKey = "spark.executor.extraClassPath"
+ val driverOptsKey = "spark.driver.extraJavaOptions"
+ val driverClassPathKey = "spark.driver.extraClassPath"
+
+ // Validate spark.executor.extraJavaOptions
+ settings.get(executorOptsKey).map { javaOpts =>
+ if (javaOpts.contains("-Dspark")) {
+ val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts)'. " +
+ "Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit."
+ throw new Exception(msg)
+ }
+ if (javaOpts.contains("-Xmx") || javaOpts.contains("-Xms")) {
+ val msg = s"$executorOptsKey is not allowed to alter memory settings (was '$javaOpts'). " +
+ "Use spark.executor.memory instead."
+ throw new Exception(msg)
+ }
+ }
+
+ // Check for legacy configs
+ sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
+ val error =
+ s"""
+ |SPARK_JAVA_OPTS was detected (set to '$value').
+ |This has undefined behavior when running on a cluster and is deprecated in Spark 1.0+.
+ |
+ |Please instead use:
+ | - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
+ | - ./spark-submit with --driver-java-options to set -X options for a driver
+ | - spark.executor.extraJavaOptions to set -X options for executors
+ | - SPARK_DAEMON_OPTS to set java options for standalone daemons (i.e. master, worker)
+ """.stripMargin
+ logError(error)
+
+ for (key <- Seq(executorOptsKey, driverOptsKey)) {
+ if (getOption(key).isDefined) {
+ throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.")
+ } else {
+ logWarning(s"Setting '$key' to '$value' as a work-around.")
+ set(key, value)
+ }
+ }
+ }
+
+ sys.env.get("SPARK_CLASSPATH").foreach { value =>
+ val error =
+ s"""
+ |SPARK_CLASSPATH was detected (set to '$value').
+ |This has undefined behavior when running on a cluster and is deprecated in Spark 1.0+.
+ |
+ |Please instead use:
+ | - ./spark-submit with --driver-class-path to augment the driver classpath
+ | - spark.executor.extraClassPath to augment the executor classpath
+ """.stripMargin
+ logError(error)
+
+ for (key <- Seq(executorClasspathKey, driverClassPathKey)) {
+ if (getOption(key).isDefined) {
+ throw new SparkException(s"Found both $key and SPARK_CLASSPATH. Use only the former.")
+ } else {
+ logWarning(s"Setting '$key' to '$value' as a work-around.")
+ set(key, value)
+ }
+ }
+ }
+ }
+
/**
* Return a string listing all keys and values, one per line. This is useful to print the
* configuration out for debugging.
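For context, a minimal sketch of configuration that satisfies the new validateSettings() checks; the master, app name, and option values below are hypothetical:

import org.apache.spark.SparkConf

// JVM flags go through spark.executor.extraJavaOptions, while heap size goes
// through spark.executor.memory; validateSettings() rejects -Xmx/-Xms or
// -Dspark.* values inside the extra options.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("validation-demo")
  .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC -verbose:gc")
  .set("spark.executor.memory", "2g")
// validateSettings() is private[spark]; it runs automatically when a
// SparkContext is constructed from this conf (see the SparkContext change below).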
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d3ef75bc73..7933d68d67 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -148,6 +148,7 @@ class SparkContext(config: SparkConf) extends Logging {
this(master, appName, sparkHome, jars, Map(), Map())
private[spark] val conf = config.clone()
+ conf.validateSettings()
/**
* Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
@@ -159,7 +160,7 @@ class SparkContext(config: SparkConf) extends Logging {
throw new SparkException("A master URL must be set in your configuration")
}
if (!conf.contains("spark.app.name")) {
- throw new SparkException("An application must be set in your configuration")
+ throw new SparkException("An application name must be set in your configuration")
}
if (conf.getBoolean("spark.logConf", false)) {
@@ -170,11 +171,11 @@ class SparkContext(config: SparkConf) extends Logging {
conf.setIfMissing("spark.driver.host", Utils.localHostName())
conf.setIfMissing("spark.driver.port", "0")
- val jars: Seq[String] = if (conf.contains("spark.jars")) {
- conf.get("spark.jars").split(",").filter(_.size != 0)
- } else {
- null
- }
+ val jars: Seq[String] =
+ conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
+
+ val files: Seq[String] =
+ conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
val master = conf.get("spark.master")
val appName = conf.get("spark.app.name")
@@ -235,6 +236,10 @@ class SparkContext(config: SparkConf) extends Logging {
jars.foreach(addJar)
}
+ if (files != null) {
+ files.foreach(addFile)
+ }
+
private def warnSparkMem(value: String): String = {
logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
"deprecated, please use spark.executor.memory instead.")
@@ -247,22 +252,20 @@ class SparkContext(config: SparkConf) extends Logging {
.map(Utils.memoryStringToMb)
.getOrElse(512)
- // Environment variables to pass to our executors
- private[spark] val executorEnvs = HashMap[String, String]()
- for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS");
- value <- Option(System.getenv(key))) {
- executorEnvs(key) = value
- }
+ // Environment variables to pass to our executors.
+ // NOTE: This should only be used for test related settings.
+ private[spark] val testExecutorEnvs = HashMap[String, String]()
+
// Convert java options to env vars as a work around
// since we can't set env vars directly in sbt.
- for { (envKey, propKey) <- Seq(("SPARK_HOME", "spark.home"), ("SPARK_TESTING", "spark.testing"))
+ for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
- executorEnvs(envKey) = value
+ testExecutorEnvs(envKey) = value
}
// The Mesos scheduler backend relies on this environment variable to set executor memory.
// TODO: Set this only in the Mesos scheduler.
- executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
- executorEnvs ++= conf.getExecutorEnv
+ testExecutorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
+ testExecutorEnvs ++= conf.getExecutorEnv
// Set SPARK_USER for user who is running SparkContext.
val sparkUser = Option {
@@ -270,7 +273,7 @@ class SparkContext(config: SparkConf) extends Logging {
}.getOrElse {
SparkContext.SPARK_UNKNOWN_USER
}
- executorEnvs("SPARK_USER") = sparkUser
+ testExecutorEnvs("SPARK_USER") = sparkUser
// Create and start the scheduler
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
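A short sketch of the new spark.jars / spark.files handling above: both keys are split on commas and the entries are registered through addJar / addFile when the context starts (paths below are hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("dist-demo")
  .set("spark.jars", "/tmp/dep1.jar,/tmp/dep2.jar")  // comma-separated jar paths
  .set("spark.files", "/tmp/lookup.txt")             // files shipped to executors
// Constructing the context validates the conf and then calls addJar/addFile
// for every entry parsed above.
val sc = new SparkContext(conf)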
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 8fd2c7e95b..7ead117152 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -54,8 +54,21 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
System.getenv().foreach{case (k, v) => env(k) = v}
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
+
+ val classPathConf = "spark.driver.extraClassPath"
+ val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
+ cp.split(java.io.File.pathSeparator)
+ }
+
+ val libraryPathConf = "spark.driver.extraLibraryPath"
+ val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
+ cp.split(java.io.File.pathSeparator)
+ }
+
+ val javaOptionsConf = "spark.driver.extraJavaOptions"
+ val javaOpts = sys.props.get(javaOptionsConf)
val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
- driverArgs.driverOptions, env)
+ driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)
val driverDescription = new DriverDescription(
driverArgs.jarUrl,
diff --git a/core/src/main/scala/org/apache/spark/deploy/Command.scala b/core/src/main/scala/org/apache/spark/deploy/Command.scala
index fa8af9a646..32f3ba3850 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Command.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Command.scala
@@ -22,5 +22,8 @@ import scala.collection.Map
private[spark] case class Command(
mainClass: String,
arguments: Seq[String],
- environment: Map[String, String]) {
+ environment: Map[String, String],
+ classPathEntries: Seq[String],
+ libraryPathEntries: Seq[String],
+ extraJavaOptions: Option[String] = None) {
}
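A sketch of how deploy-side code can now populate the extended Command; previously the extra classpath and library path had to be passed through environment variables such as SPARK_CLASSPATH. All values below are hypothetical, and Command is private[spark], so this only applies inside Spark itself:

import org.apache.spark.deploy.Command

val cmd = Command(
  mainClass = "org.apache.spark.executor.CoarseGrainedExecutorBackend",
  arguments = Seq("{{WORKER_URL}}"),
  environment = Map("SPARK_USER" -> "alice"),
  classPathEntries = Seq("/opt/app/lib/extra.jar"),
  libraryPathEntries = Seq("/opt/native/lib"),
  extraJavaOptions = Some("-XX:+UseG1GC"))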
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index e5d593cade..1b1e0fce0e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -17,14 +17,12 @@
package org.apache.spark.deploy
-import java.io.{PrintStream, File}
+import java.io.{File, PrintStream}
import java.net.{URI, URL}
-import org.apache.spark.executor.ExecutorURLClassLoader
+import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
+import org.apache.spark.executor.ExecutorURLClassLoader
/**
* Scala code behind the spark-submit script. The script handles setting up the classpath with
@@ -63,7 +61,8 @@ object SparkSubmit {
/**
* @return
* a tuple containing the arguments for the child, a list of classpath
- * entries for the child, and the main class for the child
+ * entries for the child, a map of system properties, and the main class
+ * for the child
*/
private[spark] def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
ArrayBuffer[String], Map[String, String], String) = {
@@ -123,6 +122,12 @@ object SparkSubmit {
val options = List[OptionAssigner](
new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
+ new OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true,
+ sysProp = "spark.driver.extraClassPath"),
+ new OptionAssigner(appArgs.driverExtraJavaOptions, STANDALONE | YARN, true,
+ sysProp = "spark.driver.extraJavaOptions"),
+ new OptionAssigner(appArgs.driverExtraLibraryPath, STANDALONE | YARN, true,
+ sysProp = "spark.driver.extraLibraryPath"),
new OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"),
new OptionAssigner(appArgs.name, YARN, true, clOption = "--name"),
new OptionAssigner(appArgs.queue, YARN, true, clOption = "--queue"),
@@ -142,10 +147,14 @@ object SparkSubmit {
new OptionAssigner(appArgs.files, YARN, true, clOption = "--files"),
new OptionAssigner(appArgs.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
new OptionAssigner(appArgs.archives, YARN, true, clOption = "--archives"),
- new OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars")
+ new OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars"),
+ new OptionAssigner(appArgs.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
+ new OptionAssigner(appArgs.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"),
+ new OptionAssigner(appArgs.name, LOCAL | STANDALONE | MESOS, false,
+ sysProp = "spark.app.name")
)
- // more jars
+ // For client mode, make any added jars immediately visible on the classpath
if (appArgs.jars != null && !deployOnCluster) {
for (jar <- appArgs.jars.split(",")) {
childClasspath += jar
@@ -163,6 +172,14 @@ object SparkSubmit {
}
}
+ // For standalone mode, add the application jar automatically so the user doesn't have to
+ // call sc.addJar. TODO: Standalone mode in the cluster
+ if (clusterManager == STANDALONE) {
+ val existingJars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
+ sysProps.put("spark.jars", (existingJars ++ Seq(appArgs.primaryResource)).mkString(","))
+ println("SPARK JARS" + sysProps.get("spark.jars"))
+ }
+
if (deployOnCluster && clusterManager == STANDALONE) {
if (appArgs.supervise) {
childArgs += "--supervise"
@@ -173,7 +190,7 @@ object SparkSubmit {
childArgs += (appArgs.master, appArgs.primaryResource, appArgs.mainClass)
}
- // args
+ // Arguments to be passed to user program
if (appArgs.childArgs != null) {
if (!deployOnCluster || clusterManager == STANDALONE) {
childArgs ++= appArgs.childArgs
@@ -184,6 +201,10 @@ object SparkSubmit {
}
}
+ for ((k, v) <- appArgs.getDefaultSparkProperties) {
+ if (!sysProps.contains(k)) sysProps(k) = v
+ }
+
(childArgs, childClasspath, sysProps, childMainClass)
}
@@ -191,11 +212,11 @@ object SparkSubmit {
sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) {
if (verbose) {
- System.err.println(s"Main class:\n$childMainClass")
- System.err.println(s"Arguments:\n${childArgs.mkString("\n")}")
- System.err.println(s"System properties:\n${sysProps.mkString("\n")}")
- System.err.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
- System.err.println("\n")
+ printStream.println(s"Main class:\n$childMainClass")
+ printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
+ printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
+ printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
+ printStream.println("\n")
}
val loader = new ExecutorURLClassLoader(new Array[URL](0),
@@ -226,6 +247,10 @@ object SparkSubmit {
}
}
+/**
+ * Provides an indirection layer for passing arguments as system properties or flags to
+ * the user's driver program or to downstream launcher tools.
+ */
private[spark] class OptionAssigner(val value: String,
val clusterManager: Int,
val deployOnCluster: Boolean,
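Roughly, the OptionAssigner table above is consumed by a loop of the following shape (a simplified sketch, not the literal implementation): each entry whose cluster manager and deploy mode match is forwarded either as a command-line flag for the child process or as a system property.

for (opt <- options if opt.value != null &&
     (clusterManager & opt.clusterManager) != 0 &&
     opt.deployOnCluster == deployOnCluster) {
  if (opt.clOption != null) childArgs += (opt.clOption, opt.value)  // e.g. --driver-memory
  if (opt.sysProp != null) sysProps.put(opt.sysProp, opt.value)     // e.g. spark.driver.extraClassPath
}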
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 834b3df2f1..02502adfbd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -17,18 +17,28 @@
package org.apache.spark.deploy
-import scala.collection.mutable.ArrayBuffer
+import java.io.{File, FileInputStream, IOException}
+import java.util.Properties
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{HashMap, ArrayBuffer}
+
+import org.apache.spark.SparkException
/**
* Parses and encapsulates arguments from the spark-submit script.
*/
private[spark] class SparkSubmitArguments(args: Array[String]) {
- var master: String = "local"
+ var master: String = null
var deployMode: String = null
var executorMemory: String = null
var executorCores: String = null
var totalExecutorCores: String = null
+ var propertiesFile: String = null
var driverMemory: String = null
+ var driverExtraClassPath: String = null
+ var driverExtraLibraryPath: String = null
+ var driverExtraJavaOptions: String = null
var driverCores: String = null
var supervise: Boolean = false
var queue: String = null
@@ -42,42 +52,102 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
var jars: String = null
var verbose: Boolean = false
- loadEnvVars()
parseOpts(args.toList)
+ loadDefaults()
+ checkRequiredArguments()
+
+ /** Return the default Spark properties present in the currently defined defaults file. */
+ def getDefaultSparkProperties = {
+ val defaultProperties = new HashMap[String, String]()
+ if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
+ Option(propertiesFile).foreach { filename =>
+ val file = new File(filename)
+ SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
+ if (k.startsWith("spark")) {
+ defaultProperties(k) = v
+ if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
+ }
+ else {
+ SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
+ }
+ }
+ }
+ defaultProperties
+ }
+
+ /** Fill in any undefined values based on the current properties file or built-in defaults. */
+ private def loadDefaults() = {
+
+ // Use common defaults file, if not specified by user
+ if (propertiesFile == null) {
+ sys.env.get("SPARK_HOME").foreach { sparkHome =>
+ val sep = File.separator
+ val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.conf"
+ val file = new File(defaultPath)
+ if (file.exists()) {
+ propertiesFile = file.getAbsolutePath
+ }
+ }
+ }
+
+ val defaultProperties = getDefaultSparkProperties
+ // Use properties file as fallback for values which have a direct analog to
+ // arguments in this script.
+ master = Option(master).getOrElse(defaultProperties.get("spark.master").orNull)
+ executorMemory = Option(executorMemory)
+ .getOrElse(defaultProperties.get("spark.executor.memory").orNull)
+ executorCores = Option(executorCores)
+ .getOrElse(defaultProperties.get("spark.executor.cores").orNull)
+ totalExecutorCores = Option(totalExecutorCores)
+ .getOrElse(defaultProperties.get("spark.cores.max").orNull)
+ name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
+ jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)
- // Sanity checks
- if (args.length == 0) printUsageAndExit(-1)
- if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
- if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
+ // This supports env vars in older versions of Spark
+ master = Option(master).getOrElse(System.getenv("MASTER"))
+ deployMode = Option(deployMode).getOrElse(System.getenv("DEPLOY_MODE"))
+
+ // Global defaults. These should be kept to a minimum to avoid confusing behavior.
+ master = Option(master).getOrElse("local")
+ }
+
+ /** Ensure that required fields exist. Call this only once all defaults are loaded. */
+ private def checkRequiredArguments() = {
+ if (args.length == 0) printUsageAndExit(-1)
+ if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
+ if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
+ }
override def toString = {
s"""Parsed arguments:
- | master $master
- | deployMode $deployMode
- | executorMemory $executorMemory
- | executorCores $executorCores
- | totalExecutorCores $totalExecutorCores
- | driverMemory $driverMemory
- | drivercores $driverCores
- | supervise $supervise
- | queue $queue
- | numExecutors $numExecutors
- | files $files
- | archives $archives
- | mainClass $mainClass
- | primaryResource $primaryResource
- | name $name
- | childArgs [${childArgs.mkString(" ")}]
- | jars $jars
- | verbose $verbose
+ | master $master
+ | deployMode $deployMode
+ | executorMemory $executorMemory
+ | executorCores $executorCores
+ | totalExecutorCores $totalExecutorCores
+ | propertiesFile $propertiesFile
+ | driverMemory $driverMemory
+ | driverCores $driverCores
+ | driverExtraClassPath $driverExtraClassPath
+ | driverExtraLibraryPath $driverExtraLibraryPath
+ | driverExtraJavaOptions $driverExtraJavaOptions
+ | supervise $supervise
+ | queue $queue
+ | numExecutors $numExecutors
+ | files $files
+ | archives $archives
+ | mainClass $mainClass
+ | primaryResource $primaryResource
+ | name $name
+ | childArgs [${childArgs.mkString(" ")}]
+ | jars $jars
+ | verbose $verbose
+ |
+ |Default properties from $propertiesFile:
+ |${getDefaultSparkProperties.mkString(" ", "\n ", "\n")}
""".stripMargin
}
- private def loadEnvVars() {
- Option(System.getenv("MASTER")).map(master = _)
- Option(System.getenv("DEPLOY_MODE")).map(deployMode = _)
- }
-
private def parseOpts(opts: List[String]): Unit = opts match {
case ("--name") :: value :: tail =>
name = value
@@ -122,6 +192,22 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
driverCores = value
parseOpts(tail)
+ case ("--driver-class-path") :: value :: tail =>
+ driverExtraClassPath = value
+ parseOpts(tail)
+
+ case ("--driver-java-options") :: value :: tail =>
+ driverExtraJavaOptions = value
+ parseOpts(tail)
+
+ case ("--driver-library-path") :: value :: tail =>
+ driverExtraLibraryPath = value
+ parseOpts(tail)
+
+ case ("--properties-file") :: value :: tail =>
+ propertiesFile = value
+ parseOpts(tail)
+
case ("--supervise") :: tail =>
supervise = true
parseOpts(tail)
@@ -154,6 +240,18 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
parseOpts(tail)
case value :: tail =>
+ if (value.startsWith("-")) {
+ val errMessage = s"Unrecognized option '$value'."
+ val suggestion: Option[String] = value match {
+ case v if v.startsWith("--") && v.contains("=") =>
+ val parts = v.split("=")
+ Some(s"Perhaps you want '${parts(0)} ${parts(1)}'?")
+ case _ =>
+ None
+ }
+ SparkSubmit.printErrorAndExit(errMessage + suggestion.map(" " + _).getOrElse(""))
+ }
+
if (primaryResource != null) {
val error = s"Found two conflicting resources, $value and $primaryResource." +
" Expecting only one resource."
@@ -178,11 +276,21 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
| --class CLASS_NAME Name of your app's main class (required for Java apps).
| --arg ARG Argument to be passed to your application's main class. This
| option can be specified multiple times for multiple args.
- | --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512M).
| --name NAME The name of your application (Default: 'Spark').
| --jars JARS A comma-separated list of local jars to include on the
| driver classpath and that SparkContext.addJar will work
| with. Doesn't work on standalone with 'cluster' deploy mode.
+ | --files FILES Comma separated list of files to be placed in the working dir
+ | of each executor.
+ | --properties-file FILE Path to a file from which to load extra properties. If not
+ | specified, this will look for conf/spark-defaults.conf.
+ |
+ | --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512M).
+ | --driver-java-options Extra Java options to pass to the driver
+ | --driver-library-path Extra library path entries to pass to the driver
+ | --driver-class-path Extra class path entries to pass to the driver
+ |
+ | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
|
| Spark standalone with cluster deploy mode only:
| --driver-cores NUM Cores for driver (Default: 1).
@@ -193,14 +301,28 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
|
| YARN-only:
| --executor-cores NUM Number of cores per executor (Default: 1).
- | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
| --queue QUEUE_NAME The YARN queue to submit to (Default: 'default').
| --num-executors NUM Number of executors to launch (Default: 2).
- | --files FILES Comma separated list of files to be placed in the working dir
- | of each executor.
| --archives ARCHIVES Comma separated list of archives to be extracted into the
| working dir of each executor.""".stripMargin
)
SparkSubmit.exitFn()
}
}
+
+object SparkSubmitArguments {
+ /** Load properties present in the given file. */
+ def getPropertiesFromFile(file: File): Seq[(String, String)] = {
+ require(file.exists(), s"Properties file ${file.getName} does not exist")
+ val inputStream = new FileInputStream(file)
+ val properties = new Properties()
+ try {
+ properties.load(inputStream)
+ } catch {
+ case e: IOException =>
+ val message = s"Failed when loading Spark properties file ${file.getName}"
+ throw new SparkException(message, e)
+ }
+ properties.stringPropertyNames().toSeq.map(k => (k, properties(k)))
+ }
+}
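For illustration, a small sketch of reading a defaults file through the new helper, mirroring the filtering done in getDefaultSparkProperties; the path is hypothetical:

import java.io.File
import org.apache.spark.deploy.SparkSubmitArguments

val file = new File("/opt/spark/conf/spark-defaults.conf")
// Only keys starting with "spark" are honoured by spark-submit.
val defaults = SparkSubmitArguments.getPropertiesFromFile(file)
  .filter { case (k, _) => k.startsWith("spark") }
defaults.foreach { case (k, v) => println(s"$k=$v") }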
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index 63f166d401..888dd45e93 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -49,8 +49,8 @@ private[spark] object TestClient {
val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
conf = conf, securityManager = new SecurityManager(conf))
val desc = new ApplicationDescription(
- "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
- Some("dummy-spark-home"), "ignored")
+ "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(),
+ Seq()), Some("dummy-spark-home"), "ignored")
val listener = new TestListener
val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
client.start()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 0c761dfc93..9103c885fa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -46,21 +46,26 @@ object CommandUtils extends Logging {
* the way the JAVA_OPTS are assembled there.
*/
def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
- val libraryOpts = getEnv("SPARK_LIBRARY_PATH", command)
- .map(p => List("-Djava.library.path=" + p))
- .getOrElse(Nil)
- val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS"))
- .map(Utils.splitCommandString).getOrElse(Nil)
- val userOpts = getEnv("SPARK_JAVA_OPTS", command).map(Utils.splitCommandString).getOrElse(Nil)
val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
+ // Note, this will coalesce multiple options into a single command component
+ val extraOpts = command.extraJavaOptions.toSeq
+ val libraryOpts =
+ if (command.libraryPathEntries.size > 0) {
+ val joined = command.libraryPathEntries.mkString(File.pathSeparator)
+ Seq(s"-Djava.library.path=$joined")
+ } else {
+ Seq()
+ }
// Figure out our classpath with the external compute-classpath script
val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
val classPath = Utils.executeAndGetOutput(
Seq(sparkHome + "/bin/compute-classpath" + ext),
extraEnvironment=command.environment)
+ val userClassPath = command.classPathEntries.mkString(File.pathSeparator)
+ val classPathWithUser = classPath + File.pathSeparator + userClassPath
- Seq("-cp", classPath) ++ libraryOpts ++ workerLocalOpts ++ userOpts ++ memoryOpts
+ Seq("-cp", classPathWithUser) ++ libraryOpts ++ extraOpts ++ memoryOpts
}
/** Spawn a thread that will redirect a given stream to a file */
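A sketch of what buildJavaOpts now produces for a Command carrying the new fields; the paths, memory size, and Spark home below are hypothetical:

import org.apache.spark.deploy.Command
import org.apache.spark.deploy.worker.CommandUtils

val cmd = Command("org.example.Main", Seq(), Map(),
  classPathEntries = Seq("/opt/app/lib/extra.jar"),
  libraryPathEntries = Seq("/opt/native/lib"),
  extraJavaOptions = Some("-XX:+UseG1GC"))
val opts = CommandUtils.buildJavaOpts(cmd, memory = 1024, sparkHome = "/opt/spark")
// Roughly: Seq("-cp", "<compute-classpath output>:/opt/app/lib/extra.jar",
//              "-Djava.library.path=/opt/native/lib", "-XX:+UseG1GC",
//              "-Xms1024M", "-Xmx1024M")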
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index b4df1a0dd4..f918b42c83 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy.worker
import java.io._
import scala.collection.JavaConversions._
-import scala.collection.mutable.Map
+import scala.collection.Map
import akka.actor.ActorRef
import com.google.common.base.Charsets
@@ -74,13 +74,17 @@ private[spark] class DriverRunner(
// Make sure user application jar is on the classpath
// TODO: If we add ability to submit multiple jars they should also be added here
- val env = Map(driverDesc.command.environment.toSeq: _*)
- env("SPARK_CLASSPATH") = env.getOrElse("SPARK_CLASSPATH", "") + s":$localJarFilename"
- val newCommand = Command(driverDesc.command.mainClass,
- driverDesc.command.arguments.map(substituteVariables), env)
+ val classPath = driverDesc.command.classPathEntries ++ Seq(s"$localJarFilename")
+ val newCommand = Command(
+ driverDesc.command.mainClass,
+ driverDesc.command.arguments.map(substituteVariables),
+ driverDesc.command.environment,
+ classPath,
+ driverDesc.command.libraryPathEntries,
+ driverDesc.command.extraJavaOptions)
val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem,
sparkHome.getAbsolutePath)
- launchDriver(command, env, driverDir, driverDesc.supervise)
+ launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise)
}
catch {
case e: Exception => finalException = Some(e)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 2edd921066..f94cd685e8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -99,7 +99,9 @@ private[spark] class ExecutorRunner(
def getCommandSeq = {
val command = Command(appDesc.command.mainClass,
- appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment)
+ appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment,
+ appDesc.command.classPathEntries, appDesc.command.libraryPathEntries,
+ appDesc.command.extraJavaOptions)
CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath)
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index f89b2bffd1..2bfb9c387e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -64,9 +64,10 @@ private[spark] class Executor(
// to what Yarn on this system said was available. This will be used later when SparkEnv
// created.
if (java.lang.Boolean.valueOf(
- System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))))
- {
+ System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))) {
conf.set("spark.local.dir", getYarnLocalDirs())
+ } else if (sys.env.contains("SPARK_LOCAL_DIRS")) {
+ conf.set("spark.local.dir", sys.env("SPARK_LOCAL_DIRS"))
}
if (!isLocal) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 936e9db805..9544ca05dc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -42,11 +42,20 @@ private[spark] class SparkDeploySchedulerBackend(
// The endpoint for executors to talk to us
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- conf.get("spark.driver.host"), conf.get("spark.driver.port"),
+ conf.get("spark.driver.host"), conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
+ val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
+ val classPathEntries = sys.props.get("spark.executor.extraClassPath").toSeq.flatMap { cp =>
+ cp.split(java.io.File.pathSeparator)
+ }
+ val libraryPathEntries = sys.props.get("spark.executor.extraLibraryPath").toSeq.flatMap { cp =>
+ cp.split(java.io.File.pathSeparator)
+ }
+
val command = Command(
- "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
+ "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.testExecutorEnvs,
+ classPathEntries, libraryPathEntries, extraJavaOpts)
val sparkHome = sc.getSparkHome()
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 06b041e1fd..2cd9d6c12e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -111,7 +111,18 @@ private[spark] class CoarseMesosSchedulerBackend(
def createCommand(offer: Offer, numCores: Int): CommandInfo = {
val environment = Environment.newBuilder()
- sc.executorEnvs.foreach { case (key, value) =>
+ val extraClassPath = conf.getOption("spark.executor.extraClassPath")
+ extraClassPath.foreach { cp =>
+ environment.addVariables(
+ Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
+ }
+ val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions")
+
+ val libraryPathOption = "spark.executor.extraLibraryPath"
+ val extraLibraryPath = conf.getOption(libraryPathOption).map(p => s"-Djava.library.path=$p")
+ val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")
+
+ sc.testExecutorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
.setName(key)
.setValue(value)
@@ -123,20 +134,22 @@ private[spark] class CoarseMesosSchedulerBackend(
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
val uri = conf.get("spark.executor.uri", null)
if (uri == null) {
val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath
command.setValue(
- "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
- runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+ "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %s %d".format(
+ runScript, extraOpts, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue(
("cd %s*; " +
- "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d")
- .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+ "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %s %d")
+ .format(basename, extraOpts, driverUrl, offer.getSlaveId.getValue,
+ offer.getHostname, numCores))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
command.build()
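With hypothetical values, the extraOpts string built above expands as follows and is spliced between the backend class name and the driver URL in the launch command:

val extraJavaOpts = Some("-XX:+UseG1GC")                            // from spark.executor.extraJavaOptions
val extraLibraryPath = Some("-Djava.library.path=/opt/native/lib")  // from spark.executor.extraLibraryPath
val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")
// extraOpts == "-XX:+UseG1GC -Djava.library.path=/opt/native/lib"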
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index dfdcafe19f..c975f31232 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -90,7 +90,7 @@ private[spark] class MesosSchedulerBackend(
"Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor"))
val environment = Environment.newBuilder()
- sc.executorEnvs.foreach { case (key, value) =>
+ sc.testExecutorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
.setName(key)
.setValue(value)