path: root/core
author     Patrick Wendell <pwendell@gmail.com>  2014-04-21 10:26:33 -0700
committer  Patrick Wendell <pwendell@gmail.com>  2014-04-21 10:26:33 -0700
commit     fb98488fc8e68cc84f6e0750fd4e9e29029879d2 (patch)
tree       eba99b56bea8ec2e357020a413bf9cf04a4e3308 /core
parent     3a390bfd80f80739b9d847780eccc443fc2dc0ea (diff)
download   spark-fb98488fc8e68cc84f6e0750fd4e9e29029879d2.tar.gz
           spark-fb98488fc8e68cc84f6e0750fd4e9e29029879d2.tar.bz2
           spark-fb98488fc8e68cc84f6e0750fd4e9e29029879d2.zip
Clean up and simplify Spark configuration
Over time as we've added more deployment modes, this has gotten a bit unwieldy with user-facing configuration options in Spark. Going forward we'll advise all users to run `spark-submit` to launch applications. This is a WIP patch but it makes the following improvements:

1. Improved `spark-env.sh.template`, which was missing a lot of things users now set in that file.
2. Removes the shipping of SPARK_CLASSPATH, SPARK_JAVA_OPTS, and SPARK_LIBRARY_PATH to the executors on the cluster. This was an ugly hack. Instead it introduces the config variables spark.executor.extraJavaOptions, spark.executor.extraLibraryPath, and spark.executor.extraClassPath.
3. Adds the ability to set these same variables for the driver using `spark-submit`.
4. Allows you to load system properties from a `spark-defaults.conf` file when running `spark-submit`. This allows setting both SparkConf options and other system properties utilized by `spark-submit`.
5. Made `SPARK_LOCAL_IP` an environment variable rather than a SparkConf property. This is more consistent with it being set on each node.

Author: Patrick Wendell <pwendell@gmail.com>

Closes #299 from pwendell/config-cleanup and squashes the following commits:

127f301 [Patrick Wendell] Improvements to testing
a006464 [Patrick Wendell] Moving properties file template.
b4b496c [Patrick Wendell] spark-defaults.properties -> spark-defaults.conf
0086939 [Patrick Wendell] Minor style fixes
af09e3e [Patrick Wendell] Mention config file in docs and clean-up docs
b16e6a2 [Patrick Wendell] Cleanup of spark-submit script and Scala quick start guide
af0adf7 [Patrick Wendell] Automatically add user jar
a56b125 [Patrick Wendell] Responses to Tom's review
d50c388 [Patrick Wendell] Merge remote-tracking branch 'apache/master' into config-cleanup
a762901 [Patrick Wendell] Fixing test failures
ffa00fe [Patrick Wendell] Review feedback
fda0301 [Patrick Wendell] Note
308f1f6 [Patrick Wendell] Properly escape quotes and other clean-up for YARN
e83cd8f [Patrick Wendell] Changes to allow re-use of test applications
be42f35 [Patrick Wendell] Handle case where SPARK_HOME is not set
c2a2909 [Patrick Wendell] Test compile fixes
4ee6f9d [Patrick Wendell] Making YARN doc changes consistent
afc9ed8 [Patrick Wendell] Cleaning up line limits and two compile errors.
b08893b [Patrick Wendell] Additional improvements.
ace4ead [Patrick Wendell] Responses to review feedback.
b72d183 [Patrick Wendell] Review feedback for spark env file
46555c1 [Patrick Wendell] Review feedback and import clean-ups
437aed1 [Patrick Wendell] Small fix
761ebcd [Patrick Wendell] Library path and classpath for drivers
7cc70e4 [Patrick Wendell] Clean up terminology inside of spark-env script
5b0ba8e [Patrick Wendell] Don't ship executor envs
84cc5e5 [Patrick Wendell] Small clean-up
1f75238 [Patrick Wendell] SPARK_JAVA_OPTS --> SPARK_MASTER_OPTS for master settings
4982331 [Patrick Wendell] Remove SPARK_LIBRARY_PATH
6eaf7d0 [Patrick Wendell] executorJavaOpts
0faa3b6 [Patrick Wendell] Stash of adding config options in submit script and YARN
ac2d65e [Patrick Wendell] Change spark.local.dir -> SPARK_LOCAL_DIRS
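For illustration (not part of this patch): a minimal Scala sketch of how an application could set the new executor options on a SparkConf instead of relying on SPARK_JAVA_OPTS / SPARK_CLASSPATH / SPARK_LIBRARY_PATH. The application name, GC flag, and paths below are made up; the same spark.* keys can equivalently be placed in `conf/spark-defaults.conf` and picked up by `spark-submit`.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example; the flag and paths are illustrative only.
object ConfigExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("config-example")
      // Extra JVM flags for executors. validateSettings() in this patch rejects
      // -Xmx/-Xms here (use spark.executor.memory) as well as -Dspark.* options.
      .set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails")
      // Extra classpath and native library path entries for executors.
      .set("spark.executor.extraClassPath", "/opt/deps/extra.jar")
      .set("spark.executor.extraLibraryPath", "/opt/native")

    val sc = new SparkContext(conf)
    try {
      println(sc.parallelize(1 to 10).count()) // trivial job to exercise executors
    } finally {
      sc.stop()
    }
  }
}

The driver-side equivalents (spark.driver.extraClassPath, spark.driver.extraJavaOptions, spark.driver.extraLibraryPath) are normally supplied through the new --driver-class-path, --driver-java-options, and --driver-library-path flags of spark-submit, since they must take effect before the driver JVM starts.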
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala  76
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala  37
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/Client.scala  15
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/Command.scala  5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala  53
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala  190
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala  4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala  19
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala  16
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala  4
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala  5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala  13
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala  23
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala  15
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala  111
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala  3
18 files changed, 479 insertions, 114 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b947feb891..bd21fdc5a1 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -208,6 +208,82 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
new SparkConf(false).setAll(settings)
}
+ /** Checks for illegal or deprecated config settings. Throws an exception for the former. Not
+ * idempotent - may mutate this conf object to convert deprecated settings to supported ones. */
+ private[spark] def validateSettings() {
+ if (settings.contains("spark.local.dir")) {
+ val msg = "In Spark 1.0 and later spark.local.dir will be overridden by the value set by " +
+ "the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN)."
+ logWarning(msg)
+ }
+
+ val executorOptsKey = "spark.executor.extraJavaOptions"
+ val executorClasspathKey = "spark.executor.extraClassPath"
+ val driverOptsKey = "spark.driver.extraJavaOptions"
+ val driverClassPathKey = "spark.driver.extraClassPath"
+
+ // Validate spark.executor.extraJavaOptions
+ settings.get(executorOptsKey).map { javaOpts =>
+ if (javaOpts.contains("-Dspark")) {
+ val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts)'. " +
+ "Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit."
+ throw new Exception(msg)
+ }
+ if (javaOpts.contains("-Xmx") || javaOpts.contains("-Xms")) {
+ val msg = s"$executorOptsKey is not allowed to alter memory settings (was '$javaOpts'). " +
+ "Use spark.executor.memory instead."
+ throw new Exception(msg)
+ }
+ }
+
+ // Check for legacy configs
+ sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
+ val error =
+ s"""
+ |SPARK_JAVA_OPTS was detected (set to '$value').
+ |This has undefined behavior when running on a cluster and is deprecated in Spark 1.0+.
+ |
+ |Please instead use:
+ | - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
+ | - ./spark-submit with --driver-java-options to set -X options for a driver
+ | - spark.executor.extraJavaOptions to set -X options for executors
+ | - SPARK_DAEMON_OPTS to set java options for standalone daemons (i.e. master, worker)
+ """.stripMargin
+ logError(error)
+
+ for (key <- Seq(executorOptsKey, driverOptsKey)) {
+ if (getOption(key).isDefined) {
+ throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.")
+ } else {
+ logWarning(s"Setting '$key' to '$value' as a work-around.")
+ set(key, value)
+ }
+ }
+ }
+
+ sys.env.get("SPARK_CLASSPATH").foreach { value =>
+ val error =
+ s"""
+ |SPARK_CLASSPATH was detected (set to '$value').
+ |This has undefined behavior when running on a cluster and is deprecated in Spark 1.0+.
+ |
+ |Please instead use:
+ | - ./spark-submit with --driver-class-path to augment the driver classpath
+ | - spark.executor.extraClassPath to augment the executor classpath
+ """.stripMargin
+ logError(error)
+
+ for (key <- Seq(executorClasspathKey, driverClassPathKey)) {
+ if (getOption(key).isDefined) {
+ throw new SparkException(s"Found both $key and SPARK_CLASSPATH. Use only the former.")
+ } else {
+ logWarning(s"Setting '$key' to '$value' as a work-around.")
+ set(key, value)
+ }
+ }
+ }
+ }
+
/**
* Return a string listing all keys and values, one per line. This is useful to print the
* configuration out for debugging.
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d3ef75bc73..7933d68d67 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -148,6 +148,7 @@ class SparkContext(config: SparkConf) extends Logging {
this(master, appName, sparkHome, jars, Map(), Map())
private[spark] val conf = config.clone()
+ conf.validateSettings()
/**
* Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
@@ -159,7 +160,7 @@ class SparkContext(config: SparkConf) extends Logging {
throw new SparkException("A master URL must be set in your configuration")
}
if (!conf.contains("spark.app.name")) {
- throw new SparkException("An application must be set in your configuration")
+ throw new SparkException("An application name must be set in your configuration")
}
if (conf.getBoolean("spark.logConf", false)) {
@@ -170,11 +171,11 @@ class SparkContext(config: SparkConf) extends Logging {
conf.setIfMissing("spark.driver.host", Utils.localHostName())
conf.setIfMissing("spark.driver.port", "0")
- val jars: Seq[String] = if (conf.contains("spark.jars")) {
- conf.get("spark.jars").split(",").filter(_.size != 0)
- } else {
- null
- }
+ val jars: Seq[String] =
+ conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
+
+ val files: Seq[String] =
+ conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
val master = conf.get("spark.master")
val appName = conf.get("spark.app.name")
@@ -235,6 +236,10 @@ class SparkContext(config: SparkConf) extends Logging {
jars.foreach(addJar)
}
+ if (files != null) {
+ files.foreach(addFile)
+ }
+
private def warnSparkMem(value: String): String = {
logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
"deprecated, please use spark.executor.memory instead.")
@@ -247,22 +252,20 @@ class SparkContext(config: SparkConf) extends Logging {
.map(Utils.memoryStringToMb)
.getOrElse(512)
- // Environment variables to pass to our executors
- private[spark] val executorEnvs = HashMap[String, String]()
- for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS");
- value <- Option(System.getenv(key))) {
- executorEnvs(key) = value
- }
+ // Environment variables to pass to our executors.
+ // NOTE: This should only be used for test related settings.
+ private[spark] val testExecutorEnvs = HashMap[String, String]()
+
// Convert java options to env vars as a work around
// since we can't set env vars directly in sbt.
- for { (envKey, propKey) <- Seq(("SPARK_HOME", "spark.home"), ("SPARK_TESTING", "spark.testing"))
+ for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
- executorEnvs(envKey) = value
+ testExecutorEnvs(envKey) = value
}
// The Mesos scheduler backend relies on this environment variable to set executor memory.
// TODO: Set this only in the Mesos scheduler.
- executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
- executorEnvs ++= conf.getExecutorEnv
+ testExecutorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
+ testExecutorEnvs ++= conf.getExecutorEnv
// Set SPARK_USER for user who is running SparkContext.
val sparkUser = Option {
@@ -270,7 +273,7 @@ class SparkContext(config: SparkConf) extends Logging {
}.getOrElse {
SparkContext.SPARK_UNKNOWN_USER
}
- executorEnvs("SPARK_USER") = sparkUser
+ testExecutorEnvs("SPARK_USER") = sparkUser
// Create and start the scheduler
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 8fd2c7e95b..7ead117152 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -54,8 +54,21 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
System.getenv().foreach{case (k, v) => env(k) = v}
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
+
+ val classPathConf = "spark.driver.extraClassPath"
+ val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
+ cp.split(java.io.File.pathSeparator)
+ }
+
+ val libraryPathConf = "spark.driver.extraLibraryPath"
+ val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
+ cp.split(java.io.File.pathSeparator)
+ }
+
+ val javaOptionsConf = "spark.driver.extraJavaOptions"
+ val javaOpts = sys.props.get(javaOptionsConf)
val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
- driverArgs.driverOptions, env)
+ driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)
val driverDescription = new DriverDescription(
driverArgs.jarUrl,
diff --git a/core/src/main/scala/org/apache/spark/deploy/Command.scala b/core/src/main/scala/org/apache/spark/deploy/Command.scala
index fa8af9a646..32f3ba3850 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Command.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Command.scala
@@ -22,5 +22,8 @@ import scala.collection.Map
private[spark] case class Command(
mainClass: String,
arguments: Seq[String],
- environment: Map[String, String]) {
+ environment: Map[String, String],
+ classPathEntries: Seq[String],
+ libraryPathEntries: Seq[String],
+ extraJavaOptions: Option[String] = None) {
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index e5d593cade..1b1e0fce0e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -17,14 +17,12 @@
package org.apache.spark.deploy
-import java.io.{PrintStream, File}
+import java.io.{File, PrintStream}
import java.net.{URI, URL}
-import org.apache.spark.executor.ExecutorURLClassLoader
+import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Map
+import org.apache.spark.executor.ExecutorURLClassLoader
/**
* Scala code behind the spark-submit script. The script handles setting up the classpath with
@@ -63,7 +61,8 @@ object SparkSubmit {
/**
* @return
* a tuple containing the arguments for the child, a list of classpath
- * entries for the child, and the main class for the child
+ * entries for the child, a map of system properties,
+ * and the main class for the child
*/
private[spark] def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String],
ArrayBuffer[String], Map[String, String], String) = {
@@ -123,6 +122,12 @@ object SparkSubmit {
val options = List[OptionAssigner](
new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
+ new OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true,
+ sysProp = "spark.driver.extraClassPath"),
+ new OptionAssigner(appArgs.driverExtraJavaOptions, STANDALONE | YARN, true,
+ sysProp = "spark.driver.extraJavaOptions"),
+ new OptionAssigner(appArgs.driverExtraLibraryPath, STANDALONE | YARN, true,
+ sysProp = "spark.driver.extraLibraryPath"),
new OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"),
new OptionAssigner(appArgs.name, YARN, true, clOption = "--name"),
new OptionAssigner(appArgs.queue, YARN, true, clOption = "--queue"),
@@ -142,10 +147,14 @@ object SparkSubmit {
new OptionAssigner(appArgs.files, YARN, true, clOption = "--files"),
new OptionAssigner(appArgs.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
new OptionAssigner(appArgs.archives, YARN, true, clOption = "--archives"),
- new OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars")
+ new OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars"),
+ new OptionAssigner(appArgs.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
+ new OptionAssigner(appArgs.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"),
+ new OptionAssigner(appArgs.name, LOCAL | STANDALONE | MESOS, false,
+ sysProp = "spark.app.name")
)
- // more jars
+ // For client mode make any added jars immediately visible on the classpath
if (appArgs.jars != null && !deployOnCluster) {
for (jar <- appArgs.jars.split(",")) {
childClasspath += jar
@@ -163,6 +172,14 @@ object SparkSubmit {
}
}
+ // For standalone mode, add the application jar automatically so the user doesn't have to
+ // call sc.addJar. TODO: Standalone mode in the cluster
+ if (clusterManager == STANDALONE) {
+ val existingJars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
+ sysProps.put("spark.jars", (existingJars ++ Seq(appArgs.primaryResource)).mkString(","))
+ println("SPARK JARS" + sysProps.get("spark.jars"))
+ }
+
if (deployOnCluster && clusterManager == STANDALONE) {
if (appArgs.supervise) {
childArgs += "--supervise"
@@ -173,7 +190,7 @@ object SparkSubmit {
childArgs += (appArgs.master, appArgs.primaryResource, appArgs.mainClass)
}
- // args
+ // Arguments to be passed to user program
if (appArgs.childArgs != null) {
if (!deployOnCluster || clusterManager == STANDALONE) {
childArgs ++= appArgs.childArgs
@@ -184,6 +201,10 @@ object SparkSubmit {
}
}
+ for ((k, v) <- appArgs.getDefaultSparkProperties) {
+ if (!sysProps.contains(k)) sysProps(k) = v
+ }
+
(childArgs, childClasspath, sysProps, childMainClass)
}
@@ -191,11 +212,11 @@ object SparkSubmit {
sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) {
if (verbose) {
- System.err.println(s"Main class:\n$childMainClass")
- System.err.println(s"Arguments:\n${childArgs.mkString("\n")}")
- System.err.println(s"System properties:\n${sysProps.mkString("\n")}")
- System.err.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
- System.err.println("\n")
+ printStream.println(s"Main class:\n$childMainClass")
+ printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
+ printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
+ printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
+ printStream.println("\n")
}
val loader = new ExecutorURLClassLoader(new Array[URL](0),
@@ -226,6 +247,10 @@ object SparkSubmit {
}
}
+/**
+ * Provides an indirection layer for passing arguments as system properties or flags to
+ * the user's driver program or to downstream launcher tools.
+ */
private[spark] class OptionAssigner(val value: String,
val clusterManager: Int,
val deployOnCluster: Boolean,
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 834b3df2f1..02502adfbd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -17,18 +17,28 @@
package org.apache.spark.deploy
-import scala.collection.mutable.ArrayBuffer
+import java.io.{File, FileInputStream, IOException}
+import java.util.Properties
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{HashMap, ArrayBuffer}
+
+import org.apache.spark.SparkException
/**
* Parses and encapsulates arguments from the spark-submit script.
*/
private[spark] class SparkSubmitArguments(args: Array[String]) {
- var master: String = "local"
+ var master: String = null
var deployMode: String = null
var executorMemory: String = null
var executorCores: String = null
var totalExecutorCores: String = null
+ var propertiesFile: String = null
var driverMemory: String = null
+ var driverExtraClassPath: String = null
+ var driverExtraLibraryPath: String = null
+ var driverExtraJavaOptions: String = null
var driverCores: String = null
var supervise: Boolean = false
var queue: String = null
@@ -42,42 +52,102 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
var jars: String = null
var verbose: Boolean = false
- loadEnvVars()
parseOpts(args.toList)
+ loadDefaults()
+ checkRequiredArguments()
+
+ /** Return the default properties present in the currently defined defaults file. */
+ def getDefaultSparkProperties = {
+ val defaultProperties = new HashMap[String, String]()
+ if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
+ Option(propertiesFile).foreach { filename =>
+ val file = new File(filename)
+ SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
+ if (k.startsWith("spark")) {
+ defaultProperties(k) = v
+ if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
+ }
+ else {
+ SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
+ }
+ }
+ }
+ defaultProperties
+ }
+
+ /** Fill in any undefined values based on the current properties file or built-in defaults. */
+ private def loadDefaults() = {
+
+ // Use common defaults file, if not specified by user
+ if (propertiesFile == null) {
+ sys.env.get("SPARK_HOME").foreach { sparkHome =>
+ val sep = File.separator
+ val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.conf"
+ val file = new File(defaultPath)
+ if (file.exists()) {
+ propertiesFile = file.getAbsolutePath
+ }
+ }
+ }
+
+ val defaultProperties = getDefaultSparkProperties
+ // Use properties file as fallback for values which have a direct analog to
+ // arguments in this script.
+ master = Option(master).getOrElse(defaultProperties.get("spark.master").orNull)
+ executorMemory = Option(executorMemory)
+ .getOrElse(defaultProperties.get("spark.executor.memory").orNull)
+ executorCores = Option(executorCores)
+ .getOrElse(defaultProperties.get("spark.executor.cores").orNull)
+ totalExecutorCores = Option(totalExecutorCores)
+ .getOrElse(defaultProperties.get("spark.cores.max").orNull)
+ name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
+ jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)
- // Sanity checks
- if (args.length == 0) printUsageAndExit(-1)
- if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
- if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
+ // This supports env vars in older versions of Spark
+ master = Option(master).getOrElse(System.getenv("MASTER"))
+ deployMode = Option(deployMode).getOrElse(System.getenv("DEPLOY_MODE"))
+
+ // Global defaults. These should be kept to a minimum to avoid confusing behavior.
+ master = Option(master).getOrElse("local")
+ }
+
+ /** Ensure that required fields exist. Call this only once all defaults are loaded. */
+ private def checkRequiredArguments() = {
+ if (args.length == 0) printUsageAndExit(-1)
+ if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
+ if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
+ }
override def toString = {
s"""Parsed arguments:
- | master $master
- | deployMode $deployMode
- | executorMemory $executorMemory
- | executorCores $executorCores
- | totalExecutorCores $totalExecutorCores
- | driverMemory $driverMemory
- | drivercores $driverCores
- | supervise $supervise
- | queue $queue
- | numExecutors $numExecutors
- | files $files
- | archives $archives
- | mainClass $mainClass
- | primaryResource $primaryResource
- | name $name
- | childArgs [${childArgs.mkString(" ")}]
- | jars $jars
- | verbose $verbose
+ | master $master
+ | deployMode $deployMode
+ | executorMemory $executorMemory
+ | executorCores $executorCores
+ | totalExecutorCores $totalExecutorCores
+ | propertiesFile $propertiesFile
+ | driverMemory $driverMemory
+ | driverCores $driverCores
+ | driverExtraClassPath $driverExtraClassPath
+ | driverExtraLibraryPath $driverExtraLibraryPath
+ | driverExtraJavaOptions $driverExtraJavaOptions
+ | supervise $supervise
+ | queue $queue
+ | numExecutors $numExecutors
+ | files $files
+ | archives $archives
+ | mainClass $mainClass
+ | primaryResource $primaryResource
+ | name $name
+ | childArgs [${childArgs.mkString(" ")}]
+ | jars $jars
+ | verbose $verbose
+ |
+ |Default properties from $propertiesFile:
+ |${getDefaultSparkProperties.mkString(" ", "\n ", "\n")}
""".stripMargin
}
- private def loadEnvVars() {
- Option(System.getenv("MASTER")).map(master = _)
- Option(System.getenv("DEPLOY_MODE")).map(deployMode = _)
- }
-
private def parseOpts(opts: List[String]): Unit = opts match {
case ("--name") :: value :: tail =>
name = value
@@ -122,6 +192,22 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
driverCores = value
parseOpts(tail)
+ case ("--driver-class-path") :: value :: tail =>
+ driverExtraClassPath = value
+ parseOpts(tail)
+
+ case ("--driver-java-options") :: value :: tail =>
+ driverExtraJavaOptions = value
+ parseOpts(tail)
+
+ case ("--driver-library-path") :: value :: tail =>
+ driverExtraLibraryPath = value
+ parseOpts(tail)
+
+ case ("--properties-file") :: value :: tail =>
+ propertiesFile = value
+ parseOpts(tail)
+
case ("--supervise") :: tail =>
supervise = true
parseOpts(tail)
@@ -154,6 +240,18 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
parseOpts(tail)
case value :: tail =>
+ if (value.startsWith("-")) {
+ val errMessage = s"Unrecognized option '$value'."
+ val suggestion: Option[String] = value match {
+ case v if v.startsWith("--") && v.contains("=") =>
+ val parts = v.split("=")
+ Some(s"Perhaps you want '${parts(0)} ${parts(1)}'?")
+ case _ =>
+ None
+ }
+ SparkSubmit.printErrorAndExit(errMessage + suggestion.map(" " + _).getOrElse(""))
+ }
+
if (primaryResource != null) {
val error = s"Found two conflicting resources, $value and $primaryResource." +
" Expecting only one resource."
@@ -178,11 +276,21 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
| --class CLASS_NAME Name of your app's main class (required for Java apps).
| --arg ARG Argument to be passed to your application's main class. This
| option can be specified multiple times for multiple args.
- | --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512M).
| --name NAME The name of your application (Default: 'Spark').
| --jars JARS A comma-separated list of local jars to include on the
| driver classpath and that SparkContext.addJar will work
| with. Doesn't work on standalone with 'cluster' deploy mode.
+ | --files FILES Comma separated list of files to be placed in the working dir
+ | of each executor.
+ | --properties-file FILE Path to a file from which to load extra properties. If not
+ | specified, this will look for conf/spark-defaults.conf.
+ |
+ | --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512M).
+ | --driver-java-options Extra Java options to pass to the driver
+ | --driver-library-path Extra library path entries to pass to the driver
+ | --driver-class-path Extra class path entries to pass to the driver
+ |
+ | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
|
| Spark standalone with cluster deploy mode only:
| --driver-cores NUM Cores for driver (Default: 1).
@@ -193,14 +301,28 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
|
| YARN-only:
| --executor-cores NUM Number of cores per executor (Default: 1).
- | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
| --queue QUEUE_NAME The YARN queue to submit to (Default: 'default').
| --num-executors NUM Number of executors to launch (Default: 2).
- | --files FILES Comma separated list of files to be placed in the working dir
- | of each executor.
| --archives ARCHIVES Comma separated list of archives to be extracted into the
| working dir of each executor.""".stripMargin
)
SparkSubmit.exitFn()
}
}
+
+object SparkSubmitArguments {
+ /** Load properties present in the given file. */
+ def getPropertiesFromFile(file: File): Seq[(String, String)] = {
+ require(file.exists(), s"Properties file ${file.getName} does not exist")
+ val inputStream = new FileInputStream(file)
+ val properties = new Properties()
+ try {
+ properties.load(inputStream)
+ } catch {
+ case e: IOException =>
+ val message = s"Failed when loading Spark properties file ${file.getName}"
+ throw new SparkException(message, e)
+ }
+ properties.stringPropertyNames().toSeq.map(k => (k, properties(k)))
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index 63f166d401..888dd45e93 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -49,8 +49,8 @@ private[spark] object TestClient {
val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
conf = conf, securityManager = new SecurityManager(conf))
val desc = new ApplicationDescription(
- "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
- Some("dummy-spark-home"), "ignored")
+ "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(),
+ Seq()), Some("dummy-spark-home"), "ignored")
val listener = new TestListener
val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
client.start()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 0c761dfc93..9103c885fa 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -46,21 +46,26 @@ object CommandUtils extends Logging {
* the way the JAVA_OPTS are assembled there.
*/
def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
- val libraryOpts = getEnv("SPARK_LIBRARY_PATH", command)
- .map(p => List("-Djava.library.path=" + p))
- .getOrElse(Nil)
- val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS"))
- .map(Utils.splitCommandString).getOrElse(Nil)
- val userOpts = getEnv("SPARK_JAVA_OPTS", command).map(Utils.splitCommandString).getOrElse(Nil)
val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
+ // Note, this will coalesce multiple options into a single command component
+ val extraOpts = command.extraJavaOptions.toSeq
+ val libraryOpts =
+ if (command.libraryPathEntries.size > 0) {
+ val joined = command.libraryPathEntries.mkString(File.pathSeparator)
+ Seq(s"-Djava.library.path=$joined")
+ } else {
+ Seq()
+ }
// Figure out our classpath with the external compute-classpath script
val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
val classPath = Utils.executeAndGetOutput(
Seq(sparkHome + "/bin/compute-classpath" + ext),
extraEnvironment=command.environment)
+ val userClassPath = command.classPathEntries.mkString(File.pathSeparator)
+ val classPathWithUser = classPath + File.pathSeparator + userClassPath
- Seq("-cp", classPath) ++ libraryOpts ++ workerLocalOpts ++ userOpts ++ memoryOpts
+ Seq("-cp", classPathWithUser) ++ libraryOpts ++ extraOpts ++ memoryOpts
}
/** Spawn a thread that will redirect a given stream to a file */
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index b4df1a0dd4..f918b42c83 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy.worker
import java.io._
import scala.collection.JavaConversions._
-import scala.collection.mutable.Map
+import scala.collection.Map
import akka.actor.ActorRef
import com.google.common.base.Charsets
@@ -74,13 +74,17 @@ private[spark] class DriverRunner(
// Make sure user application jar is on the classpath
// TODO: If we add ability to submit multiple jars they should also be added here
- val env = Map(driverDesc.command.environment.toSeq: _*)
- env("SPARK_CLASSPATH") = env.getOrElse("SPARK_CLASSPATH", "") + s":$localJarFilename"
- val newCommand = Command(driverDesc.command.mainClass,
- driverDesc.command.arguments.map(substituteVariables), env)
+ val classPath = driverDesc.command.classPathEntries ++ Seq(s"$localJarFilename")
+ val newCommand = Command(
+ driverDesc.command.mainClass,
+ driverDesc.command.arguments.map(substituteVariables),
+ driverDesc.command.environment,
+ classPath,
+ driverDesc.command.libraryPathEntries,
+ driverDesc.command.extraJavaOptions)
val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem,
sparkHome.getAbsolutePath)
- launchDriver(command, env, driverDir, driverDesc.supervise)
+ launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise)
}
catch {
case e: Exception => finalException = Some(e)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 2edd921066..f94cd685e8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -99,7 +99,9 @@ private[spark] class ExecutorRunner(
def getCommandSeq = {
val command = Command(appDesc.command.mainClass,
- appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment)
+ appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment,
+ appDesc.command.classPathEntries, appDesc.command.libraryPathEntries,
+ appDesc.command.extraJavaOptions)
CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath)
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index f89b2bffd1..2bfb9c387e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -64,9 +64,10 @@ private[spark] class Executor(
// to what Yarn on this system said was available. This will be used later when SparkEnv
// created.
if (java.lang.Boolean.valueOf(
- System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))))
- {
+ System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))) {
conf.set("spark.local.dir", getYarnLocalDirs())
+ } else if (sys.env.contains("SPARK_LOCAL_DIRS")) {
+ conf.set("spark.local.dir", sys.env("SPARK_LOCAL_DIRS"))
}
if (!isLocal) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 936e9db805..9544ca05dc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -42,11 +42,20 @@ private[spark] class SparkDeploySchedulerBackend(
// The endpoint for executors to talk to us
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
- conf.get("spark.driver.host"), conf.get("spark.driver.port"),
+ conf.get("spark.driver.host"), conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
+ val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
+ val classPathEntries = sys.props.get("spark.executor.extraClassPath").toSeq.flatMap { cp =>
+ cp.split(java.io.File.pathSeparator)
+ }
+ val libraryPathEntries = sys.props.get("spark.executor.extraLibraryPath").toSeq.flatMap { cp =>
+ cp.split(java.io.File.pathSeparator)
+ }
+
val command = Command(
- "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
+ "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.testExecutorEnvs,
+ classPathEntries, libraryPathEntries, extraJavaOpts)
val sparkHome = sc.getSparkHome()
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 06b041e1fd..2cd9d6c12e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -111,7 +111,18 @@ private[spark] class CoarseMesosSchedulerBackend(
def createCommand(offer: Offer, numCores: Int): CommandInfo = {
val environment = Environment.newBuilder()
- sc.executorEnvs.foreach { case (key, value) =>
+ val extraClassPath = conf.getOption("spark.executor.extraClassPath")
+ extraClassPath.foreach { cp =>
+ environment.addVariables(
+ Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
+ }
+ val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions")
+
+ val libraryPathOption = "spark.executor.extraLibraryPath"
+ val extraLibraryPath = conf.getOption(libraryPathOption).map(p => s"-Djava.library.path=$p")
+ val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")
+
+ sc.testExecutorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
.setName(key)
.setValue(value)
@@ -123,20 +134,22 @@ private[spark] class CoarseMesosSchedulerBackend(
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
val uri = conf.get("spark.executor.uri", null)
if (uri == null) {
val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath
command.setValue(
- "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
- runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+ "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %s %d".format(
+ runScript, extraOpts, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue(
("cd %s*; " +
- "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d")
- .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+ "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %s %d")
+ .format(basename, extraOpts, driverUrl, offer.getSlaveId.getValue,
+ offer.getHostname, numCores))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
command.build()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index dfdcafe19f..c975f31232 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -90,7 +90,7 @@ private[spark] class MesosSchedulerBackend(
"Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor"))
val environment = Environment.newBuilder()
- sc.executorEnvs.foreach { case (key, value) =>
+ sc.testExecutorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
.setName(key)
.setValue(value)
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 9f2924c23b..bfae32dae0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -87,7 +87,7 @@ class JsonProtocolSuite extends FunSuite {
}
def createAppDesc(): ApplicationDescription = {
- val cmd = new Command("mainClass", List("arg1", "arg2"), Map())
+ val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq())
new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
}
@@ -100,7 +100,7 @@ class JsonProtocolSuite extends FunSuite {
def createDriverCommand() = new Command(
"org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
- Map(("K1", "V1"), ("K2", "V2"))
+ Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Some("-Dfoo")
)
def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3,
@@ -133,9 +133,12 @@ class JsonProtocolSuite extends FunSuite {
def assertValidDataInJson(validateJson: JValue, expectedJson: JValue) {
val Diff(c, a, d) = validateJson diff expectedJson
- assert(c === JNothing, "Json changed")
- assert(a === JNothing, "Json added")
- assert(d === JNothing, "Json deleted")
+ val validatePretty = JsonMethods.pretty(validateJson)
+ val expectedPretty = JsonMethods.pretty(expectedJson)
+ val errorMessage = s"Expected:\n$expectedPretty\nFound:\n$validatePretty"
+ assert(c === JNothing, s"$errorMessage\nChanged:\n${JsonMethods.pretty(c)}")
+ assert(a === JNothing, s"$errorMessage\nAdded:\n${JsonMethods.pretty(a)}")
+ assert(d === JNothing, s"$errorMessage\nDeleted:\n${JsonMethods.pretty(d)}")
}
}
@@ -165,7 +168,7 @@ object JsonConstants {
"""
|{"name":"name","cores":4,"memoryperslave":1234,
|"user":"%s","sparkhome":"sparkHome",
- |"command":"Command(mainClass,List(arg1, arg2),Map())"}
+ |"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),None)"}
""".format(System.getProperty("user.name", "<unknown>")).stripMargin
val executorRunnerJsonStr =
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 4e489cd9b6..f82d717719 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -17,16 +17,16 @@
package org.apache.spark.deploy
-import java.io.{OutputStream, PrintStream}
+import java.io.{File, OutputStream, PrintStream}
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkException, TestUtils}
+import org.apache.spark.deploy.SparkSubmit._
+import org.apache.spark.util.Utils
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
-import org.apache.spark.deploy.SparkSubmit._
-
-
class SparkSubmitSuite extends FunSuite with ShouldMatchers {
val noOpOutputStream = new OutputStream {
@@ -42,7 +42,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}
/** Returns true if the script exits and the given search string is printed. */
- def testPrematureExit(input: Array[String], searchString: String): Boolean = {
+ def testPrematureExit(input: Array[String], searchString: String) = {
val printStream = new BufferPrintStream()
SparkSubmit.printStream = printStream
@@ -60,28 +60,38 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
}
thread.start()
thread.join()
- printStream.lineBuffer.find(s => s.contains(searchString)).size > 0
+ val joined = printStream.lineBuffer.mkString("\n")
+ if (!joined.contains(searchString)) {
+ fail(s"Search string '$searchString' not found in $joined")
+ }
}
test("prints usage on empty input") {
- testPrematureExit(Array[String](), "Usage: spark-submit") should be (true)
+ testPrematureExit(Array[String](), "Usage: spark-submit")
}
test("prints usage with only --help") {
- testPrematureExit(Array("--help"), "Usage: spark-submit") should be (true)
+ testPrematureExit(Array("--help"), "Usage: spark-submit")
+ }
+
+ test("prints error with unrecognized option") {
+ testPrematureExit(Array("--blarg"), "Unrecognized option '--blarg'")
+ testPrematureExit(Array("-bleg"), "Unrecognized option '-bleg'")
+ testPrematureExit(Array("--master=abc"),
+ "Unrecognized option '--master=abc'. Perhaps you want '--master abc'?")
}
test("handles multiple binary definitions") {
val adjacentJars = Array("foo.jar", "bar.jar")
- testPrematureExit(adjacentJars, "error: Found two conflicting resources") should be (true)
+ testPrematureExit(adjacentJars, "error: Found two conflicting resources")
val nonAdjacentJars =
Array("foo.jar", "--master", "123", "--class", "abc", "bar.jar")
- testPrematureExit(nonAdjacentJars, "error: Found two conflicting resources") should be (true)
+ testPrematureExit(nonAdjacentJars, "error: Found two conflicting resources")
}
test("handle binary specified but not class") {
- testPrematureExit(Array("foo.jar"), "must specify a main class")
+ testPrematureExit(Array("foo.jar"), "Must specify a main class")
}
test("handles YARN cluster mode") {
@@ -140,12 +150,11 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
val childArgsStr = childArgs.mkString(" ")
- print("child args: " + childArgsStr)
childArgsStr.startsWith("--memory 4g --cores 5 --supervise") should be (true)
childArgsStr should include ("launch spark://h:p thejar.jar org.SomeClass arg1 arg2")
mainClass should be ("org.apache.spark.deploy.Client")
classpath should have length (0)
- sysProps should have size (0)
+ sysProps should have size (1) // contains --jar entry
}
test("handles standalone client mode") {
@@ -175,4 +184,80 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
sysProps("spark.executor.memory") should be ("5g")
sysProps("spark.cores.max") should be ("5")
}
+
+ test("launch simple application with spark-submit") {
+ runSparkSubmit(
+ Seq("unUsed.jar",
+ "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
+ "--name", "testApp",
+ "--master", "local"))
+ }
+
+ test("spark submit includes jars passed in through --jar") {
+ val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
+ val jar2 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassB"))
+ val jarsString = Seq(jar1, jar2).map(j => j.toString).mkString(",")
+ runSparkSubmit(
+ Seq("unUsed.jar",
+ "--class", JarCreationTest.getClass.getName.stripSuffix("$"),
+ "--name", "testApp",
+ "--master", "local-cluster[2,1,512]",
+ "--jars", jarsString))
+ }
+
+ // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
+ def runSparkSubmit(args: Seq[String]): String = {
+ val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+ Utils.executeAndGetOutput(
+ Seq("./bin/spark-submit") ++ args,
+ new File(sparkHome),
+ Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
+ }
+}
+
+object JarCreationTest {
+ def main(args: Array[String]) {
+ val conf = new SparkConf()
+ val sc = new SparkContext(conf)
+ val result = sc.makeRDD(1 to 100, 10).mapPartitions{ x =>
+ var foundClasses = false
+ try {
+ Class.forName("SparkSubmitClassA", true, Thread.currentThread().getContextClassLoader)
+ Class.forName("SparkSubmitClassA", true, Thread.currentThread().getContextClassLoader)
+ foundClasses = true
+ } catch {
+ case _: Throwable => // catch all
+ }
+ Seq(foundClasses).iterator
+ }.collect()
+ if (result.contains(false)) {
+ throw new Exception("Could not load user defined classes inside of executors")
+ }
+ }
+}
+
+object SimpleApplicationTest {
+ def main(args: Array[String]) {
+ val conf = new SparkConf()
+ val sc = new SparkContext(conf)
+
+ val configs = Seq("spark.master", "spark.app.name")
+ for (config <- configs) {
+ val masterValue = conf.get(config)
+ val executorValues = sc
+ .makeRDD(1 to 100, 10)
+ .map(x => SparkEnv.get.conf.get(config))
+ .collect()
+ .distinct
+ if (executorValues.size != 1) {
+ throw new SparkException(s"Inconsistent values for $config: $executorValues")
+ }
+ val executorValue = executorValues(0)
+ if (executorValue != masterValue) {
+ throw new SparkException(
+ s"Master had $config=$masterValue but executor had $config=$executorValue")
+ }
+ }
+
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index a2c131b0c9..4633bc3f7f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -29,7 +29,7 @@ import org.apache.spark.deploy.{Command, DriverDescription}
class DriverRunnerTest extends FunSuite {
private def createDriverRunner() = {
- val command = new Command("mainClass", Seq(), Map())
+ val command = new Command("mainClass", Seq(), Map(), Seq(), Seq())
val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), driverDescription,
null, "akka://1.2.3.4/worker/")
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 3cab8e7b37..8ae387fa0b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -27,7 +27,8 @@ class ExecutorRunnerTest extends FunSuite {
test("command includes appId") {
def f(s:String) = new File(s)
val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
- val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(),Map()),
+ val appDesc = new ApplicationDescription("app name", Some(8), 500,
+ Command("foo", Seq(), Map(), Seq(), Seq()),
sparkHome, "appUiUrl")
val appId = "12345-worker321-9876"
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),