From 6ffa9bb226ac9ceec4a34f0011c35d2d9710f8f8 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sun, 29 Dec 2013 11:26:56 -0800
Subject: Documentation and adding supervise option

---
 .../apache/spark/deploy/DriverDescription.scala   |  1 +
 .../apache/spark/deploy/client/DriverClient.scala |  3 +-
 .../deploy/client/DriverClientArguments.scala     | 12 ++++---
 .../apache/spark/deploy/worker/DriverRunner.scala | 15 +++++----
 docs/spark-standalone.md                          | 38 +++++++++++++++++++---
 5 files changed, 51 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
index aba81ec27c..58c95dc4f9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala
@@ -21,6 +21,7 @@ private[spark] class DriverDescription(
     val jarUrl: String,
     val mem: Int,
     val cores: Int,
+    val supervise: Boolean,
     val command: Command)
   extends Serializable {
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index 7e75563e60..6257303830 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -71,7 +71,7 @@ object DriverClient extends Logging {
     driverArgs.cmd match {
       case "launch" =>
         // TODO: Could modify env here to pass a flag indicating Spark is in deploy-driver mode
-        // then use that to load jars locally
+        // then use that to load jars locally (e.g. truncate the filesystem path)
         val env = Map[String, String]()
         System.getenv().foreach{case (k, v) => env(k) = v}
 
@@ -83,6 +83,7 @@ object DriverClient extends Logging {
           driverArgs.jarUrl,
           driverArgs.memory,
           driverArgs.cores,
+          driverArgs.supervise,
           command)
         driver ! RequestSubmitDriver(driverDescription)
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
index 3875838319..6a15422c6c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
@@ -29,6 +29,7 @@ private[spark] class DriverClientArguments(args: Array[String]) {
   var master: String = ""
   var jarUrl: String = ""
   var mainClass: String = ""
+  var supervise: Boolean = false
   var memory: Int = 512
   var cores: Int = 1
   private var _driverOptions = ListBuffer[String]()
@@ -48,6 +49,10 @@ private[spark] class DriverClientArguments(args: Array[String]) {
       memory = value.toInt
       parse(tail)
 
+    case ("--supervise" | "-s") :: tail =>
+      supervise = true
+      parse(tail)
+
     case ("--help" | "-h") :: tail =>
       printUsageAndExit(0)
 
@@ -71,10 +76,6 @@ private[spark] class DriverClientArguments(args: Array[String]) {
    * Print usage and exit JVM with the given exit code.
    */
   def printUsageAndExit(exitCode: Int) {
-    // TODO: Document the submission approach here. It is:
-    //  1) Create an uber jar with your application and dependencies (excluding Spark)
-    //  2) You'll need to add this jar using addJar(X) inside of your spark context
-
     // TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
     //       separately similar to in the YARN client.
     System.err.println(
@@ -83,7 +84,8 @@ private[spark] class DriverClientArguments(args: Array[String]) {
       "usage: DriverClient kill <active-master> <driver-id>\n\n" +
       "Options:\n" +
       "  -c CORES, --cores CORES        Number of cores to request \n" +
-      "  -m MEMORY, --memory MEMORY     Megabytes of memory to request\n")
+      "  -m MEMORY, --memory MEMORY     Megabytes of memory to request\n" +
+      "  -s, --supervise                Whether to restart the driver on failure\n")
     System.exit(exitCode)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 8950fb71a1..41500bb28d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -66,7 +66,7 @@ private[spark] class DriverRunner(
 
         val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem,
           sparkHome.getAbsolutePath)
-        runCommandWithRetry(command, env, driverDir)
+        runCommand(command, env, driverDir, driverDesc.supervise)
       } catch {
         case e: Exception => exn = Some(e)
@@ -137,13 +137,14 @@ private[spark] class DriverRunner(
     localJarFilename
   }
 
-  /** Continue launching the supplied command until it exits zero or is killed. */
-  private def runCommandWithRetry(command: Seq[String], envVars: Map[String, String], baseDir: File) {
+  /** Launch the supplied command. */
+  private def runCommand(command: Seq[String], envVars: Map[String, String], baseDir: File,
+      supervise: Boolean) {
     // Time to wait between submission retries.
     var waitSeconds = 1
-    var cleanExit = false
+    var keepTrying = !killed
 
-    while (!cleanExit && !killed) {
+    while (keepTrying) {
       logInfo("Launch Command: " + command.mkString("\"", "\" \"", "\""))
       val builder = new ProcessBuilder(command: _*).directory(baseDir)
       envVars.map{ case(k,v) => builder.environment().put(k, v) }
@@ -166,8 +167,8 @@ private[spark] class DriverRunner(
 
       val exitCode = process.get.waitFor()
 
-      cleanExit = exitCode == 0
-      if (!cleanExit && !killed) {
+      keepTrying = supervise && exitCode != 0 && !killed
+      if (keepTrying) {
        waitSeconds = waitSeconds * 2 // exponential back-off
        logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
        (0 until waitSeconds).takeWhile(f => {Thread.sleep(1000); !killed})
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index b822265b5a..59adbce156 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -10,11 +10,7 @@ In addition to running on the Mesos or YARN cluster managers, Spark also provide
 
 # Installing Spark Standalone to a Cluster
 
-The easiest way to deploy Spark is by running the `./make-distribution.sh` script to create a binary distribution.
-This distribution can be deployed to any machine with the Java runtime installed; there is no need to install Scala.
-
-The recommended procedure is to deploy and start the master on one node first, get the master spark URL,
-then modify `conf/spark-env.sh` in the `dist/` directory before deploying to all the other nodes.
+To install Spark Standalone mode, you simply place a compiled version of Spark on each node of the cluster. You can obtain pre-built versions of Spark with each release or [build it yourself](index.html#building).
 
 # Starting a Cluster Manually
 
@@ -150,6 +146,38 @@ automatically set MASTER from the `SPARK_MASTER_IP` and `SPARK_MASTER_PORT` vari
 
 You can also pass an option `-c <numCores>` to control the number of cores that spark-shell uses on the cluster.
 
+# Launching Applications Inside the Cluster
+
+You may also run your application entirely inside of the cluster by submitting your application driver using the submission client. The syntax for submitting applications is as follows:
+
+    ./spark-class org.apache.spark.deploy.client.DriverClient launch \
+       [client-options] \
+       <cluster-url> <application-jar-url> <main-class> \
+       [application-options]
+
+    cluster-url: The URL of the master node.
+    application-jar-url: Path to a bundled jar including your application and all dependencies.
+                         Accepts hdfs://, file://, and http:// paths.
+    main-class: The entry point for your application.
+
+    Client Options:
+      --memory (amount of memory, in MB, allocated for your driver program)
+      --cores (number of cores allocated for your driver program)
+      --supervise (whether to automatically restart your driver on application or node failure)
+
+Keep in mind that your driver program will be executed on a remote worker machine. You can control the execution environment in the following ways:
+
+ * _Environment variables_: These will be captured from the environment in which you launch the client and applied when launching the driver program.
+ * _Java options_: You can add Java options by setting `SPARK_JAVA_OPTS` in the environment in which you launch the submission client.
+ * _Dependencies_: You'll still need to call `sc.addJar` inside of your driver program to add your application jar and any dependencies. If you submit a local application jar to the client (e.g. one with a `file://` URL), it will be uploaded into the working directory of your driver program. Then, you can add it using `sc.addJar("jar-name.jar")`.
+
+Once you submit a driver program, it will appear in the cluster management UI at port 8080 and
+be assigned an identifier. If you'd like to prematurely terminate the program, you can do so using
+the same client:
+
+    ./spark-class org.apache.spark.deploy.client.DriverClient kill <cluster-url> <driver-id>
+
 # Resource Scheduling
 
 The standalone cluster mode currently only supports a simple FIFO scheduler across applications.
-- 
cgit v1.2.3
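
To see the supervision semantics introduced above in isolation, here is a minimal, self-contained Scala sketch of the same kind of retry loop. The names (`SuperviseSketch`, `runSupervised`) are invented for illustration and this is not Spark code: it simply relaunches a command while supervision is requested, the last exit code was non-zero, and no kill has arrived, doubling the wait between attempts.

    import scala.sys.process._

    // Illustrative stand-in for the supervise/retry behavior of runCommand;
    // none of these names exist in Spark.
    object SuperviseSketch {

      @volatile private var killed = false   // would be flipped by the worker on a kill request

      /** Run `command`; if `supervise` is set, relaunch it on non-zero exit with exponential back-off. */
      def runSupervised(command: Seq[String], supervise: Boolean): Unit = {
        var waitSeconds = 1                  // time to wait between submission retries
        var keepTrying = !killed

        while (keepTrying) {
          println("Launch command: " + command.mkString("\"", "\" \"", "\""))
          val exitCode = Process(command).!  // start the process and block until it exits

          // Retry only when supervision is on, the run failed, and nobody asked us to stop.
          keepTrying = supervise && exitCode != 0 && !killed
          if (keepTrying) {
            waitSeconds = waitSeconds * 2    // exponential back-off
            println(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
            (0 until waitSeconds).takeWhile(_ => { Thread.sleep(1000); !killed })
          }
        }
      }

      def main(args: Array[String]): Unit = {
        // Simulate an external kill request arriving after roughly ten seconds.
        new Thread(() => { Thread.sleep(10000); killed = true }).start()
        // Supervise a command that always fails ("false"), so the back-off path is exercised.
        runSupervised(Seq("false"), supervise = true)
      }
    }

As in the patch, the kill flag is polled once per second during the back-off sleep, so a kill request does not have to wait out a long back-off interval.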
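
On the client side, `--supervise` is a bare switch: unlike `--memory` and `--cores` it consumes no value. The sketch below shows the recursive list-pattern-matching style of option parsing used for such flags, again with invented names (`ParseSketch`, `Conf`) rather than the real `DriverClientArguments`.

    // Illustrative option parser in the same recursive, list-pattern-matching style.
    object ParseSketch {
      case class Conf(memory: Int = 512, cores: Int = 1, supervise: Boolean = false)

      @annotation.tailrec
      def parse(args: List[String], conf: Conf): Conf = args match {
        case ("--memory" | "-m") :: value :: tail => parse(tail, conf.copy(memory = value.toInt))
        case ("--cores" | "-c") :: value :: tail  => parse(tail, conf.copy(cores = value.toInt))
        // The supervise flag takes no value; it just flips a boolean.
        case ("--supervise" | "-s") :: tail       => parse(tail, conf.copy(supervise = true))
        case Nil                                  => conf
        case unknown :: _                         => sys.error(s"Unknown option: $unknown")
      }

      def main(args: Array[String]): Unit = {
        // Prints Conf(1024,1,true)
        println(parse(List("--supervise", "--memory", "1024"), Conf()))
      }
    }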