diff options
Diffstat (limited to 'yarn')
-rw-r--r-- | yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 45 |
1 files changed, 24 insertions, 21 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index d1f13e3c36..161918859e 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -33,11 +33,12 @@ private[spark] class YarnClientSchedulerBackend( var client: Client = null var appId: ApplicationId = null - private[spark] def addArg(optionName: String, optionalParam: String, arrayBuf: ArrayBuffer[String]) { - Option(System.getenv(optionalParam)) foreach { - optParam => { - arrayBuf += (optionName, optParam) - } + private[spark] def addArg(optionName: String, envVar: String, sysProp: String, + arrayBuf: ArrayBuffer[String]) { + if (System.getProperty(sysProp) != null) { + arrayBuf += (optionName, System.getProperty(sysProp)) + } else if (System.getenv(envVar) != null) { + arrayBuf += (optionName, System.getenv(envVar)) } } @@ -56,22 +57,24 @@ private[spark] class YarnClientSchedulerBackend( "--am-class", "org.apache.spark.deploy.yarn.ExecutorLauncher" ) - // process any optional arguments, use the defaults already defined in ClientArguments - // if things aren't specified - Map("SPARK_MASTER_MEMORY" -> "--driver-memory", - "SPARK_DRIVER_MEMORY" -> "--driver-memory", - "SPARK_WORKER_INSTANCES" -> "--num-executors", - "SPARK_WORKER_MEMORY" -> "--executor-memory", - "SPARK_WORKER_CORES" -> "--executor-cores", - "SPARK_EXECUTOR_INSTANCES" -> "--num-executors", - "SPARK_EXECUTOR_MEMORY" -> "--executor-memory", - "SPARK_EXECUTOR_CORES" -> "--executor-cores", - "SPARK_YARN_QUEUE" -> "--queue", - "SPARK_YARN_APP_NAME" -> "--name", - "SPARK_YARN_DIST_FILES" -> "--files", - "SPARK_YARN_DIST_ARCHIVES" -> "--archives") - .foreach { case (optParam, optName) => addArg(optName, optParam, argsArrayBuf) } - + // process any optional arguments, given either as environment variables + // or system properties. use the defaults already defined in ClientArguments + // if things aren't specified. system properties override environment + // variables. + List(("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"), + ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"), + ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances"), + ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"), + ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"), + ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"), + ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"), + ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"), + ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"), + ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"), + ("--files", "SPARK_YARN_DIST_FILES", "spark.yarn.dist.files"), + ("--archives", "SPARK_YARN_DIST_ARCHIVES", "spark.yarn.dist.archives")) + .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) } + logDebug("ClientArguments called with: " + argsArrayBuf) val args = new ClientArguments(argsArrayBuf.toArray, conf) client = new Client(args, conf) |