diff options
author | WangTaoTheTonic <barneystinson@aliyun.com> | 2015-01-09 13:20:32 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-01-09 13:23:13 -0800 |
commit | e96645206006a009e5c1a23bbd177dcaf3ef9b83 (patch) | |
tree | 929115f44914d13451878767d2f3896aaa0a8912 /yarn/src | |
parent | 7e8e62aec11c43c983055adc475b96006412199a (diff) | |
download | spark-e96645206006a009e5c1a23bbd177dcaf3ef9b83.tar.gz spark-e96645206006a009e5c1a23bbd177dcaf3ef9b83.tar.bz2 spark-e96645206006a009e5c1a23bbd177dcaf3ef9b83.zip |
[SPARK-1953][YARN]yarn client mode Application Master memory size is same as driver memory...
... size
Ways to set Application Master's memory on yarn-client mode:
1. `spark.yarn.am.memory` in SparkConf or System Properties
2. default value 512m
Note: this argument is only available in yarn-client mode.
Author: WangTaoTheTonic <barneystinson@aliyun.com>
Closes #3607 from WangTaoTheTonic/SPARK4181 and squashes the following commits:
d5ceb1b [WangTaoTheTonic] spark.driver.memeory is used in both modes
6c1b264 [WangTaoTheTonic] rebase
b8410c0 [WangTaoTheTonic] minor optiminzation
ddcd592 [WangTaoTheTonic] fix the bug produced in rebase and some improvements
3bf70cc [WangTaoTheTonic] rebase and give proper hint
987b99d [WangTaoTheTonic] disable --driver-memory in client mode
2b27928 [WangTaoTheTonic] inaccurate description
b7acbb2 [WangTaoTheTonic] incorrect method invoked
2557c5e [WangTaoTheTonic] missing a single blank
42075b0 [WangTaoTheTonic] arrange the args and warn logging
69c7dba [WangTaoTheTonic] rebase
1960d16 [WangTaoTheTonic] fix wrong comment
7fa9e2e [WangTaoTheTonic] log a warning
f6bee0e [WangTaoTheTonic] docs issue
d619996 [WangTaoTheTonic] Merge branch 'master' into SPARK4181
b09c309 [WangTaoTheTonic] use code format
ab16bb5 [WangTaoTheTonic] fix bug and add comments
44e48c2 [WangTaoTheTonic] minor fix
6fd13e1 [WangTaoTheTonic] add overhead mem and remove some configs
0566bb8 [WangTaoTheTonic] yarn client mode Application Master memory size is same as driver memory size
Diffstat (limited to 'yarn/src')
3 files changed, 29 insertions, 12 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index c363d755c1..032106371c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -65,7 +65,7 @@ private[spark] class Client( private val amMemoryOverhead = args.amMemoryOverhead // MB private val executorMemoryOverhead = args.executorMemoryOverhead // MB private val distCacheMgr = new ClientDistributedCacheManager() - private val isClusterMode = args.userClass != null + private val isClusterMode = args.isClusterMode def stop(): Unit = yarnClient.stop() diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 39f1021c9d..fdbf9f8eed 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -38,23 +38,27 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) var amMemory: Int = 512 // MB var appName: String = "Spark" var priority = 0 + def isClusterMode: Boolean = userClass != null + + private var driverMemory: Int = 512 // MB + private val driverMemOverheadKey = "spark.yarn.driver.memoryOverhead" + private val amMemKey = "spark.yarn.am.memory" + private val amMemOverheadKey = "spark.yarn.am.memoryOverhead" + private val isDynamicAllocationEnabled = + sparkConf.getBoolean("spark.dynamicAllocation.enabled", false) parseArgs(args.toList) + loadEnvironmentArgs() + validateArgs() // Additional memory to allocate to containers - // For now, use driver's memory overhead as our AM container's memory overhead - val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead", + val amMemoryOverheadConf = if (isClusterMode) driverMemOverheadKey else amMemOverheadKey + val amMemoryOverhead = 
sparkConf.getInt(amMemoryOverheadConf, math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN)) val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)) - private val isDynamicAllocationEnabled = - sparkConf.getBoolean("spark.dynamicAllocation.enabled", false) - - loadEnvironmentArgs() - validateArgs() - /** Load any default arguments provided through environment variables and Spark properties. */ private def loadEnvironmentArgs(): Unit = { // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://, @@ -87,6 +91,21 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) throw new IllegalArgumentException( "You must specify at least 1 executor!\n" + getUsageMessage()) } + if (isClusterMode) { + for (key <- Seq(amMemKey, amMemOverheadKey)) { + if (sparkConf.contains(key)) { + println(s"$key is set but does not apply in cluster mode.") + } + } + amMemory = driverMemory + } else { + if (sparkConf.contains(driverMemOverheadKey)) { + println(s"$driverMemOverheadKey is set but does not apply in client mode.") + } + sparkConf.getOption(amMemKey) + .map(Utils.memoryStringToMb) + .foreach { mem => amMemory = mem } + } } private def parseArgs(inputArgs: List[String]): Unit = { @@ -118,7 +137,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) if (args(0) == "--master-memory") { println("--master-memory is deprecated. 
Use --driver-memory instead.") } - amMemory = value + driverMemory = value args = tail case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail => diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 09597bd0e6..f99291553b 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -68,8 +68,6 @@ private[spark] class YarnClientSchedulerBackend( // List of (target Client argument, environment variable, Spark property) val optionTuples = List( - ("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"), - ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"), ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"), ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"), ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"), |