-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala  | 45
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala |  8
2 files changed, 30 insertions, 23 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index b9b6341b75..d13e6519bb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -41,7 +41,7 @@ private[spark] class DriverRunner(
     val worker: ActorRef)
   extends Logging {
 
-  var process: Option[Process] = None
+  @volatile var process: Option[Process] = None
   @volatile var killed = false
 
   /** Starts a thread to run and manage the driver. */
@@ -73,8 +73,10 @@ private[spark] class DriverRunner(
 
   /** Terminate this driver (or prevent it from ever starting if not yet started) */
   def kill() {
-    killed = true
-    process.foreach(p => p.destroy())
+    synchronized {
+      process.foreach(p => p.destroy())
+      killed = true
+    }
   }
 
   /** Spawn a thread that will redirect a given stream to a file */
@@ -134,43 +136,40 @@ private[spark] class DriverRunner(
   }
 
   /** Continue launching the supplied command until it exits zero or is killed. */
-  def runCommandWithRetry(command: Seq[String], envVars: Seq[(String, String)], baseDir: File) = {
+  def runCommandWithRetry(command: Seq[String], envVars: Seq[(String, String)], baseDir: File)
+    : Unit = {
     // Time to wait between submission retries.
     var waitSeconds = 1
     var cleanExit = false
 
     while (!cleanExit && !killed) {
-      Thread.sleep(waitSeconds * 1000)
-
       logInfo("Launch Command: " + command.mkString("\"", "\" \"", "\""))
       val builder = new ProcessBuilder(command: _*).directory(baseDir)
       envVars.map{ case(k,v) => builder.environment().put(k, v) }
 
-      process = Some(builder.start())
+      synchronized {
+        if (killed) { return }
 
-      // Redirect stdout and stderr to files
-      val stdout = new File(baseDir, "stdout")
-      redirectStream(process.get.getInputStream, stdout)
+        process = Some(builder.start())
 
-      val stderr = new File(baseDir, "stderr")
-      val header = "Launch Command: %s\n%s\n\n".format(
-        command.mkString("\"", "\" \"", "\""), "=" * 40)
-      Files.write(header, stderr, Charsets.UTF_8)
-      redirectStream(process.get.getErrorStream, stderr)
+        // Redirect stdout and stderr to files
+        val stdout = new File(baseDir, "stdout")
+        redirectStream(process.get.getInputStream, stdout)
+
+        val stderr = new File(baseDir, "stderr")
+        val header = "Launch Command: %s\n%s\n\n".format(
+          command.mkString("\"", "\" \"", "\""), "=" * 40)
+        Files.write(header, stderr, Charsets.UTF_8)
+        redirectStream(process.get.getErrorStream, stderr)
+      }
 
-      val exitCode =
-        /* There is a race here I've elected to ignore for now because it's very unlikely and not
-         * simple to fix. This could see `killed=false` then the main thread gets a kill request
-         * and sets `killed=true` and destroys the not-yet-started process, then this thread
-         * launches the process. For now, in that case the user can just re-submit the kill
-         * request. */
-        if (killed) -1
-        else process.get.waitFor()
+      val exitCode = process.get.waitFor()
       cleanExit = exitCode == 0
       if (!cleanExit && !killed) {
         waitSeconds = waitSeconds * 2 // exponential back-off
         logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
+        (0 until waitSeconds).takeWhile(f => {Thread.sleep(1000); !killed})
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
new file mode 100644
index 0000000000..92fb0843f8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -0,0 +1,8 @@
+package org.apache.spark.deploy.worker
+
+object DriverWrapper {
+  def main(args: Array[String]) {
+    val c = Console.readChar()
+    println(s"Char: $c")
+  }
+}
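Note: the following is an editorial sketch, not part of the patch. It isolates the two patterns the DriverRunner change introduces: a synchronized launch section that refuses to start a process once killed has been set, and a back-off wait split into one-second slices so a kill request interrupts the retry loop promptly. The object name RetryLoopSketch, the absence of stream redirection, and the use of the Unix "false" command are illustrative assumptions only.

// Minimal, self-contained sketch of the kill/launch synchronization and sliced back-off wait.
// RetryLoopSketch is a made-up name; "false" is assumed to exist as a command that always
// exits non-zero (as on Unix).
object RetryLoopSketch {
  @volatile var killed = false
  @volatile var process: Option[Process] = None

  /** Like DriverRunner.kill(): destroy the process and set the flag under the same lock. */
  def kill(): Unit = synchronized {
    process.foreach(_.destroy())
    killed = true
  }

  /** Like runCommandWithRetry(): relaunch until clean exit or kill, with exponential back-off. */
  def runWithRetry(command: Seq[String]): Unit = {
    var waitSeconds = 1
    var cleanExit = false
    while (!cleanExit && !killed) {
      synchronized {
        if (killed) { return }                 // a kill arrived before this launch attempt
        process = Some(new ProcessBuilder(command: _*).start())
      }
      val exitCode = process.get.waitFor()
      cleanExit = exitCode == 0
      if (!cleanExit && !killed) {
        waitSeconds = waitSeconds * 2          // exponential back-off
        // Sleep in one-second slices so the loop stops soon after killed flips to true.
        (0 until waitSeconds).takeWhile(_ => { Thread.sleep(1000); !killed })
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Retry a command that always fails, then cancel the loop after roughly three seconds.
    new Thread(new Runnable {
      def run(): Unit = { Thread.sleep(3000); kill() }
    }).start()
    runWithRetry(Seq("false"))
  }
}

Because kill() and the launch section take the same lock, the race the removed comment described (a kill landing between the killed check and builder.start()) can no longer leave an orphaned process: either the launch sees killed and returns, or the kill destroys the already-registered process.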