-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala           11
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala  26
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala      16
3 files changed, 38 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 3df811c4ac..318509a67a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -184,7 +184,7 @@ object SparkSubmit {
OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
// Yarn cluster only
- OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name", sysProp = "spark.app.name"),
+ OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
@@ -268,14 +268,17 @@ object SparkSubmit {
}
}
+ // Properties given with --conf are superseded by other options, but take precedence over
+ // properties in the defaults file.
+ for ((k, v) <- args.sparkProperties) {
+ sysProps.getOrElseUpdate(k, v)
+ }
+
// Read from default spark properties, if any
for ((k, v) <- args.getDefaultSparkProperties) {
sysProps.getOrElseUpdate(k, v)
}
- // Spark properties included on command line take precedence
- sysProps ++= args.sparkProperties
-
(childArgs, childClasspath, sysProps, childMainClass)
}
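
The precedence order the new code establishes (explicit submit options first, then --conf values, then the defaults file) can be reproduced with plain mutable maps. A minimal, self-contained sketch with hypothetical values, not Spark's actual OptionAssigner machinery:

import scala.collection.mutable.HashMap

// Keys already set by explicit options win, --conf values fill in next,
// and defaults-file values are used only as a last resort.
object PrecedenceSketch extends App {
  val sysProps = HashMap("spark.executor.memory" -> "5g") // from --executor-memory
  val confProps = Map(
    "spark.executor.memory" -> "4g", // from --conf
    "spark.master" -> "yarn")
  val defaults = Map(
    "spark.master" -> "local[*]", // from spark-defaults.conf
    "spark.app.name" -> "default-app")

  // getOrElseUpdate inserts only when the key is absent, so earlier
  // sources take precedence over later ones.
  for ((k, v) <- confProps) sysProps.getOrElseUpdate(k, v)
  for ((k, v) <- defaults) sysProps.getOrElseUpdate(k, v)

  assert(sysProps("spark.executor.memory") == "5g") // flag beats --conf
  assert(sysProps("spark.master") == "yarn")        // --conf beats defaults
  assert(sysProps("spark.app.name") == "default-app")
}
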
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 01d0ae541a..dd044e6298 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -58,7 +58,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
parseOpts(args.toList)
- loadDefaults()
+ mergeSparkProperties()
checkRequiredArguments()
/** Return the defaults present in the currently defined defaults file. */
@@ -79,9 +79,11 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
defaultProperties
}
- /** Fill in any undefined values based on the current properties file or built-in defaults. */
- private def loadDefaults(): Unit = {
-
+ /**
+ * Fill in any undefined values based on the default properties file or options passed in through
+ * the '--conf' flag.
+ */
+ private def mergeSparkProperties(): Unit = {
// Use common defaults file, if not specified by user
if (propertiesFile == null) {
sys.env.get("SPARK_HOME").foreach { sparkHome =>
@@ -94,18 +96,20 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
}
}
- val defaultProperties = getDefaultSparkProperties
+ val properties = getDefaultSparkProperties
+ properties.putAll(sparkProperties)
+
// Use properties file as fallback for values which have a direct analog to
// arguments in this script.
- master = Option(master).getOrElse(defaultProperties.get("spark.master").orNull)
+ master = Option(master).getOrElse(properties.get("spark.master").orNull)
executorMemory = Option(executorMemory)
- .getOrElse(defaultProperties.get("spark.executor.memory").orNull)
+ .getOrElse(properties.get("spark.executor.memory").orNull)
executorCores = Option(executorCores)
- .getOrElse(defaultProperties.get("spark.executor.cores").orNull)
+ .getOrElse(properties.get("spark.executor.cores").orNull)
totalExecutorCores = Option(totalExecutorCores)
- .getOrElse(defaultProperties.get("spark.cores.max").orNull)
- name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
- jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)
+ .getOrElse(properties.get("spark.cores.max").orNull)
+ name = Option(name).getOrElse(properties.get("spark.app.name").orNull)
+ jars = Option(jars).getOrElse(properties.get("spark.jars").orNull)
// This supports env vars in older versions of Spark
master = Option(master).getOrElse(System.getenv("MASTER"))
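
The fallback pattern used for the flag-equivalent fields can be sketched on its own. The class below is a hypothetical stand-in for SparkSubmitArguments, assuming `properties` already holds --conf values merged over the defaults file:

import scala.collection.mutable.HashMap

class ArgsSketch(properties: HashMap[String, String]) {
  var master: String = null // may already be set by --master
  var name: String = null   // may already be set by --name

  def fill(): Unit = {
    // Option(x).getOrElse(...) keeps an explicitly set value and falls back
    // to the merged properties only when the field is still null.
    master = Option(master).getOrElse(properties.get("spark.master").orNull)
    name = Option(name).getOrElse(properties.get("spark.app.name").orNull)
  }
}
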
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index a301cbd48a..9190b05e2d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -253,6 +253,22 @@ class SparkSubmitSuite extends FunSuite with Matchers {
sysProps("spark.shuffle.spill") should be ("false")
}
+ test("handles confs with flag equivalents") {
+ val clArgs = Seq(
+ "--deploy-mode", "cluster",
+ "--executor-memory", "5g",
+ "--class", "org.SomeClass",
+ "--conf", "spark.executor.memory=4g",
+ "--conf", "spark.master=yarn",
+ "thejar.jar",
+ "arg1", "arg2")
+ val appArgs = new SparkSubmitArguments(clArgs)
+ val (_, _, sysProps, mainClass) = createLaunchEnv(appArgs)
+ sysProps("spark.executor.memory") should be ("5g")
+ sysProps("spark.master") should be ("yarn-cluster")
+ mainClass should be ("org.apache.spark.deploy.yarn.Client")
+ }
+
test("launch simple application with spark-submit") {
val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val args = Seq(
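
For reference, the behavior the new test pins down can be walked through with plain maps rather than SparkSubmit itself. The "yarn" + cluster => "yarn-cluster" resolution below is simplified to mirror the test's expectation, not SparkSubmit's full logic:

object ConfFlagSketch extends App {
  val sysProps = scala.collection.mutable.HashMap[String, String]()
  sysProps("spark.executor.memory") = "5g"  // from --executor-memory
  sysProps("spark.master") = "yarn-cluster" // resolved from spark.master=yarn in cluster mode

  // --conf values fill in only where no flag already decided the value.
  val confProps = Map("spark.executor.memory" -> "4g", "spark.master" -> "yarn")
  for ((k, v) <- confProps) sysProps.getOrElseUpdate(k, v)

  assert(sysProps("spark.executor.memory") == "5g")
  assert(sysProps("spark.master") == "yarn-cluster")
}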