-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 5
-rw-r--r--  docs/configuration.md | 15
-rw-r--r--  docs/running-on-yarn.md | 17
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 1
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 24
7 files changed, 58 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
index 2e1e52906c..e5873ce724 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ListBuffer
import org.apache.log4j.Level
-import org.apache.spark.util.MemoryParam
+import org.apache.spark.util.{IntParam, MemoryParam}
/**
* Command-line parser for the driver client.
@@ -51,8 +51,8 @@ private[spark] class ClientArguments(args: Array[String]) {
parse(args.toList)
def parse(args: List[String]): Unit = args match {
- case ("--cores" | "-c") :: value :: tail =>
- cores = value.toInt
+ case ("--cores" | "-c") :: IntParam(value) :: tail =>
+ cores = value
parse(tail)
case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 955cbd6dab..050ba91eb2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -200,6 +200,7 @@ object SparkSubmit {
// Yarn cluster only
OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
+ OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"),
OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
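
Note: the new OptionAssigner line routes args.driverCores to the yarn Client's --driver-cores flag only for YARN cluster deployments. A rough, hypothetical sketch of that declarative mapping pattern (field names and types are illustrative, not SparkSubmit's actual OptionAssigner):

    // Illustration only: map a submit-time argument to a child-process CLI
    // option, guarded by cluster manager and deploy mode.
    case class OptionAssignerSketch(
        value: String, clusterManager: String, deployMode: String, clOption: String)

    def applicable(o: OptionAssignerSketch, manager: String, mode: String): Seq[String] =
      if (o.value != null && o.clusterManager == manager && o.deployMode == mode) {
        Seq(o.clOption, o.value)              // e.g. Seq("--driver-cores", "2")
      } else {
        Seq.empty
      }
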
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 47059b08a3..81ec08cb6d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -108,6 +108,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
.orElse(sparkProperties.get("spark.driver.memory"))
.orElse(env.get("SPARK_DRIVER_MEMORY"))
.orNull
+ driverCores = Option(driverCores)
+ .orElse(sparkProperties.get("spark.driver.cores"))
+ .orNull
executorMemory = Option(executorMemory)
.orElse(sparkProperties.get("spark.executor.memory"))
.orElse(env.get("SPARK_EXECUTOR_MEMORY"))
@@ -406,6 +409,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
| --total-executor-cores NUM Total cores for all executors.
|
| YARN-only:
+ | --driver-cores NUM Number of cores used by the driver, only in cluster mode
+ | (Default: 1).
| --executor-cores NUM Number of cores per executor (Default: 1).
| --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
| --num-executors NUM Number of executors to launch (Default: 2).
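
Note: the SparkSubmitArguments hunk resolves driverCores with the usual precedence chain: an explicit --driver-cores flag wins, otherwise spark.driver.cores from the loaded properties, otherwise null (unlike driverMemory, there is no environment-variable fallback). A small self-contained sketch of that Option/orElse pattern:

    // Precedence: explicit CLI value, then a Spark property, then nothing.
    def resolveDriverCores(cliValue: String, sparkProperties: Map[String, String]): String =
      Option(cliValue)
        .orElse(sparkProperties.get("spark.driver.cores"))
        .orNull

    // resolveDriverCores("4", Map("spark.driver.cores" -> "2"))  => "4"
    // resolveDriverCores(null, Map("spark.driver.cores" -> "2")) => "2"
    // resolveDriverCores(null, Map.empty)                        => null
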
diff --git a/docs/configuration.md b/docs/configuration.md
index 673cdb371a..efbab40853 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -102,11 +102,10 @@ of the most common options to set are:
</td>
</tr>
<tr>
- <td><code>spark.executor.memory</code></td>
- <td>512m</td>
+ <td><code>spark.driver.cores</code></td>
+ <td>1</td>
<td>
- Amount of memory to use per executor process, in the same format as JVM memory strings
- (e.g. <code>512m</code>, <code>2g</code>).
+ Number of cores to use for the driver process, only in cluster mode.
</td>
</tr>
<tr>
@@ -118,6 +117,14 @@ of the most common options to set are:
</td>
</tr>
<tr>
+ <td><code>spark.executor.memory</code></td>
+ <td>512m</td>
+ <td>
+ Amount of memory to use per executor process, in the same format as JVM memory strings
+ (e.g. <code>512m</code>, <code>2g</code>).
+ </td>
+</tr>
+<tr>
<td><code>spark.driver.maxResultSize</code></td>
<td>1g</td>
<td>
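
Note: as the new configuration.md entry says, spark.driver.cores only takes effect in cluster mode, where the driver runs inside the cluster. One way to set it programmatically (a sketch; the property can equally go in spark-defaults.conf or on the spark-submit command line):

    import org.apache.spark.SparkConf

    // Request two cores for the driver; only honored when the application
    // is submitted in cluster mode.
    val conf = new SparkConf()
      .setAppName("driver-cores-example")
      .set("spark.driver.cores", "2")
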
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 4f273098c5..68ab127bcf 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -30,6 +30,23 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
</td>
</tr>
<tr>
+ <td><code>spark.driver.cores</code></td>
+ <td>1</td>
+ <td>
+ Number of cores used by the driver in YARN cluster mode.
+ Since the driver is run in the same JVM as the YARN Application Master in cluster mode, this also controls the cores used by the YARN AM.
+ In client mode, use <code>spark.yarn.am.cores</code> to control the number of cores used by the YARN AM instead.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.yarn.am.cores</code></td>
+ <td>1</td>
+ <td>
+ Number of cores to use for the YARN Application Master in client mode.
+ In cluster mode, use <code>spark.driver.cores</code> instead.
+ </td>
+</tr>
+<tr>
<td><code>spark.yarn.am.waitTime</code></td>
<td>100000</td>
<td>
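
Note: the two new YARN entries split AM sizing by deploy mode: in cluster mode the driver and the Application Master share a JVM, so spark.driver.cores sizes both; in client mode the AM is a separate process sized by spark.yarn.am.cores. A sketch of choosing the right key (illustration only; Spark performs this selection inside its YARN client):

    // Pick the property that controls AM cores for a given deploy mode.
    def amCoresKey(deployMode: String): String = deployMode match {
      case "cluster" => "spark.driver.cores"   // driver and AM share the same JVM
      case "client"  => "spark.yarn.am.cores"  // standalone AM, driver runs locally
      case other     => sys.error(s"unknown deploy mode: $other")
    }
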
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 032106371c..d4eeccf642 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -127,6 +127,7 @@ private[spark] class Client(
}
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(args.amMemory + amMemoryOverhead)
+ capability.setVirtualCores(args.amCores)
appContext.setResource(capability)
appContext
}
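
Note: the one-line Client.scala change propagates the requested core count into the YARN container request alongside memory. A minimal sketch of building such a Resource capability with the Hadoop YARN records API (the numeric values are placeholders):

    import org.apache.hadoop.yarn.api.records.Resource
    import org.apache.hadoop.yarn.util.Records

    // Build the AM container request: memory plus overhead, and now virtual cores.
    val amMemoryMb = 512          // placeholder values for illustration
    val amMemoryOverheadMb = 384
    val amCores = 1

    val capability = Records.newRecord(classOf[Resource])
    capability.setMemory(amMemoryMb + amMemoryOverheadMb)
    capability.setVirtualCores(amCores)
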
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 461a9ccd3c..79bead77ba 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -36,14 +36,18 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var numExecutors = DEFAULT_NUMBER_EXECUTORS
var amQueue = sparkConf.get("spark.yarn.queue", "default")
var amMemory: Int = 512 // MB
+ var amCores: Int = 1
var appName: String = "Spark"
var priority = 0
def isClusterMode: Boolean = userClass != null
private var driverMemory: Int = 512 // MB
+ private var driverCores: Int = 1
private val driverMemOverheadKey = "spark.yarn.driver.memoryOverhead"
private val amMemKey = "spark.yarn.am.memory"
private val amMemOverheadKey = "spark.yarn.am.memoryOverhead"
+ private val driverCoresKey = "spark.driver.cores"
+ private val amCoresKey = "spark.yarn.am.cores"
private val isDynamicAllocationEnabled =
sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
@@ -92,19 +96,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
"You must specify at least 1 executor!\n" + getUsageMessage())
}
if (isClusterMode) {
- for (key <- Seq(amMemKey, amMemOverheadKey)) {
+ for (key <- Seq(amMemKey, amMemOverheadKey, amCoresKey)) {
if (sparkConf.contains(key)) {
println(s"$key is set but does not apply in cluster mode.")
}
}
amMemory = driverMemory
+ amCores = driverCores
} else {
- if (sparkConf.contains(driverMemOverheadKey)) {
- println(s"$driverMemOverheadKey is set but does not apply in client mode.")
+ for (key <- Seq(driverMemOverheadKey, driverCoresKey)) {
+ if (sparkConf.contains(key)) {
+ println(s"$key is set but does not apply in client mode.")
+ }
}
sparkConf.getOption(amMemKey)
.map(Utils.memoryStringToMb)
.foreach { mem => amMemory = mem }
+ sparkConf.getOption(amCoresKey)
+ .map(_.toInt)
+ .foreach { cores => amCores = cores }
}
}
@@ -140,6 +150,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
driverMemory = value
args = tail
+ case ("--driver-cores") :: IntParam(value) :: tail =>
+ driverCores = value
+ args = tail
+
case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
if (args(0) == "--num-workers") {
println("--num-workers is deprecated. Use --num-executors instead.")
@@ -198,7 +212,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
private def getUsageMessage(unknownParam: List[String] = null): String = {
val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
- message + """
+ message +
+ """
|Usage: org.apache.spark.deploy.yarn.Client [options]
|Options:
| --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster
@@ -209,6 +224,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
| --num-executors NUM Number of executors to start (Default: 2)
| --executor-cores NUM Number of cores for the executors (Default: 1).
| --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
+ | --driver-cores NUM Number of cores used by the driver (Default: 1).
| --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)
| --name NAME The name of your application (Default: Spark)
| --queue QUEUE The hadoop queue to use for allocation requests (Default:
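
Note: the yarn ClientArguments hunks add --driver-cores to the same list-based parse loop used for the other flags, with IntParam rejecting non-numeric values before they reach the field. A compact, self-contained sketch of that parsing style (simplified; the real parser uses a mutable loop, handles many more options, and prints the usage message on errors):

    import scala.util.Try

    object ArgParseSketch {
      // Stand-in extractor; Spark's is org.apache.spark.util.IntParam.
      object IntArg { def unapply(s: String): Option[Int] = Try(s.toInt).toOption }

      var driverCores = 1
      var driverMemoryMb = 512

      @annotation.tailrec
      def parse(args: List[String]): Unit = args match {
        case "--driver-cores" :: IntArg(n) :: tail =>
          driverCores = n; parse(tail)
        case "--driver-memory" :: IntArg(mb) :: tail =>
          driverMemoryMb = mb; parse(tail)
        case Nil => // done
        case unknown :: _ => sys.error(s"Unknown/unsupported param $unknown")
      }
    }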