author    EugenCepoi <cepoi.eugen@gmail.com>  2014-10-03 10:03:15 -0700
committer Andrew Or <andrewor14@gmail.com>   2014-10-03 10:03:15 -0700
commit    f0811f928e5b608e1a2cba3b6828ba0ed03b701d (patch)
tree      97a17a42d3825dcadf332a4d510d24deb973d00c /core
parent    2e4eae3a52e3d04895b00447d1ac56ae3c1b98ae (diff)
SPARK-2058: Overriding SPARK_HOME/conf with SPARK_CONF_DIR
Update of PR #997.

With this PR, setting SPARK_CONF_DIR overrides SPARK_HOME/conf (not only spark-defaults.conf and spark-env).

Author: EugenCepoi <cepoi.eugen@gmail.com>

Closes #2481 from EugenCepoi/SPARK-2058 and squashes the following commits:

0bb32c2 [EugenCepoi] use orElse orNull and fixing trailing percent in compute-classpath.cmd
77f35d7 [EugenCepoi] SPARK-2058: Overriding SPARK_HOME/conf with SPARK_CONF_DIR
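The resulting lookup order for the defaults file is: an explicit --properties-file argument, then $SPARK_CONF_DIR/spark-defaults.conf, then $SPARK_HOME/conf/spark-defaults.conf. A minimal standalone sketch of that chain, where resolveDefaultsFile is an illustrative helper name, not part of the patch:

    import java.io.File

    // Sketch of the lookup order this patch implements (illustrative helper).
    def resolveDefaultsFile(env: Map[String, String]): Option[String] = {
      val sep = File.separator
      val confDir = env.get("SPARK_CONF_DIR")
        .orElse(env.get("SPARK_HOME").map(home => s"$home${sep}conf"))
      confDir
        .map(dir => new File(s"$dir${sep}spark-defaults.conf"))
        .filter(_.exists())
        .map(_.getAbsolutePath)
    }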
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 42
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala     | 34
2 files changed, 50 insertions(+), 26 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 2b72c61cc8..57b251ff47 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -29,8 +29,9 @@ import org.apache.spark.util.Utils
/**
* Parses and encapsulates arguments from the spark-submit script.
+ * The env argument is used for testing.
*/
-private[spark] class SparkSubmitArguments(args: Seq[String]) {
+private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env) {
var master: String = null
var deployMode: String = null
var executorMemory: String = null
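The new env parameter, defaulting to sys.env, is what makes the behaviour testable: production code reads the real environment through the default argument, while a test can pass a plain Map without mutating the JVM's process environment. A REPL-style sketch of the pattern in isolation, where ArgParser is an illustrative name rather than the patch's class:

    // Env-injection pattern: default to the process environment,
    // let tests substitute a deterministic map instead.
    class ArgParser(args: Seq[String], env: Map[String, String] = sys.env) {
      val confDir: Option[String] = env.get("SPARK_CONF_DIR")
    }

    val parsed = new ArgParser(Nil, Map("SPARK_CONF_DIR" -> "/tmp/conf"))
    assert(parsed.confDir == Some("/tmp/conf"))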
@@ -90,20 +91,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
private def mergeSparkProperties(): Unit = {
// Use common defaults file, if not specified by user
if (propertiesFile == null) {
- sys.env.get("SPARK_CONF_DIR").foreach { sparkConfDir =>
- val sep = File.separator
- val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf"
- val file = new File(defaultPath)
- if (file.exists()) {
- propertiesFile = file.getAbsolutePath
- }
- }
- }
+ val sep = File.separator
+ val sparkHomeConfig = env.get("SPARK_HOME").map(sparkHome => s"${sparkHome}${sep}conf")
+ val confDir = env.get("SPARK_CONF_DIR").orElse(sparkHomeConfig)

- if (propertiesFile == null) {
- sys.env.get("SPARK_HOME").foreach { sparkHome =>
- val sep = File.separator
- val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.conf"
+ confDir.foreach { sparkConfDir =>
+ val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf"
val file = new File(defaultPath)
if (file.exists()) {
propertiesFile = file.getAbsolutePath
@@ -117,19 +110,18 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
// Use properties file as fallback for values which have a direct analog to
// arguments in this script.
- master = Option(master).getOrElse(properties.get("spark.master").orNull)
- executorMemory = Option(executorMemory)
- .getOrElse(properties.get("spark.executor.memory").orNull)
- executorCores = Option(executorCores)
- .getOrElse(properties.get("spark.executor.cores").orNull)
+ master = Option(master).orElse(properties.get("spark.master")).orNull
+ executorMemory = Option(executorMemory).orElse(properties.get("spark.executor.memory")).orNull
+ executorCores = Option(executorCores).orElse(properties.get("spark.executor.cores")).orNull
totalExecutorCores = Option(totalExecutorCores)
- .getOrElse(properties.get("spark.cores.max").orNull)
- name = Option(name).getOrElse(properties.get("spark.app.name").orNull)
- jars = Option(jars).getOrElse(properties.get("spark.jars").orNull)
+ .orElse(properties.get("spark.cores.max"))
+ .orNull
+ name = Option(name).orElse(properties.get("spark.app.name")).orNull
+ jars = Option(jars).orElse(properties.get("spark.jars")).orNull

// This supports env vars in older versions of Spark
- master = Option(master).getOrElse(System.getenv("MASTER"))
- deployMode = Option(deployMode).getOrElse(System.getenv("DEPLOY_MODE"))
+ master = Option(master).orElse(env.get("MASTER")).orNull
+ deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull

// Try to set main class from JAR if no --class argument is given
if (mainClass == null && !isPython && primaryResource != null) {
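The getOrElse-to-orElse rewrite above is behaviour-preserving: Option(x).getOrElse(properties.get(k).orNull) and Option(x).orElse(properties.get(k)).orNull both yield x when it is non-null, the property value when present, and null otherwise; the orElse form simply stays in Option space until the final unwrap. A quick equivalence check with illustrative values:

    val properties = Map("spark.master" -> "local[2]")
    def viaGetOrElse(m: String) = Option(m).getOrElse(properties.get("spark.master").orNull)
    def viaOrElse(m: String)    = Option(m).orElse(properties.get("spark.master")).orNull
    assert(viaGetOrElse(null) == viaOrElse(null))     // both fall back to "local[2]"
    assert(viaGetOrElse("yarn") == viaOrElse("yarn")) // both keep the explicit value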
@@ -182,7 +174,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
}
if (master.startsWith("yarn")) {
- val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
+ val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR")
if (!hasHadoopEnv && !Utils.isTesting) {
throw new Exception(s"When running with master '$master' " +
"either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 0c324d8bdf..4cba90e8f2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy
-import java.io.{File, OutputStream, PrintStream}
+import java.io._
import scala.collection.mutable.ArrayBuffer
@@ -26,6 +26,7 @@ import org.apache.spark.deploy.SparkSubmit._
import org.apache.spark.util.Utils
import org.scalatest.FunSuite
import org.scalatest.Matchers
+import com.google.common.io.Files
class SparkSubmitSuite extends FunSuite with Matchers {
def beforeAll() {
@@ -306,6 +307,21 @@ class SparkSubmitSuite extends FunSuite with Matchers {
runSparkSubmit(args)
}

+ test("SPARK_CONF_DIR overrides spark-defaults.conf") {
+ forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val args = Seq(
+ "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
+ "--name", "testApp",
+ "--master", "local",
+ unusedJar.toString)
+ val appArgs = new SparkSubmitArguments(args, Map("SPARK_CONF_DIR" -> path))
+ assert(appArgs.propertiesFile != null)
+ assert(appArgs.propertiesFile.startsWith(path))
+ appArgs.executorMemory should be ("2.3g")
+ }
+ }
+
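The new test exercises only the SPARK_CONF_DIR branch. A companion check for the SPARK_HOME fallback, shown here without diff markers because it is hypothetical and not part of this patch, could build the <home>/conf layout itself:

    // Hypothetical companion test, not in this patch: with SPARK_CONF_DIR unset,
    // resolution should fall back to $SPARK_HOME/conf/spark-defaults.conf.
    test("SPARK_HOME conf dir is used when SPARK_CONF_DIR is unset") {
      val home = Files.createTempDir()
      try {
        val confDir = new File(home, "conf")
        assert(confDir.mkdir())
        val writer = new OutputStreamWriter(
          new FileOutputStream(new File(confDir, "spark-defaults.conf")))
        writer.write("spark.executor.memory 1g\n")
        writer.close()

        val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
        val args = Seq(
          "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
          "--name", "testApp",
          "--master", "local",
          unusedJar.toString)
        val appArgs = new SparkSubmitArguments(args, Map("SPARK_HOME" -> home.getAbsolutePath))
        appArgs.executorMemory should be ("1g")
      } finally {
        Utils.deleteRecursively(home)
      }
    }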
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
def runSparkSubmit(args: Seq[String]): String = {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
@@ -314,6 +330,22 @@ class SparkSubmitSuite extends FunSuite with Matchers {
new File(sparkHome),
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
}
+
+ def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
+ val tmpDir = Files.createTempDir()
+
+ val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf")
+ val writer = new OutputStreamWriter(new FileOutputStream(defaultsConf))
+ for ((key, value) <- defaults) writer.write(s"$key $value\n")
+
+ writer.close()
+
+ try {
+ f(tmpDir.getAbsolutePath)
+ } finally {
+ Utils.deleteRecursively(tmpDir)
+ }
+ }
}
object JarCreationTest {