From f68105df52902a1c65207d4f51bfdeb55cccf767 Mon Sep 17 00:00:00 2001
From: Sandy Ryza
Date: Thu, 31 Jul 2014 11:51:20 -0700
Subject: SPARK-2664. Deal with `--conf` options in spark-submit that relate to flags

Author: Sandy Ryza

Closes #1665 from sryza/sandy-spark-2664 and squashes the following commits:

0518c63 [Sandy Ryza] SPARK-2664. Deal with `--conf` options in spark-submit that relate to flags
---
 .../org/apache/spark/deploy/SparkSubmit.scala      | 11 +++++----
 .../apache/spark/deploy/SparkSubmitArguments.scala | 26 +++++++++++++---------
 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 16 +++++++++++++
 3 files changed, 38 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 3df811c4ac..318509a67a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -184,7 +184,7 @@ object SparkSubmit {
       OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),

       // Yarn cluster only
-      OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name", sysProp = "spark.app.name"),
+      OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
       OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
       OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
       OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
@@ -268,14 +268,17 @@ object SparkSubmit {
       }
     }

+    // Properties given with --conf are superseded by other options, but take precedence over
+    // properties in the defaults file.
+    for ((k, v) <- args.sparkProperties) {
+      sysProps.getOrElseUpdate(k, v)
+    }
+
     // Read from default spark properties, if any
     for ((k, v) <- args.getDefaultSparkProperties) {
       sysProps.getOrElseUpdate(k, v)
     }

-    // Spark properties included on command line take precedence
-    sysProps ++= args.sparkProperties
-
     (childArgs, childClasspath, sysProps, childMainClass)
   }

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 01d0ae541a..dd044e6298 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -58,7 +58,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()

   parseOpts(args.toList)
-  loadDefaults()
+  mergeSparkProperties()
   checkRequiredArguments()

   /** Return default present in the currently defined defaults file. */
@@ -79,9 +79,11 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     defaultProperties
   }

-  /** Fill in any undefined values based on the current properties file or built-in defaults. */
-  private def loadDefaults(): Unit = {
-
+  /**
+   * Fill in any undefined values based on the default properties file or options passed in through
+   * the '--conf' flag.
+   */
+  private def mergeSparkProperties(): Unit = {
     // Use common defaults file, if not specified by user
     if (propertiesFile == null) {
       sys.env.get("SPARK_HOME").foreach { sparkHome =>
@@ -94,18 +96,20 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
       }
     }

-    val defaultProperties = getDefaultSparkProperties
+    val properties = getDefaultSparkProperties
+    properties.putAll(sparkProperties)
+
     // Use properties file as fallback for values which have a direct analog to
     // arguments in this script.
-    master = Option(master).getOrElse(defaultProperties.get("spark.master").orNull)
+    master = Option(master).getOrElse(properties.get("spark.master").orNull)
     executorMemory = Option(executorMemory)
-      .getOrElse(defaultProperties.get("spark.executor.memory").orNull)
+      .getOrElse(properties.get("spark.executor.memory").orNull)
     executorCores = Option(executorCores)
-      .getOrElse(defaultProperties.get("spark.executor.cores").orNull)
+      .getOrElse(properties.get("spark.executor.cores").orNull)
     totalExecutorCores = Option(totalExecutorCores)
-      .getOrElse(defaultProperties.get("spark.cores.max").orNull)
-    name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
-    jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)
+      .getOrElse(properties.get("spark.cores.max").orNull)
+    name = Option(name).getOrElse(properties.get("spark.app.name").orNull)
+    jars = Option(jars).getOrElse(properties.get("spark.jars").orNull)

     // This supports env vars in older versions of Spark
     master = Option(master).getOrElse(System.getenv("MASTER"))
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index a301cbd48a..9190b05e2d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -253,6 +253,22 @@ class SparkSubmitSuite extends FunSuite with Matchers {
     sysProps("spark.shuffle.spill") should be ("false")
   }

+  test("handles confs with flag equivalents") {
+    val clArgs = Seq(
+      "--deploy-mode", "cluster",
+      "--executor-memory", "5g",
+      "--class", "org.SomeClass",
+      "--conf", "spark.executor.memory=4g",
+      "--conf", "spark.master=yarn",
+      "thejar.jar",
+      "arg1", "arg2")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val (_, _, sysProps, mainClass) = createLaunchEnv(appArgs)
+    sysProps("spark.executor.memory") should be ("5g")
+    sysProps("spark.master") should be ("yarn-cluster")
+    mainClass should be ("org.apache.spark.deploy.yarn.Client")
+  }
+
   test("launch simple application with spark-submit") {
     val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
     val args = Seq(
-- 
cgit v1.2.3
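After this patch, spark-submit resolves configuration from three sources in a fixed order:
values set by dedicated flags (e.g. --executor-memory) win over --conf key=value pairs, which
in turn win over entries in the defaults file. Below is a minimal, self-contained sketch of
that merge order; the ConfPrecedenceSketch object and the property values are illustrative
only and are not part of the patch:

    import scala.collection.mutable

    // A sketch of the merge order the patch establishes: dedicated flags first,
    // then --conf pairs, then the defaults file. getOrElseUpdate only inserts a
    // key that is still absent, so sources applied earlier win.
    object ConfPrecedenceSketch {
      def main(args: Array[String]): Unit = {
        val sysProps = new mutable.HashMap[String, String]()

        // 1. Highest precedence: values from dedicated flags, e.g. --executor-memory 5g
        sysProps("spark.executor.memory") = "5g"

        // 2. Middle precedence: --conf key=value pairs (illustrative values)
        val confProps = Seq("spark.executor.memory" -> "4g", "spark.ui.port" -> "4444")
        for ((k, v) <- confProps) sysProps.getOrElseUpdate(k, v)

        // 3. Lowest precedence: entries from the defaults file (illustrative values)
        val defaults = Seq("spark.ui.port" -> "4040", "spark.eventLog.enabled" -> "true")
        for ((k, v) <- defaults) sysProps.getOrElseUpdate(k, v)

        assert(sysProps("spark.executor.memory") == "5g")    // flag beats --conf
        assert(sysProps("spark.ui.port") == "4444")          // --conf beats defaults
        assert(sysProps("spark.eventLog.enabled") == "true") // defaults fill remaining gaps
      }
    }

Because getOrElseUpdate writes a key only when it is absent, applying the sources from
highest to lowest precedence yields the desired ordering with no explicit conflict handling;
this is why the patch merges --conf properties before the defaults file rather than appending
sparkProperties last, which had let --conf silently override dedicated flags.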