diff options
author | Sandy Ryza <sandy@cloudera.com> | 2014-03-29 14:41:36 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-03-29 14:41:36 -0700 |
commit | 1617816090e7b20124a512a43860a21232ebf511 (patch) | |
tree | cb6e45d21cb59edd81ab3bc29b9e00ab034bb90d /yarn/common/src/main | |
parent | 3738f24421d6f3bd10e5ef9ebfc10f702a5cb7ac (diff) | |
download | spark-1617816090e7b20124a512a43860a21232ebf511.tar.gz spark-1617816090e7b20124a512a43860a21232ebf511.tar.bz2 spark-1617816090e7b20124a512a43860a21232ebf511.zip |
SPARK-1126. spark-app preliminary
This is a starting version of the spark-app script for running compiled binaries against Spark. It still needs tests and some polish. The only testing I've done so far has been using it to launch jobs in yarn-standalone mode against a pseudo-distributed cluster.
This leaves out the changes required for launching python scripts. I think it might be best to save those for another JIRA/PR (while keeping to the design so that they won't require backwards-incompatible changes).
Author: Sandy Ryza <sandy@cloudera.com>
Closes #86 from sryza/sandy-spark-1126 and squashes the following commits:
d428d85 [Sandy Ryza] Commenting, doc, and import fixes from Patrick's comments
e7315c6 [Sandy Ryza] Fix failing tests
34de899 [Sandy Ryza] Change --more-jars to --jars and fix docs
299ddca [Sandy Ryza] Fix scalastyle
a94c627 [Sandy Ryza] Add newline at end of SparkSubmit
04bc4e2 [Sandy Ryza] SPARK-1126. spark-submit script
Diffstat (limited to 'yarn/common/src/main')
-rw-r--r-- | yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 45 |
1 file changed, 24 insertions, 21 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index d1f13e3c36..161918859e 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -33,11 +33,12 @@ private[spark] class YarnClientSchedulerBackend( var client: Client = null var appId: ApplicationId = null - private[spark] def addArg(optionName: String, optionalParam: String, arrayBuf: ArrayBuffer[String]) { - Option(System.getenv(optionalParam)) foreach { - optParam => { - arrayBuf += (optionName, optParam) - } + private[spark] def addArg(optionName: String, envVar: String, sysProp: String, + arrayBuf: ArrayBuffer[String]) { + if (System.getProperty(sysProp) != null) { + arrayBuf += (optionName, System.getProperty(sysProp)) + } else if (System.getenv(envVar) != null) { + arrayBuf += (optionName, System.getenv(envVar)) } } @@ -56,22 +57,24 @@ private[spark] class YarnClientSchedulerBackend( "--am-class", "org.apache.spark.deploy.yarn.ExecutorLauncher" ) - // process any optional arguments, use the defaults already defined in ClientArguments - // if things aren't specified - Map("SPARK_MASTER_MEMORY" -> "--driver-memory", - "SPARK_DRIVER_MEMORY" -> "--driver-memory", - "SPARK_WORKER_INSTANCES" -> "--num-executors", - "SPARK_WORKER_MEMORY" -> "--executor-memory", - "SPARK_WORKER_CORES" -> "--executor-cores", - "SPARK_EXECUTOR_INSTANCES" -> "--num-executors", - "SPARK_EXECUTOR_MEMORY" -> "--executor-memory", - "SPARK_EXECUTOR_CORES" -> "--executor-cores", - "SPARK_YARN_QUEUE" -> "--queue", - "SPARK_YARN_APP_NAME" -> "--name", - "SPARK_YARN_DIST_FILES" -> "--files", - "SPARK_YARN_DIST_ARCHIVES" -> "--archives") - .foreach { case (optParam, optName) => addArg(optName, optParam, argsArrayBuf) } - + // process any optional arguments, given either as environment variables + // or system properties. use the defaults already defined in ClientArguments + // if things aren't specified. system properties override environment + // variables. + List(("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"), + ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"), + ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances"), + ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"), + ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"), + ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"), + ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"), + ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"), + ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"), + ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"), + ("--files", "SPARK_YARN_DIST_FILES", "spark.yarn.dist.files"), + ("--archives", "SPARK_YARN_DIST_ARCHIVES", "spark.yarn.dist.archives")) + .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) } + logDebug("ClientArguments called with: " + argsArrayBuf) val args = new ClientArguments(argsArrayBuf.toArray, conf) client = new Client(args, conf) |