commit    760823d3937822ea4a6d6f476815442711c605fa
tree      60267167282344a82717188f1c9bd1209434e791
parent    6a4acc4c2d5c510cc76049dd8727cec76a2173e8
author    Patrick Wendell <pwendell@gmail.com>  2013-12-24 23:20:34 -0800
committer Patrick Wendell <pwendell@gmail.com>  2013-12-25 01:19:01 -0800
Adding better option parsing
10 files changed, 187 insertions, 42 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 67435261e4..332c7e8bbf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -84,7 +84,7 @@ private[deploy] object DeployMessages {
       sparkHome: String)
     extends DeployMessage
 
-  case class LaunchDriver(driverId: String, jarUrl: String, mainClass: String, mem: Int)
+  case class LaunchDriver(driverId: String, driverDesc: DriverDescription)
     extends DeployMessage
 
   case class KillDriver(driverId: String) extends DeployMessage
diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
index 52f6b1b554..32ff6db8f0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
@@ -20,7 +20,11 @@ package org.apache.spark.deploy
 private[spark] class DriverDescription(
     val jarUrl: String,
     val mainClass: String,
-    val mem: Integer) // TODO: Should this be Long?
+    val mem: Int,
+    val cores: Int,
+    val options: Seq[String],
+    val javaOptions: Seq[String],
+    val envVars: Seq[(String, String)])
   extends Serializable {
 
   override def toString: String = s"DriverDescription ($mainClass)"
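Taken together, the two hunks above collapse the old (jarUrl, mainClass, mem) triple into a single serializable DriverDescription that rides inside the LaunchDriver message. A minimal sketch of the message as the master now sends it (all identifiers and values here are hypothetical, not from this patch):

    package org.apache.spark.deploy

    // Sketch only: shows the shape of the new message. Values are made up.
    object LaunchDriverSketch {
      val desc = new DriverDescription(
        jarUrl = "hdfs://namenode:8020/user/demo/app.jar",
        mainClass = "org.example.Main",
        mem = 1024,                               // megabytes
        cores = 2,
        options = Seq("appArg1"),                 // arguments for the driver's main()
        javaOptions = Seq("-Dspark.test.prop=1"), // JVM flags for the driver process
        envVars = Seq("SPARK_TEST_VAR" -> "1"))   // environment for the driver process

      val msg = DeployMessages.LaunchDriver("driver-20131225-000000", desc)
    }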
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index 482bafd0e0..dd62172103 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -34,7 +34,7 @@ import scala.concurrent.Await
 import akka.actor.Actor.emptyBehavior
 
 /**
- * Parent class for actors that to send a single message to the standalone master and then die.
+ * Actor that sends a single message to the standalone master and then shuts down.
  */
 private[spark] abstract class SingleMessageClient(
     actorSystem: ActorSystem, master: String, message: DeployMessage)
@@ -94,34 +94,31 @@ private[spark] class TerminationClient(actorSystem: ActorSystem, master: String,
 }
 
 /**
- * Callable utility for starting and terminating drivers inside of the standalone scheduler.
+ * Executable utility for starting and terminating drivers inside of a standalone cluster.
  */
 object DriverClient {
 
   def main(args: Array[String]) {
-    if (args.size < 3) {
-      println("usage: DriverClient launch <active-master> <jar-url> <main-class>")
-      println("usage: DriverClient kill <active-master> <driver-id>")
-      System.exit(-1)
-    }
+    val driverArgs = new DriverClientArguments(args)
 
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
-      "driverSubmission", Utils.localHostName(), 0)
-
-    // TODO Should be configurable
-    val mem = 512
+      "driverClient", Utils.localHostName(), 0)
 
-    args(0) match {
+    driverArgs.cmd match {
       case "launch" =>
-        val master = args(1)
-        val jarUrl = args(2)
-        val mainClass = args(3)
-        val driverDescription = new DriverDescription(jarUrl, mainClass, mem)
-        val client = new SubmissionClient(actorSystem, master, driverDescription)
+        val driverDescription = new DriverDescription(
+          driverArgs.jarUrl,
+          driverArgs.mainClass,
+          driverArgs.memory,
+          driverArgs.cores,
+          driverArgs.driverOptions,
+          driverArgs.driverJavaOptions,
+          driverArgs.driverEnvVars)
+        val client = new SubmissionClient(actorSystem, driverArgs.master, driverDescription)
 
       case "kill" =>
-        val master = args(1)
-        val driverId = args(2)
+        val master = driverArgs.master
+        val driverId = driverArgs.driverId
         val client = new TerminationClient(actorSystem, master, driverId)
     }
 
     actorSystem.awaitTermination()
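Note that DriverClientArguments (added below) consumes flags before it reaches the command word, so resource flags precede launch, and everything after the main class is forwarded verbatim to the driver. A hypothetical invocation (host names, paths, and values are illustrative only):

    import org.apache.spark.deploy.client.DriverClient

    // Sketch only; equivalent to running from a shell:
    //   DriverClient -c 2 -m 1024 -e SPARK_TEST_VAR=1 launch spark://master-host:7077
    //     hdfs://namenode:8020/user/demo/app.jar org.example.Main appArg1
    object SubmitExample {
      def main(args: Array[String]) {
        DriverClient.main(Array(
          "-c", "2", "-m", "1024", "-e", "SPARK_TEST_VAR=1",
          "launch", "spark://master-host:7077",
          "hdfs://namenode:8020/user/demo/app.jar", "org.example.Main", "appArg1"))
      }
    }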
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
new file mode 100644
index 0000000000..618467ce8c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.client
+
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Command-line parser for the driver client.
+ */
+private[spark] class DriverClientArguments(args: Array[String]) {
+  var cmd: String = "" // 'launch' or 'kill'
+
+  // launch parameters
+  var master: String = ""
+  var jarUrl: String = ""
+  var mainClass: String = ""
+  var memory: Int = 512
+  var cores: Int = 1
+  private var _driverOptions = ListBuffer[String]()
+  private var _driverJavaOptions = ListBuffer[String]()
+  private var _driverEnvVars = ListBuffer[(String, String)]()
+  def driverOptions = _driverOptions.toSeq
+  def driverJavaOptions = _driverJavaOptions.toSeq
+  def driverEnvVars = _driverEnvVars.toSeq
+
+  // kill parameters
+  var driverId: String = ""
+
+  parse(args.toList)
+
+  def parse(args: List[String]): Unit = args match {
+    case ("--cores" | "-c") :: value :: tail =>
+      cores = value.toInt
+      parse(tail)
+
+    case ("--memory" | "-m") :: value :: tail =>
+      memory = value.toInt
+      parse(tail)
+
+    case ("--java-option" | "-j") :: value :: tail =>
+      _driverJavaOptions += value
+      parse(tail)
+
+    case ("--environment-variable" | "-e") :: value :: tail =>
+      val parts = value.split("=")
+      _driverEnvVars += ((parts(0), parts(1)))
+      parse(tail)
+
+    case ("--help" | "-h") :: tail =>
+      printUsageAndExit(0)
+
+    case "launch" :: _master :: _jarUrl :: _mainClass :: tail =>
+      cmd = "launch"
+      master = _master
+      jarUrl = _jarUrl
+      mainClass = _mainClass
+      _driverOptions ++= tail
+
+    case "kill" :: _master :: _driverId :: tail =>
+      cmd = "kill"
+      master = _master
+      driverId = _driverId
+
+    case _ =>
+      printUsageAndExit(1)
+  }
+
+  /**
+   * Print usage and exit JVM with the given exit code.
+   */
+  def printUsageAndExit(exitCode: Int) {
+    System.err.println(
+      "usage: DriverClient [options] launch <active-master> <jar-url> <main-class> " +
+        "[driver options]\n" +
+      "usage: DriverClient kill <active-master> <driver-id>\n\n" +
+      "Options:\n" +
+      "  -c CORES, --cores CORES                Number of cores to request\n" +
+      "  -m MEMORY, --memory MEMORY             Megabytes of memory to request\n" +
+      "  -j JAVA_OPT, --java-option JAVA_OPT    Java option to pass to driver\n" +
+      "  -e K=V, --environment-variable K=V     Environment variable to pass to driver\n")
+    System.exit(exitCode)
+  }
+}
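The parser above is a small recursive descent over List[String]: each case peels a flag and its value off the head and recurses on the tail, until a command word consumes the remainder. A stripped-down, self-contained sketch of the same technique (the flag names and error handling here are hypothetical, not Spark's):

    object MiniParser {
      // Peel (flag, value) pairs off the head of the list and recurse on the tail.
      def parse(args: List[String], opts: Map[String, String]): Map[String, String] = args match {
        case ("--cores" | "-c") :: value :: tail => parse(tail, opts + ("cores" -> value))
        case ("--memory" | "-m") :: value :: tail => parse(tail, opts + ("memory" -> value))
        case Nil => opts
        case unknown :: _ => sys.error("Unrecognized argument: " + unknown)
      }

      def main(args: Array[String]) {
        println(parse(List("-c", "4", "--memory", "1024"), Map.empty))
        // Map(cores -> 4, memory -> 1024)
      }
    }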
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 76af332986..9bfacfc999 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -627,8 +627,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     logInfo("Launching driver " + driver.id + " on worker " + worker.id)
     worker.addDriver(driver)
     driver.worker = Some(worker)
-    worker.actor ! LaunchDriver(driver.id, driver.desc.jarUrl, driver.desc.mainClass,
-      driver.desc.mem)
+    worker.actor ! LaunchDriver(driver.id, driver.desc)
     driver.state = DriverState.RUNNING
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index fccc36b660..41a089ad33 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -29,16 +29,15 @@ import org.apache.hadoop.conf.Configuration
 import akka.actor.{ActorRef, ActorSelection}
 import org.apache.spark.deploy.DeployMessages.DriverStateChanged
 import org.apache.spark.deploy.master.DriverState
+import org.apache.spark.deploy.DriverDescription
 
 /**
  * Manages the execution of one driver process.
  */
 private[spark] class DriverRunner(
     val driverId: String,
-    val jarUrl: String,
-    val mainClass: String,
     val workDir: File,
-    val memory: Int,
+    val driverDesc: DriverDescription,
     val worker: ActorRef)
   extends Logging {
 
@@ -54,8 +53,9 @@ private[spark] class DriverRunner(
       try {
         val driverDir = createWorkingDirectory()
         val localJarFilename = downloadUserJar(driverDir)
-        val command = Seq("java", "-cp", localJarFilename, mainClass)
-        runCommandWithRetry(command, driverDir)
+        val command = Seq("java") ++ driverDesc.javaOptions ++ Seq("-cp", localJarFilename) ++
+          Seq(driverDesc.mainClass) ++ driverDesc.options
+        runCommandWithRetry(command, driverDesc.envVars, driverDir)
       }
       catch {
         case e: Exception => exn = Some(e)
@@ -110,7 +110,7 @@ private[spark] class DriverRunner(
    */
   def downloadUserJar(driverDir: File): String = {
 
-    val jarPath = new Path(jarUrl)
+    val jarPath = new Path(driverDesc.jarUrl)
     val emptyConf = new Configuration() // TODO: In docs explain it needs to be full HDFS path
     val jarFileSystem = jarPath.getFileSystem(emptyConf)
 
@@ -134,17 +134,17 @@ private[spark] class DriverRunner(
   }
 
   /** Continue launching the supplied command until it exits zero. */
-  def runCommandWithRetry(command: Seq[String], baseDir: File) = {
-    /* Time to wait between submission retries. */
+  def runCommandWithRetry(command: Seq[String], envVars: Seq[(String, String)], baseDir: File) = {
+    // Time to wait between submission retries.
     var waitSeconds = 1
 
-    // TODO: We should distinguish between "immediate" exits and cases where it was running
-    // for a long time and then exits.
     var cleanExit = false
     while (!cleanExit && !killed) {
       Thread.sleep(waitSeconds * 1000)
+
+      logInfo("Launch Command: " + command.mkString("\"", "\" \"", "\""))
 
       val builder = new ProcessBuilder(command: _*).directory(baseDir)
-      logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
+      envVars.map{ case(k,v) => builder.environment().put(k, v) }
 
       process = Some(builder.start())
 
@@ -153,12 +153,11 @@ private[spark] class DriverRunner(
       redirectStream(process.get.getInputStream, stdout)
 
       val stderr = new File(baseDir, "stderr")
-      val header = "Driver Command: %s\n%s\n\n".format(
+      val header = "Launch Command: %s\n%s\n\n".format(
         command.mkString("\"", "\" \"", "\""), "=" * 40)
       Files.write(header, stderr, Charsets.UTF_8)
       redirectStream(process.get.getErrorStream, stderr)
 
-      val exitCode =
       /* There is a race here I've elected to ignore for now because it's very unlikely and not
        * simple to fix. This could see `killed=false` then the main thread gets a kill request
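The runner now splices the description's JVM options and application arguments around the classpath and main class, and copies the environment pairs into the ProcessBuilder before each launch attempt. A condensed standalone sketch of that assembly (paths and values hypothetical):

    import java.io.File

    object LaunchCommandDemo {
      def main(args: Array[String]) {
        val javaOptions = Seq("-Dspark.test.prop=1")
        val appOptions = Seq("appArg1")
        val envVars = Seq("SPARK_TEST_VAR" -> "1")
        // Same shape as DriverRunner: java [javaOptions] -cp <jar> <mainClass> [options]
        val command = Seq("java") ++ javaOptions ++
          Seq("-cp", "/tmp/work/app.jar", "org.example.Main") ++ appOptions
        val builder = new ProcessBuilder(command: _*).directory(new File("/tmp/work"))
        envVars.foreach { case (k, v) => builder.environment().put(k, v) }
        // Prints the same quoted form as the "Launch Command" log line above.
        println(command.mkString("\"", "\" \"", "\""))
      }
    }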
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index a2b491a72f..dd6783a344 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -242,9 +242,9 @@ private[spark] class Worker(
       }
     }
 
-    case LaunchDriver(driverId, jarUrl, mainClass, memory) => {
+    case LaunchDriver(driverId, driverDesc) => {
       logInfo(s"Asked to launch driver $driverId")
-      val driver = new DriverRunner(driverId, jarUrl, mainClass, workDir, memory, self)
+      val driver = new DriverRunner(driverId, workDir, driverDesc, self)
       drivers(driverId) = driver
       driver.start()
 
@@ -278,7 +278,7 @@ private[spark] class Worker(
         master ! DriverStateChanged(driverId, state, exception)
       }
       val driver = drivers(driverId)
-      memoryUsed -= driver.memory
+      memoryUsed -= driver.driverDesc.mem
       coresUsed -= 1
       drivers -= driverId
       finishedDrivers(driverId) = driver
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index e233b82585..2c37b7184d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -137,9 +137,9 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
   def driverRow(driver: DriverRunner): Seq[Node] = {
     <tr>
       <td>{driver.driverId}</td>
-      <td>{driver.mainClass}</td>
-      <td sorttable_customkey={driver.memory.toString}>
-        {Utils.megabytesToString(driver.memory)}
+      <td>{driver.driverDesc.mainClass}</td>
+      <td sorttable_customkey={driver.driverDesc.mem.toString}>
+        {Utils.megabytesToString(driver.driverDesc.mem)}
       </td>
       <td>
         <a href={s"logPage?driverId=${driver.driverId}&logType=stdout"}>stdout</a>
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index d128e58797..2fd862c4c4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -82,6 +82,8 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
         s"${workDir.getPath}/$appId/$executorId/$logType"
       case (None, None, Some(d)) =>
         s"${workDir.getPath}/$driverId/$logType"
+      case _ =>
+        throw new Exception("Request must specify either application or driver identifiers")
     }
 
     val (startByte, endByte) = getByteRange(path, offset, byteLength)
@@ -106,6 +108,8 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
         (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e")
       case (None, None, Some(d)) =>
         (s"${workDir.getPath}/$d/$logType", s"driverId=$d")
+      case _ =>
+        throw new Exception("Request must specify either application or driver identifiers")
     }
 
     val (startByte, endByte) = getByteRange(path, offset, byteLength)
diff --git a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
new file mode 100644
index 0000000000..9055ce7a39
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import scala.collection.JavaConversions._
+
+/** Prints out environmental information, sleeps, and then exits. Made to
+  * test driver submission in the standalone scheduler. */
+object DriverSubmissionTest {
+  def main(args: Array[String]) {
+    if (args.size < 1) {
+      println("Usage: DriverSubmissionTest <seconds-to-sleep>")
+      System.exit(0)
+    }
+    val numSecondsToSleep = args(0).toInt
+
+    val env = System.getenv()
+    val properties = System.getProperties()
+
+    println("Environment variables containing SPARK_TEST:")
+    env.filter{case (k, v) => k.contains("SPARK_TEST")}.foreach(println)
+
+    println("System properties containing spark.test:")
+    properties.filter{case (k, v) => k.toString.contains("spark.test")}.foreach(println)
+
+    for (i <- 1 until numSecondsToSleep) {
+      Thread.sleep(1000)
+    }
+  }
+}
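The test relies on scala.collection.JavaConversions to treat the java.util.Map returned by System.getenv() (and the java.util.Properties object) as Scala collections, so filter and foreach apply directly. A minimal self-contained illustration of the same trick (the PATH filter is arbitrary, chosen only for this sketch):

    import scala.collection.JavaConversions._

    object EnvFilterDemo {
      def main(args: Array[String]) {
        // The implicit conversion wraps the Java map in a Scala mutable.Map view,
        // so the entries arrive as (key, value) tuples.
        System.getenv().filter { case (k, _) => k.contains("PATH") }.foreach(println)
      }
    }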