aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
Diffstat (limited to 'yarn')
-rw-r--r--yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala45
1 files changed, 24 insertions, 21 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index d1f13e3c36..161918859e 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -33,11 +33,12 @@ private[spark] class YarnClientSchedulerBackend(
var client: Client = null
var appId: ApplicationId = null
- private[spark] def addArg(optionName: String, optionalParam: String, arrayBuf: ArrayBuffer[String]) {
- Option(System.getenv(optionalParam)) foreach {
- optParam => {
- arrayBuf += (optionName, optParam)
- }
+ private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
+ arrayBuf: ArrayBuffer[String]) {
+ if (System.getProperty(sysProp) != null) {
+ arrayBuf += (optionName, System.getProperty(sysProp))
+ } else if (System.getenv(envVar) != null) {
+ arrayBuf += (optionName, System.getenv(envVar))
}
}
@@ -56,22 +57,24 @@ private[spark] class YarnClientSchedulerBackend(
"--am-class", "org.apache.spark.deploy.yarn.ExecutorLauncher"
)
- // process any optional arguments, use the defaults already defined in ClientArguments
- // if things aren't specified
- Map("SPARK_MASTER_MEMORY" -> "--driver-memory",
- "SPARK_DRIVER_MEMORY" -> "--driver-memory",
- "SPARK_WORKER_INSTANCES" -> "--num-executors",
- "SPARK_WORKER_MEMORY" -> "--executor-memory",
- "SPARK_WORKER_CORES" -> "--executor-cores",
- "SPARK_EXECUTOR_INSTANCES" -> "--num-executors",
- "SPARK_EXECUTOR_MEMORY" -> "--executor-memory",
- "SPARK_EXECUTOR_CORES" -> "--executor-cores",
- "SPARK_YARN_QUEUE" -> "--queue",
- "SPARK_YARN_APP_NAME" -> "--name",
- "SPARK_YARN_DIST_FILES" -> "--files",
- "SPARK_YARN_DIST_ARCHIVES" -> "--archives")
- .foreach { case (optParam, optName) => addArg(optName, optParam, argsArrayBuf) }
-
+ // process any optional arguments, given either as environment variables
+ // or system properties. use the defaults already defined in ClientArguments
+ // if things aren't specified. system properties override environment
+ // variables.
+ List(("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
+ ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
+ ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances"),
+ ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
+ ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
+ ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
+ ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
+ ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
+ ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
+ ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"),
+ ("--files", "SPARK_YARN_DIST_FILES", "spark.yarn.dist.files"),
+ ("--archives", "SPARK_YARN_DIST_ARCHIVES", "spark.yarn.dist.archives"))
+ .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
+
logDebug("ClientArguments called with: " + argsArrayBuf)
val args = new ClientArguments(argsArrayBuf.toArray, conf)
client = new Client(args, conf)