diff options
author | Stavros Kontopoulos <st.kontopoulos@gmail.com> | 2016-11-19 16:02:59 -0800 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2016-11-19 16:04:49 -0800 |
commit | ea77c81ec0db27ea4709f71dc080d00167505a7d (patch) | |
tree | 8ce69b260099b10cb22fa48f8ef1ad95427b3f96 /core/src/main | |
parent | ded5fefb6f5c0a97bf3d7fa1c0494dc434b6ee40 (diff) | |
download | spark-ea77c81ec0db27ea4709f71dc080d00167505a7d.tar.gz spark-ea77c81ec0db27ea4709f71dc080d00167505a7d.tar.bz2 spark-ea77c81ec0db27ea4709f71dc080d00167505a7d.zip |
[SPARK-17062][MESOS] add conf option to mesos dispatcher
Adds --conf option to set spark configuration properties in mesos dispacther.
Properties provided with --conf take precedence over properties within the properties file.
The reason for this PR is that for simple configuration or testing purposes we need to provide a property file (ideally a shared one for a cluster) even if we just provide a single property.
Manually tested.
Author: Stavros Kontopoulos <st.kontopoulos@gmail.com>
Author: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com>
Closes #14650 from skonto/dipatcher_conf.
Diffstat (limited to 'core/src/main')
4 files changed, 76 insertions, 18 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index c70061bc5b..85f80b6971 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -41,12 +41,11 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher import org.apache.ivy.plugins.repository.file.FileRepository import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver} -import org.apache.spark.{SPARK_REVISION, SPARK_VERSION, SparkException, SparkUserAppException} -import org.apache.spark.{SPARK_BRANCH, SPARK_BUILD_DATE, SPARK_BUILD_USER, SPARK_REPO_URL} +import org.apache.spark._ import org.apache.spark.api.r.RUtils import org.apache.spark.deploy.rest._ import org.apache.spark.launcher.SparkLauncher -import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils} +import org.apache.spark.util._ /** * Whether to submit, kill, or request the status of an application. @@ -63,7 +62,7 @@ private[deploy] object SparkSubmitAction extends Enumeration { * This program handles setting up the classpath with relevant Spark dependencies and provides * a layer over the different cluster managers and deploy modes that Spark supports. */ -object SparkSubmit { +object SparkSubmit extends CommandLineUtils { // Cluster managers private val YARN = 1 @@ -87,15 +86,6 @@ object SparkSubmit { private val CLASS_NOT_FOUND_EXIT_STATUS = 101 // scalastyle:off println - // Exposed for testing - private[spark] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode) - private[spark] var printStream: PrintStream = System.err - private[spark] def printWarning(str: String): Unit = printStream.println("Warning: " + str) - private[spark] def printErrorAndExit(str: String): Unit = { - printStream.println("Error: " + str) - printStream.println("Run with --help for usage help or --verbose for debug output") - exitFn(1) - } private[spark] def printVersionAndExit(): Unit = { printStream.println("""Welcome to ____ __ @@ -115,7 +105,7 @@ object SparkSubmit { } // scalastyle:on println - def main(args: Array[String]): Unit = { + override def main(args: Array[String]): Unit = { val appArgs = new SparkSubmitArguments(args) if (appArgs.verbose) { // scalastyle:off println diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index f1761e7c1e..b1d36e1821 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -412,10 +412,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S repositories = value case CONF => - value.split("=", 2).toSeq match { - case Seq(k, v) => sparkProperties(k) = v - case _ => SparkSubmit.printErrorAndExit(s"Spark config without '=': $value") - } + val (confName, confValue) = SparkSubmit.parseSparkConfProperty(value) + sparkProperties(confName) = confValue case PROXY_USER => proxyUser = value diff --git a/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala b/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala new file mode 100644 index 0000000000..d73901686b --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.io.PrintStream + +import org.apache.spark.SparkException + +/** + * Contains basic command line parsing functionality and methods to parse some common Spark CLI + * options. + */ +private[spark] trait CommandLineUtils { + + // Exposed for testing + private[spark] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode) + + private[spark] var printStream: PrintStream = System.err + + // scalastyle:off println + + private[spark] def printWarning(str: String): Unit = printStream.println("Warning: " + str) + + private[spark] def printErrorAndExit(str: String): Unit = { + printStream.println("Error: " + str) + printStream.println("Run with --help for usage help or --verbose for debug output") + exitFn(1) + } + + // scalastyle:on println + + private[spark] def parseSparkConfProperty(pair: String): (String, String) = { + pair.split("=", 2).toSeq match { + case Seq(k, v) => (k, v) + case _ => printErrorAndExit(s"Spark config without '=': $pair") + throw new SparkException(s"Spark config without '=': $pair") + } + } + + def main(args: Array[String]): Unit +} diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 23b95b9f64..748d729554 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2056,6 +2056,20 @@ private[spark] object Utils extends Logging { path } + /** + * Updates Spark config with properties from a set of Properties. + * Provided properties have the highest priority. + */ + def updateSparkConfigFromProperties( + conf: SparkConf, + properties: Map[String, String]) : Unit = { + properties.filter { case (k, v) => + k.startsWith("spark.") + }.foreach { case (k, v) => + conf.set(k, v) + } + } + /** Load properties present in the given file. */ def getPropertiesFromFile(filename: String): Map[String, String] = { val file = new File(filename) |