From 5d991db8a84a62705efa41c9c8a215569ad648fe Mon Sep 17 00:00:00 2001 From: EugenCepoi Date: Fri, 3 Oct 2014 10:03:15 -0700 Subject: SPARK-2058: Overriding SPARK_HOME/conf with SPARK_CONF_DIR Update of PR #997. With this PR, setting SPARK_CONF_DIR overrides SPARK_HOME/conf (not only spark-defaults.conf and spark-env). Author: EugenCepoi Closes #2481 from EugenCepoi/SPARK-2058 and squashes the following commits: 0bb32c2 [EugenCepoi] use orElse orNull and fixing trailing percent in compute-classpath.cmd 77f35d7 [EugenCepoi] SPARK-2058: Overriding SPARK_HOME/conf with SPARK_CONF_DIR (cherry picked from commit f0811f928e5b608e1a2cba3b6828ba0ed03b701d) Signed-off-by: Andrew Or --- bin/compute-classpath.cmd | 8 ++++- bin/compute-classpath.sh | 8 ++++- .../apache/spark/deploy/SparkSubmitArguments.scala | 42 +++++++++------------- .../org/apache/spark/deploy/SparkSubmitSuite.scala | 34 +++++++++++++++++- docs/configuration.md | 7 ++++ 5 files changed, 71 insertions(+), 28 deletions(-) diff --git a/bin/compute-classpath.cmd b/bin/compute-classpath.cmd index 5ad52452a5..9b9e40321e 100644 --- a/bin/compute-classpath.cmd +++ b/bin/compute-classpath.cmd @@ -36,7 +36,13 @@ rem Load environment variables from conf\spark-env.cmd, if it exists if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd" rem Build up classpath -set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%;%FWDIR%conf +set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH% + +if "x%SPARK_CONF_DIR%"!="x" ( + set CLASSPATH=%CLASSPATH%;%SPARK_CONF_DIR% +) else ( + set CLASSPATH=%CLASSPATH%;%FWDIR%conf +) if exist "%FWDIR%RELEASE" ( for %%d in ("%FWDIR%lib\spark-assembly*.jar") do ( diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index 16b794a159..4d68496e29 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -27,8 +27,14 @@ FWDIR="$(cd `dirname $0`/..; pwd)" . $FWDIR/bin/load-spark-env.sh +CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH" + # Build up classpath -CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:$FWDIR/conf" +if [ -n "$SPARK_CONF_DIR" ]; then + CLASSPATH="$CLASSPATH:$SPARK_CONF_DIR" +else + CLASSPATH="$CLASSPATH:$FWDIR/conf" +fi ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION" diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 2df25546ed..5075868321 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -29,8 +29,9 @@ import org.apache.spark.util.Utils /** * Parses and encapsulates arguments from the spark-submit script. + * The env argument is used for testing. */ -private[spark] class SparkSubmitArguments(args: Seq[String]) { +private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env) { var master: String = null var deployMode: String = null var executorMemory: String = null @@ -90,20 +91,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { private def mergeSparkProperties(): Unit = { // Use common defaults file, if not specified by user if (propertiesFile == null) { - sys.env.get("SPARK_CONF_DIR").foreach { sparkConfDir => - val sep = File.separator - val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf" - val file = new File(defaultPath) - if (file.exists()) { - propertiesFile = file.getAbsolutePath - } - } - } + val sep = File.separator + val sparkHomeConfig = env.get("SPARK_HOME").map(sparkHome => s"${sparkHome}${sep}conf") + val confDir = env.get("SPARK_CONF_DIR").orElse(sparkHomeConfig) - if (propertiesFile == null) { - sys.env.get("SPARK_HOME").foreach { sparkHome => - val sep = File.separator - val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.conf" + confDir.foreach { sparkConfDir => + val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf" val file = new File(defaultPath) if (file.exists()) { propertiesFile = file.getAbsolutePath @@ -116,19 +109,18 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { // Use properties file as fallback for values which have a direct analog to // arguments in this script. - master = Option(master).getOrElse(properties.get("spark.master").orNull) - executorMemory = Option(executorMemory) - .getOrElse(properties.get("spark.executor.memory").orNull) - executorCores = Option(executorCores) - .getOrElse(properties.get("spark.executor.cores").orNull) + master = Option(master).orElse(properties.get("spark.master")).orNull + executorMemory = Option(executorMemory).orElse(properties.get("spark.executor.memory")).orNull + executorCores = Option(executorCores).orElse(properties.get("spark.executor.cores")).orNull totalExecutorCores = Option(totalExecutorCores) - .getOrElse(properties.get("spark.cores.max").orNull) - name = Option(name).getOrElse(properties.get("spark.app.name").orNull) - jars = Option(jars).getOrElse(properties.get("spark.jars").orNull) + .orElse(properties.get("spark.cores.max")) + .orNull + name = Option(name).orElse(properties.get("spark.app.name")).orNull + jars = Option(jars).orElse(properties.get("spark.jars")).orNull // This supports env vars in older versions of Spark - master = Option(master).getOrElse(System.getenv("MASTER")) - deployMode = Option(deployMode).getOrElse(System.getenv("DEPLOY_MODE")) + master = Option(master).orElse(env.get("MASTER")).orNull + deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull // Try to set main class from JAR if no --class argument is given if (mainClass == null && !isPython && primaryResource != null) { @@ -181,7 +173,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { } if (master.startsWith("yarn")) { - val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR") + val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR") if (!hasHadoopEnv && !Utils.isTesting) { throw new Exception(s"When running with master '$master' " + "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.") diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 0c324d8bdf..4cba90e8f2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy -import java.io.{File, OutputStream, PrintStream} +import java.io._ import scala.collection.mutable.ArrayBuffer @@ -26,6 +26,7 @@ import org.apache.spark.deploy.SparkSubmit._ import org.apache.spark.util.Utils import org.scalatest.FunSuite import org.scalatest.Matchers +import com.google.common.io.Files class SparkSubmitSuite extends FunSuite with Matchers { def beforeAll() { @@ -306,6 +307,21 @@ class SparkSubmitSuite extends FunSuite with Matchers { runSparkSubmit(args) } + test("SPARK_CONF_DIR overrides spark-defaults.conf") { + forConfDir(Map("spark.executor.memory" -> "2.3g")) { path => + val unusedJar = TestUtils.createJarWithClasses(Seq.empty) + val args = Seq( + "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"), + "--name", "testApp", + "--master", "local", + unusedJar.toString) + val appArgs = new SparkSubmitArguments(args, Map("SPARK_CONF_DIR" -> path)) + assert(appArgs.propertiesFile != null) + assert(appArgs.propertiesFile.startsWith(path)) + appArgs.executorMemory should be ("2.3g") + } + } + // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. def runSparkSubmit(args: Seq[String]): String = { val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) @@ -314,6 +330,22 @@ class SparkSubmitSuite extends FunSuite with Matchers { new File(sparkHome), Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome)) } + + def forConfDir(defaults: Map[String, String]) (f: String => Unit) = { + val tmpDir = Files.createTempDir() + + val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf") + val writer = new OutputStreamWriter(new FileOutputStream(defaultsConf)) + for ((key, value) <- defaults) writer.write(s"$key $value\n") + + writer.close() + + try { + f(tmpDir.getAbsolutePath) + } finally { + Utils.deleteRecursively(tmpDir) + } + } } object JarCreationTest { diff --git a/docs/configuration.md b/docs/configuration.md index 9411230b0e..c5065709f6 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1079,3 +1079,10 @@ compute `SPARK_LOCAL_IP` by looking up the IP of a specific network interface. Spark uses [log4j](http://logging.apache.org/log4j/) for logging. You can configure it by adding a `log4j.properties` file in the `conf` directory. One way to start is to copy the existing `log4j.properties.template` located there. + +# Overriding configuration directory + +To specify a different configuration directory other than the default "SPARK_HOME/conf", +you can set SPARK_CONF_DIR. Spark will use the the configuration files (spark-defaults.conf, spark-env.sh, log4j.properties, etc) +from this directory. + -- cgit v1.2.3