author     Sandy Ryza <sandy@cloudera.com>          2014-03-29 14:41:36 -0700
committer  Patrick Wendell <pwendell@gmail.com>     2014-03-29 14:41:36 -0700
commit     1617816090e7b20124a512a43860a21232ebf511 (patch)
tree       cb6e45d21cb59edd81ab3bc29b9e00ab034bb90d /yarn
parent     3738f24421d6f3bd10e5ef9ebfc10f702a5cb7ac (diff)
SPARK-1126. spark-app preliminary
This is a starting version of the spark-app script for running compiled binaries against Spark. It still needs tests and some polish. The only testing I've done so far has been using it to launch jobs in yarn-standalone mode against a pseudo-distributed cluster.

This leaves out the changes required for launching python scripts. I think it might be best to save those for another JIRA/PR (while keeping to the design so that they won't require backwards-incompatible changes).

Author: Sandy Ryza <sandy@cloudera.com>

Closes #86 from sryza/sandy-spark-1126 and squashes the following commits:

d428d85 [Sandy Ryza] Commenting, doc, and import fixes from Patrick's comments
e7315c6 [Sandy Ryza] Fix failing tests
34de899 [Sandy Ryza] Change --more-jars to --jars and fix docs
299ddca [Sandy Ryza] Fix scalastyle
a94c627 [Sandy Ryza] Add newline at end of SparkSubmit
04bc4e2 [Sandy Ryza] SPARK-1126. spark-submit script
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala  45
1 file changed, 24 insertions(+), 21 deletions(-)
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index d1f13e3c36..161918859e 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -33,11 +33,12 @@ private[spark] class YarnClientSchedulerBackend(
var client: Client = null
var appId: ApplicationId = null
- private[spark] def addArg(optionName: String, optionalParam: String, arrayBuf: ArrayBuffer[String]) {
- Option(System.getenv(optionalParam)) foreach {
- optParam => {
- arrayBuf += (optionName, optParam)
- }
+ private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
+ arrayBuf: ArrayBuffer[String]) {
+ if (System.getProperty(sysProp) != null) {
+ arrayBuf += (optionName, System.getProperty(sysProp))
+ } else if (System.getenv(envVar) != null) {
+ arrayBuf += (optionName, System.getenv(envVar))
}
}
@@ -56,22 +57,24 @@ private[spark] class YarnClientSchedulerBackend(
"--am-class", "org.apache.spark.deploy.yarn.ExecutorLauncher"
)
- // process any optional arguments, use the defaults already defined in ClientArguments
- // if things aren't specified
- Map("SPARK_MASTER_MEMORY" -> "--driver-memory",
- "SPARK_DRIVER_MEMORY" -> "--driver-memory",
- "SPARK_WORKER_INSTANCES" -> "--num-executors",
- "SPARK_WORKER_MEMORY" -> "--executor-memory",
- "SPARK_WORKER_CORES" -> "--executor-cores",
- "SPARK_EXECUTOR_INSTANCES" -> "--num-executors",
- "SPARK_EXECUTOR_MEMORY" -> "--executor-memory",
- "SPARK_EXECUTOR_CORES" -> "--executor-cores",
- "SPARK_YARN_QUEUE" -> "--queue",
- "SPARK_YARN_APP_NAME" -> "--name",
- "SPARK_YARN_DIST_FILES" -> "--files",
- "SPARK_YARN_DIST_ARCHIVES" -> "--archives")
- .foreach { case (optParam, optName) => addArg(optName, optParam, argsArrayBuf) }
-
+ // process any optional arguments, given either as environment variables
+ // or system properties. use the defaults already defined in ClientArguments
+ // if things aren't specified. system properties override environment
+ // variables.
+ List(("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
+ ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
+ ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.worker.instances"),
+ ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
+ ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
+ ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
+ ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
+ ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
+ ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
+ ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"),
+ ("--files", "SPARK_YARN_DIST_FILES", "spark.yarn.dist.files"),
+ ("--archives", "SPARK_YARN_DIST_ARCHIVES", "spark.yarn.dist.archives"))
+ .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
+
logDebug("ClientArguments called with: " + argsArrayBuf)
val args = new ClientArguments(argsArrayBuf.toArray, conf)
client = new Client(args, conf)
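
For readers following the precedence change above, here is a minimal, standalone Scala sketch of how the new addArg resolves each option: a system property takes priority over the matching environment variable, and when neither is set the option is simply not added, leaving ClientArguments to apply its own default. The object name AddArgSketch and the main method are illustrative only and not part of the patch; addArg itself mirrors the patched code.

import scala.collection.mutable.ArrayBuffer

// Standalone sketch (hypothetical object, not part of Spark) of the patch's
// precedence rule: a system property wins over the corresponding environment
// variable, and the option is omitted entirely when neither is set.
object AddArgSketch {
  def addArg(optionName: String, envVar: String, sysProp: String,
      arrayBuf: ArrayBuffer[String]) {
    if (System.getProperty(sysProp) != null) {
      arrayBuf += (optionName, System.getProperty(sysProp))
    } else if (System.getenv(envVar) != null) {
      arrayBuf += (optionName, System.getenv(envVar))
    }
  }

  def main(args: Array[String]) {
    val argsArrayBuf = new ArrayBuffer[String]()
    // Run with e.g. -Dspark.executor.memory=2g while SPARK_EXECUTOR_MEMORY=1g
    // is exported to see the system property take precedence.
    List(("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
      ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"))
      .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
    println(argsArrayBuf.mkString(" "))
  }
}

As the patch's comment notes, the point of this ordering is that spark.* system properties override the older SPARK_* environment variables when both are present.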