aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author: CodingCat <zhunansjtu@gmail.com> 2014-04-24 15:55:18 -0700
committer: Aaron Davidson <aaron@databricks.com> 2014-04-24 15:55:18 -0700
commit: f99af8529b6969986f0c3e03f6ff9b7bb9d53ece (patch)
tree: ff310067b48c078b5abe3a5f6df37a08b294dec8 /core
parent: a03ac222d84025a1036750e1179136a13f75dea7 (diff)
download: spark-f99af8529b6969986f0c3e03f6ff9b7bb9d53ece.tar.gz
spark-f99af8529b6969986f0c3e03f6ff9b7bb9d53ece.tar.bz2
spark-f99af8529b6969986f0c3e03f6ff9b7bb9d53ece.zip
SPARK-1104: kill Process in workerThread of ExecutorRunner
As reported in https://spark-project.atlassian.net/browse/SPARK-1104 by @pwendell: "Sometimes due to large shuffles executors will take a long time shutting down. In particular this can happen if large numbers of shuffle files are around (this will be alleviated by SPARK-1103, but nonetheless...). The symptom is you have DEAD workers sitting around in the UI and the existing workers keep trying to re-register but can't because they've been assumed dead."

In this patch, I add lines to the InterruptedException handler in the workerThread of ExecutorRunner, so that process.destroy() and process.waitFor() block only the workerThread instead of blocking the Worker actor.

---------

Analysis: process.destroy() is a blocking method, i.e. it returns only when all shutdown-hook threads have returned, so calling it in the Worker thread makes the Worker block for a long while.

For what happens to the shutdown hooks when the JVM process is killed, see: http://www.tutorialspoint.com/java/lang/runtime_addshutdownhook.htm

Author: CodingCat <zhunansjtu@gmail.com>

Closes #35 from CodingCat/SPARK-1104 and squashes the following commits:

85767da [CodingCat] add null checking and remove unnecessary killProcess
3107aeb [CodingCat] address Aaron's comments
eb615ba [CodingCat] kill the process when the error happens
0accf2f [CodingCat] set process to null after killed it
1d511c8 [CodingCat] kill Process in workerThread
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala 31
1 files changed, 14 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index f94cd685e8..2051403682 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -58,30 +58,29 @@ private[spark] class ExecutorRunner(
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
-
// Shutdown hook that kills actors on shutdown.
shutdownHook = new Thread() {
override def run() {
- if (process != null) {
- logInfo("Shutdown hook killing child process.")
- process.destroy()
- process.waitFor()
- }
+ killProcess()
}
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
}
+ private def killProcess() {
+ if (process != null) {
+ logInfo("Killing process!")
+ process.destroy()
+ process.waitFor()
+ }
+ }
+
/** Stop this executor runner, including killing the process it launched */
def kill() {
if (workerThread != null) {
+ // the workerThread will kill the child process when interrupted
workerThread.interrupt()
workerThread = null
- if (process != null) {
- logInfo("Killing process!")
- process.destroy()
- process.waitFor()
- }
state = ExecutorState.KILLED
worker ! ExecutorStateChanged(appId, execId, state, None, None)
Runtime.getRuntime.removeShutdownHook(shutdownHook)
@@ -128,7 +127,6 @@ private[spark] class ExecutorRunner(
// parent process for the executor command
env.put("SPARK_LAUNCH_WITH_SCALA", "0")
process = builder.start()
-
val header = "Spark Executor Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
@@ -148,14 +146,13 @@ private[spark] class ExecutorRunner(
val message = "Command exited with code " + exitCode
worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
} catch {
- case interrupted: InterruptedException =>
+ case interrupted: InterruptedException => {
logInfo("Runner thread for executor " + fullId + " interrupted")
-
+ killProcess()
+ }
case e: Exception => {
logError("Error running executor", e)
- if (process != null) {
- process.destroy()
- }
+ killProcess()
state = ExecutorState.FAILED
val message = e.getClass + ": " + e.getMessage
worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)