aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
author Patrick Wendell <pwendell@gmail.com> 2013-12-26 12:02:19 -0800
committer Patrick Wendell <pwendell@gmail.com> 2013-12-26 12:02:19 -0800
commit 5938cfc153ac79148f288a1f228458a5df0c74dc (patch)
tree b0b0b5b0532b9bf9d4acd36160164c57c2b37c71 /core/src/main/scala/org/apache
parent bbc362833b3bc34014a13be0592deca39cfd88bd (diff)
download spark-5938cfc153ac79148f288a1f228458a5df0c74dc.tar.gz
spark-5938cfc153ac79148f288a1f228458a5df0c74dc.tar.bz2
spark-5938cfc153ac79148f288a1f228458a5df0c74dc.zip
Updated approach to driver restarting
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala 45
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala 8
2 files changed, 30 insertions, 23 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index b9b6341b75..d13e6519bb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -41,7 +41,7 @@ private[spark] class DriverRunner(
val worker: ActorRef)
extends Logging {
- var process: Option[Process] = None
+ @volatile var process: Option[Process] = None
@volatile var killed = false
/** Starts a thread to run and manage the driver. */
@@ -73,8 +73,10 @@ private[spark] class DriverRunner(
/** Terminate this driver (or prevent it from ever starting if not yet started) */
def kill() {
- killed = true
- process.foreach(p => p.destroy())
+ synchronized {
+ process.foreach(p => p.destroy())
+ killed = true
+ }
}
/** Spawn a thread that will redirect a given stream to a file */
@@ -134,43 +136,40 @@ private[spark] class DriverRunner(
}
/** Continue launching the supplied command until it exits zero or is killed. */
- def runCommandWithRetry(command: Seq[String], envVars: Seq[(String, String)], baseDir: File) = {
+ def runCommandWithRetry(command: Seq[String], envVars: Seq[(String, String)], baseDir: File)
+ : Unit = {
// Time to wait between submission retries.
var waitSeconds = 1
var cleanExit = false
while (!cleanExit && !killed) {
- Thread.sleep(waitSeconds * 1000)
-
logInfo("Launch Command: " + command.mkString("\"", "\" \"", "\""))
val builder = new ProcessBuilder(command: _*).directory(baseDir)
envVars.map{ case(k,v) => builder.environment().put(k, v) }
- process = Some(builder.start())
+ synchronized {
+ if (killed) { return }
- // Redirect stdout and stderr to files
- val stdout = new File(baseDir, "stdout")
- redirectStream(process.get.getInputStream, stdout)
+ process = Some(builder.start())
- val stderr = new File(baseDir, "stderr")
- val header = "Launch Command: %s\n%s\n\n".format(
- command.mkString("\"", "\" \"", "\""), "=" * 40)
- Files.write(header, stderr, Charsets.UTF_8)
- redirectStream(process.get.getErrorStream, stderr)
+ // Redirect stdout and stderr to files
+ val stdout = new File(baseDir, "stdout")
+ redirectStream(process.get.getInputStream, stdout)
+
+ val stderr = new File(baseDir, "stderr")
+ val header = "Launch Command: %s\n%s\n\n".format(
+ command.mkString("\"", "\" \"", "\""), "=" * 40)
+ Files.write(header, stderr, Charsets.UTF_8)
+ redirectStream(process.get.getErrorStream, stderr)
+ }
- val exitCode =
- /* There is a race here I've elected to ignore for now because it's very unlikely and not
- * simple to fix. This could see `killed=false` then the main thread gets a kill request
- * and sets `killed=true` and destroys the not-yet-started process, then this thread
- * launches the process. For now, in that case the user can just re-submit the kill
- * request. */
- if (killed) -1
- else process.get.waitFor()
+ val exitCode = process.get.waitFor()
cleanExit = exitCode == 0
if (!cleanExit && !killed) {
waitSeconds = waitSeconds * 2 // exponential back-off
logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
+ (0 until waitSeconds).takeWhile(f => {Thread.sleep(1000); !killed})
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
new file mode 100644
index 0000000000..92fb0843f8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -0,0 +1,8 @@
+package org.apache.spark.deploy.worker
+
+object DriverWrapper {
+ def main(args: Array[String]) {
+ val c = Console.readChar()
+ println(s"Char: $c")
+ }
+}