diff options
Diffstat (limited to 'yarn')
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 1 |
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 24 |
2 files changed, 21 insertions, 4 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 032106371c..d4eeccf642 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -127,6 +127,7 @@ private[spark] class Client( } val capability = Records.newRecord(classOf[Resource]) capability.setMemory(args.amMemory + amMemoryOverhead) + capability.setVirtualCores(args.amCores) appContext.setResource(capability) appContext } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 461a9ccd3c..79bead77ba 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -36,14 +36,18 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) var numExecutors = DEFAULT_NUMBER_EXECUTORS var amQueue = sparkConf.get("spark.yarn.queue", "default") var amMemory: Int = 512 // MB + var amCores: Int = 1 var appName: String = "Spark" var priority = 0 def isClusterMode: Boolean = userClass != null private var driverMemory: Int = 512 // MB + private var driverCores: Int = 1 private val driverMemOverheadKey = "spark.yarn.driver.memoryOverhead" private val amMemKey = "spark.yarn.am.memory" private val amMemOverheadKey = "spark.yarn.am.memoryOverhead" + private val driverCoresKey = "spark.driver.cores" + private val amCoresKey = "spark.yarn.am.cores" private val isDynamicAllocationEnabled = sparkConf.getBoolean("spark.dynamicAllocation.enabled", false) @@ -92,19 +96,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) "You must specify at least 1 executor!\n" + getUsageMessage()) } if (isClusterMode) { - for (key <- Seq(amMemKey, amMemOverheadKey)) { + for (key <- Seq(amMemKey, amMemOverheadKey, 
amCoresKey)) { if (sparkConf.contains(key)) { println(s"$key is set but does not apply in cluster mode.") } } amMemory = driverMemory + amCores = driverCores } else { - if (sparkConf.contains(driverMemOverheadKey)) { - println(s"$driverMemOverheadKey is set but does not apply in client mode.") + for (key <- Seq(driverMemOverheadKey, driverCoresKey)) { + if (sparkConf.contains(key)) { + println(s"$key is set but does not apply in client mode.") + } } sparkConf.getOption(amMemKey) .map(Utils.memoryStringToMb) .foreach { mem => amMemory = mem } + sparkConf.getOption(amCoresKey) + .map(_.toInt) + .foreach { cores => amCores = cores } } } @@ -140,6 +150,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) driverMemory = value args = tail + case ("--driver-cores") :: IntParam(value) :: tail => + driverCores = value + args = tail + case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail => if (args(0) == "--num-workers") { println("--num-workers is deprecated. Use --num-executors instead.") @@ -198,7 +212,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) private def getUsageMessage(unknownParam: List[String] = null): String = { val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else "" - message + """ + message + + """ |Usage: org.apache.spark.deploy.yarn.Client [options] |Options: | --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster @@ -209,6 +224,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) | --num-executors NUM Number of executors to start (Default: 2) | --executor-cores NUM Number of cores for the executors (Default: 1). | --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb) + | --driver-cores NUM Number of cores used by the driver (Default: 1). | --executor-memory MEM Memory per executor (e.g. 
1000M, 2G) (Default: 1G) | --name NAME The name of your application (Default: Spark) | --queue QUEUE The hadoop queue to use for allocation requests (Default: |