commit    2aecda284e22ec608992b6221e2f5ffbd51fcd24
tree      4a317924d8614d5297dce9b35344581afca34b54
parent    8af2f8c61ae4a59d129fb3530d0f6e9317f4bff8
author    Shixiong Zhu <shixiong@databricks.com>  2015-12-13 22:06:39 -0800
committer Shixiong Zhu <shixiong@databricks.com>  2015-12-13 22:06:39 -0800
[SPARK-12281][CORE] Fix a race condition when reporting ExecutorState in the shutdown hook
1. Make sure workers and masters exit so that no worker or master is still running when the shutdown hook is triggered.
2. Set ExecutorState to FAILED if it is still RUNNING when the shutdown hook executes.

This should fix the potential exceptions when exiting a local cluster:

```
java.lang.AssertionError: assertion failed: executor 4 state transfer from RUNNING to RUNNING is illegal
    at scala.Predef$.assert(Predef.scala:179)
    at org.apache.spark.deploy.master.Master$$anonfun$receive$1.applyOrElse(Master.scala:260)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

java.lang.IllegalStateException: Shutdown hooks cannot be modified during shutdown.
    at org.apache.spark.util.SparkShutdownHookManager.add(ShutdownHookManager.scala:246)
    at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:191)
    at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:180)
    at org.apache.spark.deploy.worker.ExecutorRunner.start(ExecutorRunner.scala:73)
    at org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:474)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10269 from zsxwing/executor-state.
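For context on the second trace: the JVM enforces an analogous rule for plain `Runtime` shutdown hooks, which is the failure mode a still-running Worker hits when it tries to register an executor shutdown hook while shutdown is already in progress. The following is a minimal, standalone sketch of that rule only; it uses `java.lang.Runtime` directly, not Spark's `ShutdownHookManager`, and the object name `HookDuringShutdown` is made up for illustration.

```
object HookDuringShutdown {
  def main(args: Array[String]): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
      def run(): Unit = {
        // Registering another hook while hooks are already running is rejected,
        // analogous to the SparkShutdownHookManager error in the trace above.
        try {
          Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
            def run(): Unit = ()
          }))
        } catch {
          case e: IllegalStateException =>
            println(s"late registration rejected: ${e.getMessage}")
        }
      }
    }))
    // main returns immediately; the JVM runs the hook above during shutdown
  }
}
```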
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala    | 2 ++
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala         | 5 ++---
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 5 +++++
3 files changed, 9 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 83ccaadfe7..5bb62d37d6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -75,6 +75,8 @@ class LocalSparkCluster(
     // Stop the workers before the master so they don't get upset that it disconnected
     workerRpcEnvs.foreach(_.shutdown())
     masterRpcEnvs.foreach(_.shutdown())
+    workerRpcEnvs.foreach(_.awaitTermination())
+    masterRpcEnvs.foreach(_.awaitTermination())
     masterRpcEnvs.clear()
     workerRpcEnvs.clear()
   }
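The hunk above relies on the two-step contract of `RpcEnv`: `shutdown()` only initiates termination, while `awaitTermination()` blocks until it completes, so `LocalSparkCluster.stop()` no longer returns while workers or masters are still alive. As a rough analogy only (this sketch uses `java.util.concurrent.ExecutorService`, not Spark's `RpcEnv`, and the object name is made up), the same pattern looks like:

```
import java.util.concurrent.{Executors, TimeUnit}

object ShutdownThenAwait {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(2)
    pool.submit(new Runnable { def run(): Unit = Thread.sleep(200) })

    pool.shutdown()                               // asynchronous: returns immediately
    pool.awaitTermination(10, TimeUnit.SECONDS)   // blocks until outstanding work is done
    println(s"fully terminated: ${pool.isTerminated}")
  }
}
```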
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 04b20e0d6a..1355e1ad1b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -257,9 +257,8 @@ private[deploy] class Master(
           exec.state = state
           if (state == ExecutorState.RUNNING) {
-            if (oldState != ExecutorState.LAUNCHING) {
-              logWarning(s"Executor $execId state transfer from $oldState to RUNNING is unexpected")
-            }
+            assert(oldState == ExecutorState.LAUNCHING,
+              s"executor $execId state transfer from $oldState to RUNNING is illegal")
             appInfo.resetRetryCount()
           }
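The tightened check above turns an unexpected RUNNING report into a hard failure, which is exactly what the first stack trace in the commit message shows. A self-contained sketch of the same check, using a simplified stand-in enumeration rather than Spark's `ExecutorState` (all names here are hypothetical):

```
object StateTransitionCheck {
  // Simplified stand-in for Spark's ExecutorState; only a subset of values.
  object ExecState extends Enumeration {
    val LAUNCHING, RUNNING, KILLED, FAILED = Value
  }
  import ExecState._

  // Mirrors the assert added above: a RUNNING report is only legal from LAUNCHING.
  def recordRunning(execId: Int, oldState: ExecState.Value): Unit = {
    assert(oldState == LAUNCHING,
      s"executor $execId state transfer from $oldState to RUNNING is illegal")
  }

  def main(args: Array[String]): Unit = {
    recordRunning(4, LAUNCHING)     // fine
    try {
      recordRunning(4, RUNNING)     // the duplicate report the race produced
    } catch {
      case e: AssertionError => println(e.getMessage)
    }
  }
}
```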
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 25a17473e4..9a42487bb3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -71,6 +71,11 @@ private[deploy] class ExecutorRunner(
     workerThread.start()
     // Shutdown hook that kills actors on shutdown.
     shutdownHook = ShutdownHookManager.addShutdownHook { () =>
+      // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
+      // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
+      if (state == ExecutorState.RUNNING) {
+        state = ExecutorState.FAILED
+      }
       killProcess(Some("Worker shutting down")) }
   }
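The new guard only matters when the JVM begins shutting down before `fetchAndRunExecutor` has had a chance to update `state`, which per the comment in the hunk above starts out as RUNNING. A standalone sketch of the same idea, using `scala.sys.addShutdownHook` instead of Spark's `ShutdownHookManager` and a plain string in place of `ExecutorState` (the object name is made up for illustration):

```
object HookStateGuard {
  @volatile private var state = "RUNNING"   // starts optimistic, as in the hunk above

  def main(args: Array[String]): Unit = {
    sys.addShutdownHook {
      // If nothing ever downgraded the state, report FAILED rather than RUNNING,
      // so the master never sees an illegal RUNNING -> RUNNING transition.
      if (state == "RUNNING") {
        state = "FAILED"
      }
      println(s"final executor state reported: $state")
    }
    // main exits immediately; the hook runs during JVM shutdown
  }
}
```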