diff options
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 31 |
1 files changed, 14 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index f94cd685e8..2051403682 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -58,30 +58,29 @@ private[spark] class ExecutorRunner( override def run() { fetchAndRunExecutor() } } workerThread.start() - // Shutdown hook that kills actors on shutdown. shutdownHook = new Thread() { override def run() { - if (process != null) { - logInfo("Shutdown hook killing child process.") - process.destroy() - process.waitFor() - } + killProcess() } } Runtime.getRuntime.addShutdownHook(shutdownHook) } + private def killProcess() { + if (process != null) { + logInfo("Killing process!") + process.destroy() + process.waitFor() + } + } + /** Stop this executor runner, including killing the process it launched */ def kill() { if (workerThread != null) { + // the workerThread will kill the child process when interrupted workerThread.interrupt() workerThread = null - if (process != null) { - logInfo("Killing process!") - process.destroy() - process.waitFor() - } state = ExecutorState.KILLED worker ! ExecutorStateChanged(appId, execId, state, None, None) Runtime.getRuntime.removeShutdownHook(shutdownHook) @@ -128,7 +127,6 @@ private[spark] class ExecutorRunner( // parent process for the executor command env.put("SPARK_LAUNCH_WITH_SCALA", "0") process = builder.start() - val header = "Spark Executor Command: %s\n%s\n\n".format( command.mkString("\"", "\" \"", "\""), "=" * 40) @@ -148,14 +146,13 @@ private[spark] class ExecutorRunner( val message = "Command exited with code " + exitCode worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)) } catch { - case interrupted: InterruptedException => + case interrupted: InterruptedException => { logInfo("Runner thread for executor " + fullId + " interrupted") - + killProcess() + } case e: Exception => { logError("Error running executor", e) - if (process != null) { - process.destroy() - } + killProcess() state = ExecutorState.FAILED val message = e.getClass + ": " + e.getMessage worker ! ExecutorStateChanged(appId, execId, state, Some(message), None) |