author     WangTaoTheTonic <barneystinson@aliyun.com>   2015-01-16 09:16:56 -0800
committer  Andrew Or <andrew@databricks.com>            2015-01-16 09:16:56 -0800
commit     2be82b1e66cd188456bbf1e5abb13af04d1629d5 (patch)
tree       ba62a9aeb1084739fd75ffd6d991992b70be3af9 /yarn
parent     a79a9f923c47f2ce7da93cf0ecfe2b66fcb9fdd4 (diff)
[SPARK-1507][YARN] specify # cores for ApplicationMaster
Based on top of the changes in https://github.com/apache/spark/pull/3806.
https://issues.apache.org/jira/browse/SPARK-1507

This adds `--driver-cores` and `spark.driver.cores` for all cluster modes,
and `spark.yarn.am.cores` for yarn-client mode.

Author: WangTaoTheTonic <barneystinson@aliyun.com>
Author: WangTao <barneystinson@aliyun.com>

Closes #4018 from WangTaoTheTonic/SPARK-1507 and squashes the following commits:

01419d3 [WangTaoTheTonic] amend the args name
b255795 [WangTaoTheTonic] indet thing
d86557c [WangTaoTheTonic] some comments amend
43c9392 [WangTao] fix compile error
b39a100 [WangTao] specify # cores for ApplicationMaster
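For context, a minimal sketch of how an application could set the options this patch introduces through SparkConf. The property names (`spark.driver.cores`, `spark.yarn.am.cores`) come from the patch itself; the app name and core counts are illustrative placeholders:

    import org.apache.spark.SparkConf

    // Illustrative only: property names are from this patch,
    // the app name and core counts are placeholders.
    val conf = new SparkConf()
      .setAppName("example")
      // yarn-cluster mode: the driver runs inside the AM, so this sets the AM's vcores
      .set("spark.driver.cores", "2")
      // yarn-client mode: the AM is separate from the driver and gets its own setting
      .set("spark.yarn.am.cores", "2")

Only one of the two settings takes effect in a given deploy mode, as the validation logic in ClientArguments.scala below makes explicit.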
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala          |  1
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 24
2 files changed, 21 insertions(+), 4 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 032106371c..d4eeccf642 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -127,6 +127,7 @@ private[spark] class Client(
}
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(args.amMemory + amMemoryOverhead)
+ capability.setVirtualCores(args.amCores)
appContext.setResource(capability)
appContext
}
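Taken on its own, the Client.scala change adds one field to the YARN resource ask for the ApplicationMaster container. A standalone sketch of the same pattern against the Hadoop YARN records API (the memory and core values here are illustrative, not taken from any particular configuration):

    import org.apache.hadoop.yarn.api.records.Resource
    import org.apache.hadoop.yarn.util.Records

    // Build the container resource request for the ApplicationMaster.
    // 512 MB memory, 384 MB overhead, and 1 vcore are illustrative values.
    val capability: Resource = Records.newRecord(classOf[Resource])
    capability.setMemory(512 + 384)  // amMemory + amMemoryOverhead, in MB
    capability.setVirtualCores(1)    // the field this patch starts populating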
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 461a9ccd3c..79bead77ba 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -36,14 +36,18 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var numExecutors = DEFAULT_NUMBER_EXECUTORS
var amQueue = sparkConf.get("spark.yarn.queue", "default")
var amMemory: Int = 512 // MB
+ var amCores: Int = 1
var appName: String = "Spark"
var priority = 0
def isClusterMode: Boolean = userClass != null
private var driverMemory: Int = 512 // MB
+ private var driverCores: Int = 1
private val driverMemOverheadKey = "spark.yarn.driver.memoryOverhead"
private val amMemKey = "spark.yarn.am.memory"
private val amMemOverheadKey = "spark.yarn.am.memoryOverhead"
+ private val driverCoresKey = "spark.driver.cores"
+ private val amCoresKey = "spark.yarn.am.cores"
private val isDynamicAllocationEnabled =
sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
@@ -92,19 +96,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
"You must specify at least 1 executor!\n" + getUsageMessage())
}
if (isClusterMode) {
- for (key <- Seq(amMemKey, amMemOverheadKey)) {
+ for (key <- Seq(amMemKey, amMemOverheadKey, amCoresKey)) {
if (sparkConf.contains(key)) {
println(s"$key is set but does not apply in cluster mode.")
}
}
amMemory = driverMemory
+ amCores = driverCores
} else {
- if (sparkConf.contains(driverMemOverheadKey)) {
- println(s"$driverMemOverheadKey is set but does not apply in client mode.")
+ for (key <- Seq(driverMemOverheadKey, driverCoresKey)) {
+ if (sparkConf.contains(key)) {
+ println(s"$key is set but does not apply in client mode.")
+ }
}
sparkConf.getOption(amMemKey)
.map(Utils.memoryStringToMb)
.foreach { mem => amMemory = mem }
+ sparkConf.getOption(amCoresKey)
+ .map(_.toInt)
+ .foreach { cores => amCores = cores }
}
}
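The validation logic above reduces to a simple rule: in cluster mode the driver runs inside the AM, so the driver settings win and the AM-specific keys are ignored with a warning; in client mode the opposite holds. A reduced, self-contained sketch of the cores half of that rule (the function name and wiring are illustrative; the keys and the default of 1 are the patch's):

    import org.apache.spark.SparkConf

    // Sketch of the cores-resolution rule in ClientArguments (illustrative wiring).
    def resolveAmCores(conf: SparkConf, isClusterMode: Boolean, driverCores: Int): Int =
      if (isClusterMode) {
        // Driver and AM share a container, so the driver's core count applies.
        if (conf.contains("spark.yarn.am.cores")) {
          println("spark.yarn.am.cores is set but does not apply in cluster mode.")
        }
        driverCores
      } else {
        // Separate AM: honor spark.yarn.am.cores, defaulting to 1.
        conf.getOption("spark.yarn.am.cores").map(_.toInt).getOrElse(1)
      }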
@@ -140,6 +150,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
driverMemory = value
args = tail
+ case ("--driver-cores") :: IntParam(value) :: tail =>
+ driverCores = value
+ args = tail
+
case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
if (args(0) == "--num-workers") {
println("--num-workers is deprecated. Use --num-executors instead.")
@@ -198,7 +212,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
private def getUsageMessage(unknownParam: List[String] = null): String = {
val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
- message + """
+ message +
+ """
|Usage: org.apache.spark.deploy.yarn.Client [options]
|Options:
| --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster
@@ -209,6 +224,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
| --num-executors NUM Number of executors to start (Default: 2)
| --executor-cores NUM Number of cores for the executors (Default: 1).
| --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512 Mb)
+ | --driver-cores NUM Number of cores used by the driver (Default: 1).
| --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G)
| --name NAME The name of your application (Default: Spark)
| --queue QUEUE The hadoop queue to use for allocation requests (Default: