aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala31
1 files changed, 14 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index f94cd685e8..2051403682 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -58,30 +58,29 @@ private[spark] class ExecutorRunner(
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
-
// Shutdown hook that kills actors on shutdown.
shutdownHook = new Thread() {
override def run() {
- if (process != null) {
- logInfo("Shutdown hook killing child process.")
- process.destroy()
- process.waitFor()
- }
+ killProcess()
}
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
}
+ private def killProcess() {
+ if (process != null) {
+ logInfo("Killing process!")
+ process.destroy()
+ process.waitFor()
+ }
+ }
+
/** Stop this executor runner, including killing the process it launched */
def kill() {
if (workerThread != null) {
+ // the workerThread will kill the child process when interrupted
workerThread.interrupt()
workerThread = null
- if (process != null) {
- logInfo("Killing process!")
- process.destroy()
- process.waitFor()
- }
state = ExecutorState.KILLED
worker ! ExecutorStateChanged(appId, execId, state, None, None)
Runtime.getRuntime.removeShutdownHook(shutdownHook)
@@ -128,7 +127,6 @@ private[spark] class ExecutorRunner(
// parent process for the executor command
env.put("SPARK_LAUNCH_WITH_SCALA", "0")
process = builder.start()
-
val header = "Spark Executor Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
@@ -148,14 +146,13 @@ private[spark] class ExecutorRunner(
val message = "Command exited with code " + exitCode
worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
} catch {
- case interrupted: InterruptedException =>
+ case interrupted: InterruptedException => {
logInfo("Runner thread for executor " + fullId + " interrupted")
-
+ killProcess()
+ }
case e: Exception => {
logError("Error running executor", e)
- if (process != null) {
- process.destroy()
- }
+ killProcess()
state = ExecutorState.FAILED
val message = e.getClass + ": " + e.getMessage
worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)