diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-12-29 11:26:56 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-12-29 11:26:56 -0800 |
commit | 6ffa9bb226ac9ceec4a34f0011c35d2d9710f8f8 (patch) | |
tree | dfb8e28a46701b6b1af03437dcc2b3ef2ecb83d3 /core/src | |
parent | 35f6dc252a8961189837e79914f305d0745a8792 (diff) | |
download | spark-6ffa9bb226ac9ceec4a34f0011c35d2d9710f8f8.tar.gz spark-6ffa9bb226ac9ceec4a34f0011c35d2d9710f8f8.tar.bz2 spark-6ffa9bb226ac9ceec4a34f0011c35d2d9710f8f8.zip |
Add documentation and a supervise option
Diffstat (limited to 'core/src')
4 files changed, 18 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala index aba81ec27c..58c95dc4f9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala @@ -21,6 +21,7 @@ private[spark] class DriverDescription( val jarUrl: String, val mem: Int, val cores: Int, + val supervise: Boolean, val command: Command) extends Serializable { diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala index 7e75563e60..6257303830 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala @@ -71,7 +71,7 @@ object DriverClient extends Logging { driverArgs.cmd match { case "launch" => // TODO: Could modify env here to pass a flag indicating Spark is in deploy-driver mode - // then use that to load jars locally + // then use that to load jars locally (e.g. truncate the filesystem path) val env = Map[String, String]() System.getenv().foreach{case (k, v) => env(k) = v} @@ -83,6 +83,7 @@ object DriverClient extends Logging { driverArgs.jarUrl, driverArgs.memory, driverArgs.cores, + driverArgs.supervise, command) driver ! 
RequestSubmitDriver(driverDescription) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala index 3875838319..6a15422c6c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala @@ -29,6 +29,7 @@ private[spark] class DriverClientArguments(args: Array[String]) { var master: String = "" var jarUrl: String = "" var mainClass: String = "" + var supervise: Boolean = false var memory: Int = 512 var cores: Int = 1 private var _driverOptions = ListBuffer[String]() @@ -48,6 +49,10 @@ private[spark] class DriverClientArguments(args: Array[String]) { memory = value.toInt parse(tail) + case ("--supervise" | "-s") :: tail => + supervise = true + parse(tail) + case ("--help" | "-h") :: tail => printUsageAndExit(0) @@ -71,10 +76,6 @@ private[spark] class DriverClientArguments(args: Array[String]) { * Print usage and exit JVM with the given exit code. */ def printUsageAndExit(exitCode: Int) { - // TODO: Document the submission approach here. It is: - // 1) Create an uber jar with your application and dependencies (excluding Spark) - // 2) You'll need to add this jar using addJar(X) inside of your spark context - // TODO: It wouldn't be too hard to allow users to submit their app and dependency jars // separately similar to in the YARN client. 
System.err.println( @@ -83,7 +84,8 @@ private[spark] class DriverClientArguments(args: Array[String]) { "usage: DriverClient kill <active-master> <driver-id>\n\n" + "Options:\n" + " -c CORES, --cores CORES Number of cores to request \n" + - " -m MEMORY, --memory MEMORY Megabytes of memory to request\n") + " -m MEMORY, --memory MEMORY Megabytes of memory to request\n" + + " -s, --supervise Whether to restart the driver on failure\n") System.exit(exitCode) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 8950fb71a1..41500bb28d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -66,7 +66,7 @@ private[spark] class DriverRunner( val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem, sparkHome.getAbsolutePath) - runCommandWithRetry(command, env, driverDir) + runCommand(command, env, driverDir, driverDesc.supervise) } catch { case e: Exception => exn = Some(e) @@ -137,13 +137,14 @@ private[spark] class DriverRunner( localJarFilename } - /** Continue launching the supplied command until it exits zero or is killed. */ - private def runCommandWithRetry(command: Seq[String], envVars: Map[String, String], baseDir: File) { + /** Launch the supplied command. */ + private def runCommand(command: Seq[String], envVars: Map[String, String], baseDir: File, + supervise: Boolean) { // Time to wait between submission retries. 
var waitSeconds = 1 - var cleanExit = false + var keepTrying = !killed - while (!cleanExit && !killed) { + while (keepTrying) { logInfo("Launch Command: " + command.mkString("\"", "\" \"", "\"")) val builder = new ProcessBuilder(command: _*).directory(baseDir) envVars.map{ case(k,v) => builder.environment().put(k, v) } @@ -166,8 +167,8 @@ private[spark] class DriverRunner( val exitCode = process.get.waitFor() - cleanExit = exitCode == 0 - if (!cleanExit && !killed) { + keepTrying = supervise && exitCode != 0 && !killed + if (keepTrying) { waitSeconds = waitSeconds * 2 // exponential back-off logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.") (0 until waitSeconds).takeWhile(f => {Thread.sleep(1000); !killed}) |