author     Andrew Or <andrew@databricks.com>    2014-10-30 15:44:29 -0700
committer  Andrew Or <andrew@databricks.com>    2014-10-30 15:44:29 -0700
commit     2f54543815c0905dc958d444ad638c23a29507c6 (patch)
tree       0705f9ed66800094a269533680dd114944355540
parent     d3450578357d6f7598243ee2ab11c338085ad9c1 (diff)
[SPARK-3661] Respect spark.*.memory in cluster mode
This also includes minor re-organization of the code. Tested locally in both client and deploy modes.

Author: Andrew Or <andrew@databricks.com>
Author: Andrew Or <andrewor14@gmail.com>

Closes #2697 from andrewor14/memory-cluster-mode and squashes the following commits:

01d78bc [Andrew Or] Merge branch 'master' of github.com:apache/spark into memory-cluster-mode
ccd468b [Andrew Or] Add some comments per Patrick
c956577 [Andrew Or] Tweak wording
2b4afa0 [Andrew Or] Unused import
47a5a88 [Andrew Or] Correct Spark properties precedence order
bf64717 [Andrew Or] Merge branch 'master' of github.com:apache/spark into memory-cluster-mode
dd452d0 [Andrew Or] Respect spark.*.memory in cluster mode
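The net effect of the patch (visible in the loadEnvironmentArguments hunk below) is a single precedence chain per setting: an explicit spark-submit flag wins, then the merged sparkProperties map (--conf first, then the defaults file), and only then the legacy environment variable. The following is a minimal, self-contained sketch of that chain with hypothetical values; it is not the actual SparkSubmitArguments code.

import scala.collection.mutable

// Sketch of the precedence order this patch enforces (hypothetical values).
object PrecedenceSketch {
  def main(args: Array[String]): Unit = {
    // --conf values are loaded into sparkProperties first ...
    val sparkProperties = mutable.HashMap("spark.driver.memory" -> "4g")

    // ... then the defaults file is merged in without overwriting them.
    val defaultsFile = Map("spark.driver.memory" -> "1g", "spark.master" -> "local[*]")
    defaultsFile.foreach { case (k, v) => sparkProperties.getOrElseUpdate(k, v) }

    // A command-line flag (absent here) beats the merged properties,
    // which in turn beat the legacy environment variable.
    val env = Map("SPARK_DRIVER_MEMORY" -> "2g")
    val driverMemory = Option.empty[String]                // no --driver-memory given
      .orElse(sparkProperties.get("spark.driver.memory"))  // --conf / defaults file
      .orElse(env.get("SPARK_DRIVER_MEMORY"))              // legacy env var
      .orNull

    println(driverMemory)                        // 4g: the --conf value wins
    println(sparkProperties("spark.master"))     // local[*]: filled from the defaults file
  }
}

Because the same chain now runs in cluster mode, spark.driver.memory set through --conf or the properties file is honored instead of being overridden by SPARK_DRIVER_MEMORY, which is what SPARK-3661 reports.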
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala           8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala  74
2 files changed, 45 insertions, 37 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 0379adeb07..b43e68e40f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -274,17 +274,11 @@ object SparkSubmit {
}
}
- // Properties given with --conf are superceded by other options, but take precedence over
- // properties in the defaults file.
+ // Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sysProps.getOrElseUpdate(k, v)
}
- // Read from default spark properties, if any
- for ((k, v) <- args.defaultSparkProperties) {
- sysProps.getOrElseUpdate(k, v)
- }
-
// Resolve paths in certain spark properties
val pathConfigs = Seq(
"spark.jars",
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 72a452e0ae..f0e9ee67f6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -19,7 +19,6 @@ package org.apache.spark.deploy
import java.util.jar.JarFile
-import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import org.apache.spark.util.Utils
@@ -72,39 +71,54 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
defaultProperties
}
- // Respect SPARK_*_MEMORY for cluster mode
- driverMemory = sys.env.get("SPARK_DRIVER_MEMORY").orNull
- executorMemory = sys.env.get("SPARK_EXECUTOR_MEMORY").orNull
-
+ // Set parameters from command line arguments
parseOpts(args.toList)
- mergeSparkProperties()
+ // Populate `sparkProperties` map from properties file
+ mergeDefaultSparkProperties()
+ // Use `sparkProperties` map along with env vars to fill in any missing parameters
+ loadEnvironmentArguments()
+
checkRequiredArguments()
/**
- * Fill in any undefined values based on the default properties file or options passed in through
- * the '--conf' flag.
+ * Merge values from the default properties file with those specified through --conf.
+ * When this is called, `sparkProperties` is already filled with configs from the latter.
*/
- private def mergeSparkProperties(): Unit = {
+ private def mergeDefaultSparkProperties(): Unit = {
// Use common defaults file, if not specified by user
propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultPropertiesFile(env))
+ // Honor --conf before the defaults file
+ defaultSparkProperties.foreach { case (k, v) =>
+ if (!sparkProperties.contains(k)) {
+ sparkProperties(k) = v
+ }
+ }
+ }
- val properties = HashMap[String, String]()
- properties.putAll(defaultSparkProperties)
- properties.putAll(sparkProperties)
-
- // Use properties file as fallback for values which have a direct analog to
- // arguments in this script.
- master = Option(master).orElse(properties.get("spark.master")).orNull
- executorMemory = Option(executorMemory).orElse(properties.get("spark.executor.memory")).orNull
- executorCores = Option(executorCores).orElse(properties.get("spark.executor.cores")).orNull
+ /**
+ * Load arguments from environment variables, Spark properties etc.
+ */
+ private def loadEnvironmentArguments(): Unit = {
+ master = Option(master)
+ .orElse(sparkProperties.get("spark.master"))
+ .orElse(env.get("MASTER"))
+ .orNull
+ driverMemory = Option(driverMemory)
+ .orElse(sparkProperties.get("spark.driver.memory"))
+ .orElse(env.get("SPARK_DRIVER_MEMORY"))
+ .orNull
+ executorMemory = Option(executorMemory)
+ .orElse(sparkProperties.get("spark.executor.memory"))
+ .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
+ .orNull
+ executorCores = Option(executorCores)
+ .orElse(sparkProperties.get("spark.executor.cores"))
+ .orNull
totalExecutorCores = Option(totalExecutorCores)
- .orElse(properties.get("spark.cores.max"))
+ .orElse(sparkProperties.get("spark.cores.max"))
.orNull
- name = Option(name).orElse(properties.get("spark.app.name")).orNull
- jars = Option(jars).orElse(properties.get("spark.jars")).orNull
-
- // This supports env vars in older versions of Spark
- master = Option(master).orElse(env.get("MASTER")).orNull
+ name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
+ jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull
// Try to set main class from JAR if no --class argument is given
@@ -131,7 +145,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
}
/** Ensure that required fields exists. Call this only once all defaults are loaded. */
- private def checkRequiredArguments() = {
+ private def checkRequiredArguments(): Unit = {
if (args.length == 0) {
printUsageAndExit(-1)
}
@@ -166,7 +180,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
}
}
- override def toString = {
+ override def toString = {
s"""Parsed arguments:
| master $master
| deployMode $deployMode
@@ -174,7 +188,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
| executorCores $executorCores
| totalExecutorCores $totalExecutorCores
| propertiesFile $propertiesFile
- | extraSparkProperties $sparkProperties
| driverMemory $driverMemory
| driverCores $driverCores
| driverExtraClassPath $driverExtraClassPath
@@ -193,8 +206,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
| jars $jars
| verbose $verbose
|
- |Default properties from $propertiesFile:
- |${defaultSparkProperties.mkString(" ", "\n ", "\n")}
+ |Spark properties used, including those specified through
+ | --conf and those from the properties file $propertiesFile:
+ |${sparkProperties.mkString(" ", "\n ", "\n")}
""".stripMargin
}
@@ -327,7 +341,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
}
}
- private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
+ private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = {
val outStream = SparkSubmit.printStream
if (unknownParam != null) {
outStream.println("Unknown/unsupported param " + unknownParam)