diff options
7 files changed, 58 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala index 2e1e52906c..e5873ce724 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ListBuffer import org.apache.log4j.Level -import org.apache.spark.util.MemoryParam +import org.apache.spark.util.{IntParam, MemoryParam} /** * Command-line parser for the driver client. @@ -51,8 +51,8 @@ private[spark] class ClientArguments(args: Array[String]) { parse(args.toList) def parse(args: List[String]): Unit = args match { - case ("--cores" | "-c") :: value :: tail => - cores = value.toInt + case ("--cores" | "-c") :: IntParam(value) :: tail => + cores = value parse(tail) case ("--memory" | "-m") :: MemoryParam(value) :: tail => diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 955cbd6dab..050ba91eb2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -200,6 +200,7 @@ object SparkSubmit { // Yarn cluster only OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"), OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"), + OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"), OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"), OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"), OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"), diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 47059b08a3..81ec08cb6d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -108,6 +108,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St .orElse(sparkProperties.get("spark.driver.memory")) .orElse(env.get("SPARK_DRIVER_MEMORY")) .orNull + driverCores = Option(driverCores) + .orElse(sparkProperties.get("spark.driver.cores")) + .orNull executorMemory = Option(executorMemory) .orElse(sparkProperties.get("spark.executor.memory")) .orElse(env.get("SPARK_EXECUTOR_MEMORY")) @@ -406,6 +409,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St | --total-executor-cores NUM Total cores for all executors. | | YARN-only: + | --driver-cores NUM Number of cores used by the driver, only in cluster mode + | (Default: 1). | --executor-cores NUM Number of cores per executor (Default: 1). | --queue QUEUE_NAME The YARN queue to submit to (Default: "default"). | --num-executors NUM Number of executors to launch (Default: 2). diff --git a/docs/configuration.md b/docs/configuration.md index 673cdb371a..efbab40853 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -102,11 +102,10 @@ of the most common options to set are: </td> </tr> <tr> - <td><code>spark.executor.memory</code></td> - <td>512m</td> + <td><code>spark.driver.cores</code></td> + <td>1</td> <td> - Amount of memory to use per executor process, in the same format as JVM memory strings - (e.g. <code>512m</code>, <code>2g</code>). + Number of cores to use for the driver process, only in cluster mode. </td> </tr> <tr> @@ -118,6 +117,14 @@ of the most common options to set are: </td> </tr> <tr> + <td><code>spark.executor.memory</code></td> + <td>512m</td> + <td> + Amount of memory to use per executor process, in the same format as JVM memory strings + (e.g. <code>512m</code>, <code>2g</code>). + </td> +</tr> +<tr> <td><code>spark.driver.maxResultSize</code></td> <td>1g</td> <td> diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 4f273098c5..68ab127bcf 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -30,6 +30,23 @@ Most of the configs are the same for Spark on YARN as for other deployment modes </td> </tr> <tr> + <td><code>spark.driver.cores</code></td> + <td>1</td> + <td> + Number of cores used by the driver in YARN cluster mode. + Since the driver is run in the same JVM as the YARN Application Master in cluster mode, this also controls the cores used by the YARN AM. + In client mode, use <code>spark.yarn.am.cores</code> to control the number of cores used by the YARN AM instead. + </td> +</tr> +<tr> + <td><code>spark.yarn.am.cores</code></td> + <td>1</td> + <td> + Number of cores to use for the YARN Application Master in client mode. + In cluster mode, use <code>spark.driver.cores</code> instead. + </td> +</tr> +<tr> <td><code>spark.yarn.am.waitTime</code></td> <td>100000</td> <td> diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 032106371c..d4eeccf642 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -127,6 +127,7 @@ private[spark] class Client( } val capability = Records.newRecord(classOf[Resource]) capability.setMemory(args.amMemory + amMemoryOverhead) + capability.setVirtualCores(args.amCores) appContext.setResource(capability) appContext } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 461a9ccd3c..79bead77ba 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -36,14 +36,18 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) var numExecutors = DEFAULT_NUMBER_EXECUTORS var amQueue = sparkConf.get("spark.yarn.queue", "default") var amMemory: Int = 512 // MB + var amCores: Int = 1 var appName: String = "Spark" var priority = 0 def isClusterMode: Boolean = userClass != null private var driverMemory: Int = 512 // MB + private var driverCores: Int = 1 private val driverMemOverheadKey = "spark.yarn.driver.memoryOverhead" private val amMemKey = "spark.yarn.am.memory" private val amMemOverheadKey = "spark.yarn.am.memoryOverhead" + private val driverCoresKey = "spark.driver.cores" + private val amCoresKey = "spark.yarn.am.cores" private val isDynamicAllocationEnabled = sparkConf.getBoolean("spark.dynamicAllocation.enabled", false) @@ -92,19 +96,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) "You must specify at least 1 executor!\n" + getUsageMessage()) } if (isClusterMode) { - for (key <- Seq(amMemKey, amMemOverheadKey)) { + for (key <- Seq(amMemKey, amMemOverheadKey, amCoresKey)) { if (sparkConf.contains(key)) { println(s"$key is set but does not apply in cluster mode.") } } amMemory = driverMemory + amCores = driverCores } else { - if (sparkConf.contains(driverMemOverheadKey)) { - println(s"$driverMemOverheadKey is set but does not apply in client mode.") + for (key <- Seq(driverMemOverheadKey, driverCoresKey)) { + if (sparkConf.contains(key)) { + println(s"$key is set but does not apply in client mode.") + } } sparkConf.getOption(amMemKey) .map(Utils.memoryStringToMb) .foreach { mem => amMemory = mem } + sparkConf.getOption(amCoresKey) + .map(_.toInt) + .foreach { cores => amCores = cores } } } @@ -140,6 +150,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) driverMemory = value args = tail + case ("--driver-cores") :: IntParam(value) :: tail => + driverCores = value + args = tail + case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail => if (args(0) == "--num-workers") { println("--num-workers is deprecated. Use --num-executors instead.") @@ -198,7 +212,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) private def getUsageMessage(unknownParam: List[String] = null): String = { val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else "" - message + """ + message + + """ |Usage: org.apache.spark.deploy.yarn.Client [options] |Options: | --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster @@ -209,6 +224,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) | --num-executors NUM Number of executors to start (Default: 2) | --executor-cores NUM Number of cores for the executors (Default: 1). | --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb) + | --driver-cores NUM Number of cores used by the driver (Default: 1). | --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G) | --name NAME The name of your application (Default: Spark) | --queue QUEUE The hadoop queue to use for allocation requests (Default: |