diff options
author | CodingCat <zhunansjtu@gmail.com> | 2014-04-24 15:55:18 -0700 |
---|---|---|
committer | Aaron Davidson <aaron@databricks.com> | 2014-04-24 15:55:18 -0700 |
commit | f99af8529b6969986f0c3e03f6ff9b7bb9d53ece (patch) | |
tree | ff310067b48c078b5abe3a5f6df37a08b294dec8 | |
parent | a03ac222d84025a1036750e1179136a13f75dea7 (diff) | |
download | spark-f99af8529b6969986f0c3e03f6ff9b7bb9d53ece.tar.gz spark-f99af8529b6969986f0c3e03f6ff9b7bb9d53ece.tar.bz2 spark-f99af8529b6969986f0c3e03f6ff9b7bb9d53ece.zip |
SPARK-1104: kill Process in workerThread of ExecutorRunner
As reported in https://spark-project.atlassian.net/browse/SPARK-1104
By @pwendell: "Sometimes due to large shuffles executors will take a long time shutting down. In particular this can happen if large numbers of shuffle files are around (this will be alleviated by SPARK-1103, but nonetheless...).
The symptom is you have DEAD workers sitting around in the UI and the existing workers keep trying to re-register but can't because they've been assumed dead."
In this patch, I add lines in the handler of InterruptedException in workerThread of executorRunner, so that the process.destroy() and process.waitFor() can only block the workerThread instead of blocking the worker Actor...
---------
analysis: process.destroy() is a blocking method, i.e. it only returns when all shutdownHook threads return...so calling it in Worker thread will make Worker block for a long while....
about what will happen on the shutdown hooks when the JVM process is killed: http://www.tutorialspoint.com/java/lang/runtime_addshutdownhook.htm
Author: CodingCat <zhunansjtu@gmail.com>
Closes #35 from CodingCat/SPARK-1104 and squashes the following commits:
85767da [CodingCat] add null checking and remove unnecessary killProce
3107aeb [CodingCat] address Aaron's comments
eb615ba [CodingCat] kill the process when the error happens
0accf2f [CodingCat] set process to null after killed it
1d511c8 [CodingCat] kill Process in workerThread
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 31 |
1 files changed, 14 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index f94cd685e8..2051403682 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -58,30 +58,29 @@ private[spark] class ExecutorRunner( override def run() { fetchAndRunExecutor() } } workerThread.start() - // Shutdown hook that kills actors on shutdown. shutdownHook = new Thread() { override def run() { - if (process != null) { - logInfo("Shutdown hook killing child process.") - process.destroy() - process.waitFor() - } + killProcess() } } Runtime.getRuntime.addShutdownHook(shutdownHook) } + private def killProcess() { + if (process != null) { + logInfo("Killing process!") + process.destroy() + process.waitFor() + } + } + /** Stop this executor runner, including killing the process it launched */ def kill() { if (workerThread != null) { + // the workerThread will kill the child process when interrupted workerThread.interrupt() workerThread = null - if (process != null) { - logInfo("Killing process!") - process.destroy() - process.waitFor() - } state = ExecutorState.KILLED worker ! ExecutorStateChanged(appId, execId, state, None, None) Runtime.getRuntime.removeShutdownHook(shutdownHook) @@ -128,7 +127,6 @@ private[spark] class ExecutorRunner( // parent process for the executor command env.put("SPARK_LAUNCH_WITH_SCALA", "0") process = builder.start() - val header = "Spark Executor Command: %s\n%s\n\n".format( command.mkString("\"", "\" \"", "\""), "=" * 40) @@ -148,14 +146,13 @@ private[spark] class ExecutorRunner( val message = "Command exited with code " + exitCode worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)) } catch { - case interrupted: InterruptedException => + case interrupted: InterruptedException => { logInfo("Runner thread for executor " + fullId + " interrupted") - + killProcess() + } case e: Exception => { logError("Error running executor", e) - if (process != null) { - process.destroy() - } + killProcess() state = ExecutorState.FAILED val message = e.getClass + ": " + e.getMessage worker ! ExecutorStateChanged(appId, execId, state, Some(message), None) |