diff options
author | Henry Saputra <hsaputra@apache.org> | 2014-01-12 10:34:13 -0800 |
---|---|---|
committer | Henry Saputra <hsaputra@apache.org> | 2014-01-12 10:34:13 -0800 |
commit | 91a563608e301bb243fca3765d569bde65ad747c (patch) | |
tree | 1dc3da48852d2677846d415b96b4ee0558a54cb5 /yarn/common/src/main | |
parent | 93a65e5fde64ffed3dbd2a050c1007e077ecd004 (diff) | |
parent | 288a878999848adb130041d1e40c14bfc879cec6 (diff) | |
download | spark-91a563608e301bb243fca3765d569bde65ad747c.tar.gz spark-91a563608e301bb243fca3765d569bde65ad747c.tar.bz2 spark-91a563608e301bb243fca3765d569bde65ad747c.zip |
Merge branch 'master' into remove_simpleredundantreturn_scala
Diffstat (limited to 'yarn/common/src/main')
-rw-r--r-- | yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 50 |
1 files changed, 27 insertions, 23 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 4b1b5da048..22e55e0c60 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -22,6 +22,8 @@ import org.apache.spark.{SparkException, Logging, SparkContext} import org.apache.spark.deploy.yarn.{Client, ClientArguments} import org.apache.spark.scheduler.TaskSchedulerImpl +import scala.collection.mutable.ArrayBuffer + private[spark] class YarnClientSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext) @@ -31,45 +33,47 @@ private[spark] class YarnClientSchedulerBackend( var client: Client = null var appId: ApplicationId = null + private[spark] def addArg(optionName: String, optionalParam: String, arrayBuf: ArrayBuffer[String]) { + Option(System.getenv(optionalParam)) foreach { + optParam => { + arrayBuf += (optionName, optParam) + } + } + } + override def start() { super.start() - val defalutWorkerCores = "2" - val defalutWorkerMemory = "512m" - val defaultWorkerNumber = "1" - val userJar = System.getenv("SPARK_YARN_APP_JAR") - val distFiles = System.getenv("SPARK_YARN_DIST_FILES") - var workerCores = System.getenv("SPARK_WORKER_CORES") - var workerMemory = System.getenv("SPARK_WORKER_MEMORY") - var workerNumber = System.getenv("SPARK_WORKER_INSTANCES") - if (userJar == null) throw new SparkException("env SPARK_YARN_APP_JAR is not set") - if (workerCores == null) - workerCores = defalutWorkerCores - if (workerMemory == null) - workerMemory = defalutWorkerMemory - if (workerNumber == null) - workerNumber = defaultWorkerNumber - val driverHost = conf.get("spark.driver.host") val driverPort = conf.get("spark.driver.port") val hostport = driverHost + ":" + driverPort - val argsArray = Array[String]( + val argsArrayBuf = new ArrayBuffer[String]() + argsArrayBuf += ( "--class", "notused", "--jar", userJar, "--args", hostport, - "--worker-memory", workerMemory, - "--worker-cores", workerCores, - "--num-workers", workerNumber, - "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher", - "--files", distFiles + "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher" ) - val args = new ClientArguments(argsArray, conf) + // process any optional arguments, use the defaults already defined in ClientArguments + // if things aren't specified + Map("--master-memory" -> "SPARK_MASTER_MEMORY", + "--num-workers" -> "SPARK_WORKER_INSTANCES", + "--worker-memory" -> "SPARK_WORKER_MEMORY", + "--worker-cores" -> "SPARK_WORKER_CORES", + "--queue" -> "SPARK_YARN_QUEUE", + "--name" -> "SPARK_YARN_APP_NAME", + "--files" -> "SPARK_YARN_DIST_FILES", + "--archives" -> "SPARK_YARN_DIST_ARCHIVES") + .foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) } + + logDebug("ClientArguments called with: " + argsArrayBuf) + val args = new ClientArguments(argsArrayBuf.toArray, conf) client = new Client(args, conf) appId = client.runApp() waitForApp() |