author     WangTaoTheTonic <barneystinson@aliyun.com>   2015-01-16 09:16:56 -0800
committer  Andrew Or <andrew@databricks.com>            2015-01-16 09:16:56 -0800
commit     2be82b1e66cd188456bbf1e5abb13af04d1629d5
tree       ba62a9aeb1084739fd75ffd6d991992b70be3af9 /yarn/src
parent     a79a9f923c47f2ce7da93cf0ecfe2b66fcb9fdd4
[SPARK-1507][YARN] Specify # cores for ApplicationMaster
Built on top of the changes in https://github.com/apache/spark/pull/3806.
https://issues.apache.org/jira/browse/SPARK-1507
This adds `--driver-cores` and `spark.driver.cores` for all cluster modes, and `spark.yarn.am.cores` for yarn-client mode.
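For illustration only (not part of the patch), a minimal SparkConf sketch showing where each new property applies; the core counts here are hypothetical values:

```scala
import org.apache.spark.SparkConf

// Hypothetical values for illustration. Which key is honored depends on the
// deploy mode; see the validation logic in ClientArguments.scala below.
val conf = new SparkConf()
  .set("spark.driver.cores", "4")   // honored in yarn-cluster mode (the AM runs the driver)
  .set("spark.yarn.am.cores", "2")  // honored in yarn-client mode (the AM runs alone)
```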
Author: WangTaoTheTonic <barneystinson@aliyun.com>
Author: WangTao <barneystinson@aliyun.com>
Closes #4018 from WangTaoTheTonic/SPARK-1507 and squashes the following commits:
01419d3 [WangTaoTheTonic] amend the args name
b255795 [WangTaoTheTonic] indent fix
d86557c [WangTaoTheTonic] some comments amend
43c9392 [WangTao] fix compile error
b39a100 [WangTao] specify # cores for ApplicationMaster
Diffstat (limited to 'yarn/src')
 yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala          |  1 +
 yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 24 ++++++++++++++++++++----
 2 files changed, 21 insertions(+), 4 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 032106371c..d4eeccf642 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -127,6 +127,7 @@ private[spark] class Client(
     }
     val capability = Records.newRecord(classOf[Resource])
     capability.setMemory(args.amMemory + amMemoryOverhead)
+    capability.setVirtualCores(args.amCores)
     appContext.setResource(capability)
     appContext
   }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 461a9ccd3c..79bead77ba 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -36,14 +36,18 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   var numExecutors = DEFAULT_NUMBER_EXECUTORS
   var amQueue = sparkConf.get("spark.yarn.queue", "default")
   var amMemory: Int = 512 // MB
+  var amCores: Int = 1
   var appName: String = "Spark"
   var priority = 0
   def isClusterMode: Boolean = userClass != null

   private var driverMemory: Int = 512 // MB
+  private var driverCores: Int = 1
   private val driverMemOverheadKey = "spark.yarn.driver.memoryOverhead"
   private val amMemKey = "spark.yarn.am.memory"
   private val amMemOverheadKey = "spark.yarn.am.memoryOverhead"
+  private val driverCoresKey = "spark.driver.cores"
+  private val amCoresKey = "spark.yarn.am.cores"
   private val isDynamicAllocationEnabled =
     sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
@@ -92,19 +96,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
         "You must specify at least 1 executor!\n" + getUsageMessage())
     }
     if (isClusterMode) {
-      for (key <- Seq(amMemKey, amMemOverheadKey)) {
+      for (key <- Seq(amMemKey, amMemOverheadKey, amCoresKey)) {
         if (sparkConf.contains(key)) {
           println(s"$key is set but does not apply in cluster mode.")
         }
       }
       amMemory = driverMemory
+      amCores = driverCores
     } else {
-      if (sparkConf.contains(driverMemOverheadKey)) {
-        println(s"$driverMemOverheadKey is set but does not apply in client mode.")
+      for (key <- Seq(driverMemOverheadKey, driverCoresKey)) {
+        if (sparkConf.contains(key)) {
+          println(s"$key is set but does not apply in client mode.")
+        }
       }
       sparkConf.getOption(amMemKey)
         .map(Utils.memoryStringToMb)
         .foreach { mem => amMemory = mem }
+      sparkConf.getOption(amCoresKey)
+        .map(_.toInt)
+        .foreach { cores => amCores = cores }
     }
   }
@@ -140,6 +150,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
           driverMemory = value
           args = tail

+        case ("--driver-cores") :: IntParam(value) :: tail =>
+          driverCores = value
+          args = tail
+
         case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
           if (args(0) == "--num-workers") {
             println("--num-workers is deprecated. Use --num-executors instead.")
@@ -198,7 +212,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   private def getUsageMessage(unknownParam: List[String] = null): String = {
     val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
-    message + """
+    message +
+      """
       |Usage: org.apache.spark.deploy.yarn.Client [options]
       |Options:
       |  --jar JAR_PATH           Path to your application's JAR file (required in yarn-cluster
@@ -209,6 +224,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       |  --num-executors NUM      Number of executors to start (Default: 2)
       |  --executor-cores NUM     Number of cores for the executors (Default: 1).
       |  --driver-memory MEM      Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
+      |  --driver-cores NUM       Number of cores used by the driver (Default: 1).
       |  --executor-memory MEM    Memory per executor (e.g. 1000M, 2G) (Default: 1G)
       |  --name NAME              The name of your application (Default: Spark)
       |  --queue QUEUE            The hadoop queue to use for allocation requests (Default: