author    Stavros Kontopoulos <st.kontopoulos@gmail.com>  2016-11-19 16:02:59 -0800
committer Marcelo Vanzin <vanzin@cloudera.com>            2016-11-19 16:04:49 -0800
commit    ea77c81ec0db27ea4709f71dc080d00167505a7d (patch)
tree      8ce69b260099b10cb22fa48f8ef1ad95427b3f96 /core/src/main/scala/org
parent    ded5fefb6f5c0a97bf3d7fa1c0494dc434b6ee40 (diff)
[SPARK-17062][MESOS] add conf option to mesos dispatcher
Adds a --conf option for setting Spark configuration properties in the Mesos dispatcher. Properties provided with --conf take precedence over properties from the properties file.

The motivation for this PR is that, for simple configuration or testing purposes, we currently have to provide a properties file (ideally a shared one for a cluster) even when we only want to set a single property.

Manually tested.

Author: Stavros Kontopoulos <st.kontopoulos@gmail.com>
Author: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com>

Closes #14650 from skonto/dipatcher_conf.
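To make the precedence rule concrete, here is a minimal, hypothetical Scala sketch (not part of the patch; the object name and values are invented): file-provided properties act as defaults, and --conf pairs overwrite them.

    object ConfPrecedenceSketch {
      def main(args: Array[String]): Unit = {
        // defaults loaded from a properties file (values invented)
        val fromFile = Map("spark.master" -> "local[2]", "spark.cores.max" -> "2")
        // pairs passed on the command line via --conf
        val fromConfFlags = Map("spark.master" -> "mesos://host:5050")
        // ++ keeps the right-hand value on key collisions, so --conf wins
        val effective = fromFile ++ fromConfFlags
        effective.foreach { case (k, v) => println(s"$k=$v") }
        // prints spark.master=mesos://host:5050 and spark.cores.max=2
      }
    }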
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala          | 18
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala       | 56
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                  | 14
4 files changed, 76 insertions, 18 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c70061bc5b..85f80b6971 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -41,12 +41,11 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}
-import org.apache.spark.{SPARK_REVISION, SPARK_VERSION, SparkException, SparkUserAppException}
-import org.apache.spark.{SPARK_BRANCH, SPARK_BUILD_DATE, SPARK_BUILD_USER, SPARK_REPO_URL}
+import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.rest._
import org.apache.spark.launcher.SparkLauncher
-import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
+import org.apache.spark.util._
/**
* Whether to submit, kill, or request the status of an application.
@@ -63,7 +62,7 @@ private[deploy] object SparkSubmitAction extends Enumeration {
* This program handles setting up the classpath with relevant Spark dependencies and provides
* a layer over the different cluster managers and deploy modes that Spark supports.
*/
-object SparkSubmit {
+object SparkSubmit extends CommandLineUtils {
// Cluster managers
private val YARN = 1
@@ -87,15 +86,6 @@ object SparkSubmit {
private val CLASS_NOT_FOUND_EXIT_STATUS = 101
// scalastyle:off println
-  // Exposed for testing
-  private[spark] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode)
-  private[spark] var printStream: PrintStream = System.err
-  private[spark] def printWarning(str: String): Unit = printStream.println("Warning: " + str)
-  private[spark] def printErrorAndExit(str: String): Unit = {
-    printStream.println("Error: " + str)
-    printStream.println("Run with --help for usage help or --verbose for debug output")
-    exitFn(1)
-  }
  private[spark] def printVersionAndExit(): Unit = {
    printStream.println("""Welcome to
      ____              __
@@ -115,7 +105,7 @@ object SparkSubmit {
}
// scalastyle:on println
- def main(args: Array[String]): Unit = {
+ override def main(args: Array[String]): Unit = {
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
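The hooks removed here are not deleted; they move into the new CommandLineUtils trait introduced below. A rough standalone sketch of why the mutable exitFn/printStream pair is useful (names mirror the trait, but this object is illustrative): a test can redirect the output stream and neutralize the exit, then assert on the captured error.

    import java.io.{ByteArrayOutputStream, PrintStream}

    object ExitHookSketch {
      // mutable hooks, as in the trait, so tests can swap them out
      var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode)
      var printStream: PrintStream = System.err

      def printErrorAndExit(str: String): Unit = {
        printStream.println("Error: " + str)
        exitFn(1)
      }

      def main(args: Array[String]): Unit = {
        val captured = new ByteArrayOutputStream()
        printStream = new PrintStream(captured, true) // autoflush for the assert
        exitFn = _ => ()                              // swallow the exit in tests
        printErrorAndExit("Spark config without '=': spark.foo")
        assert(captured.toString.contains("spark.foo"))
      }
    }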
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index f1761e7c1e..b1d36e1821 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -412,10 +412,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
repositories = value
case CONF =>
-        value.split("=", 2).toSeq match {
-          case Seq(k, v) => sparkProperties(k) = v
-          case _ => SparkSubmit.printErrorAndExit(s"Spark config without '=': $value")
-        }
+        val (confName, confValue) = SparkSubmit.parseSparkConfProperty(value)
+        sparkProperties(confName) = confValue
case PROXY_USER =>
proxyUser = value
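Note that split("=", 2) splits only at the first '=', so values that themselves contain '=' are preserved. A standalone sketch of that behavior (parse mirrors parseSparkConfProperty; the object is invented):

    object SplitSketch {
      def parse(pair: String): (String, String) =
        pair.split("=", 2).toSeq match {
          case Seq(k, v) => (k, v)
          case _ => sys.error(s"Spark config without '=': $pair")
        }

      def main(args: Array[String]): Unit = {
        println(parse("spark.executor.extraJavaOptions=-Dfoo=bar"))
        // -> (spark.executor.extraJavaOptions,-Dfoo=bar): the second '=' survives
      }
    }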
diff --git a/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala b/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala
new file mode 100644
index 0000000000..d73901686b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.PrintStream
+
+import org.apache.spark.SparkException
+
+/**
+ * Contains basic command line parsing functionality and methods to parse some common Spark CLI
+ * options.
+ */
+private[spark] trait CommandLineUtils {
+
+  // Exposed for testing
+  private[spark] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode)
+
+  private[spark] var printStream: PrintStream = System.err
+
+  // scalastyle:off println
+
+  private[spark] def printWarning(str: String): Unit = printStream.println("Warning: " + str)
+
+  private[spark] def printErrorAndExit(str: String): Unit = {
+    printStream.println("Error: " + str)
+    printStream.println("Run with --help for usage help or --verbose for debug output")
+    exitFn(1)
+  }
+
+  // scalastyle:on println
+
+  private[spark] def parseSparkConfProperty(pair: String): (String, String) = {
+    pair.split("=", 2).toSeq match {
+      case Seq(k, v) => (k, v)
+      case _ => printErrorAndExit(s"Spark config without '=': $pair")
+        throw new SparkException(s"Spark config without '=': $pair")
+    }
+  }
+
+  def main(args: Array[String]): Unit
+}
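As a usage sketch, any command-line entry point can mix in the trait to pick up the shared helpers, as SparkSubmit does in this patch; MyTool below is hypothetical and exists only to show the shape:

    package org.apache.spark.util

    // hypothetical tool; in this patch only SparkSubmit adopts the trait
    private[spark] object MyTool extends CommandLineUtils {
      override def main(args: Array[String]): Unit = {
        if (args.isEmpty) {
          printErrorAndExit("expected at least one key=value argument")
        }
        args.map(parseSparkConfProperty).foreach { case (k, v) =>
          printStream.println(s"$k -> $v")
        }
      }
    }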
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 23b95b9f64..748d729554 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2056,6 +2056,20 @@ private[spark] object Utils extends Logging {
path
}
+  /**
+   * Updates Spark config with properties from a set of Properties.
+   * Provided properties have the highest priority.
+   */
+  def updateSparkConfigFromProperties(
+      conf: SparkConf,
+      properties: Map[String, String]) : Unit = {
+    properties.filter { case (k, v) =>
+      k.startsWith("spark.")
+    }.foreach { case (k, v) =>
+      conf.set(k, v)
+    }
+  }
+
/** Load properties present in the given file. */
def getPropertiesFromFile(filename: String): Map[String, String] = {
val file = new File(filename)
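Combined with getPropertiesFromFile just above, a plausible caller-side flow looks like the following sketch; the file path and the override map are invented for the example:

    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    object ConfLoadSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(loadDefaults = false)
        // defaults from a file (path invented for the example)
        val fromFile = Utils.getPropertiesFromFile("/tmp/spark-defaults.conf")
        // overrides, e.g. collected from repeated --conf flags
        val overrides = Map("spark.cores.max" -> "4")
        // the right-hand side of ++ wins, so the overrides take precedence;
        // only keys starting with "spark." are applied to the SparkConf
        Utils.updateSparkConfigFromProperties(conf, fromFile ++ overrides)
        conf.getAll.foreach { case (k, v) => println(s"$k=$v") }
      }
    }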