diff options
author | Mark Hamstra <markhamstra@gmail.com> | 2014-12-03 15:08:01 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2014-12-03 15:08:01 -0800 |
commit | 96b27855c5f9789d1f15316564a8e0fa2cd5a51b (patch) | |
tree | ae9174d2013f6d83dcc4b38f13a742df482b9fda /core | |
parent | 513ef82e85661552e596d0b483b645ac24e86d4d (diff) | |
download | spark-96b27855c5f9789d1f15316564a8e0fa2cd5a51b.tar.gz spark-96b27855c5f9789d1f15316564a8e0fa2cd5a51b.tar.bz2 spark-96b27855c5f9789d1f15316564a8e0fa2cd5a51b.zip |
[SPARK-4498][core] Don't transition ExecutorInfo to RUNNING until Driver adds Executor
The ExecutorInfo only reaches the RUNNING state if the Driver is alive to send the ExecutorStateChanged message to master. Else, appInfo.resetRetryCount() is never called and failing Executors will eventually exceed ApplicationState.MAX_NUM_RETRY, resulting in the application being removed from the master's accounting.
JoshRosen
Author: Mark Hamstra <markhamstra@gmail.com>
Closes #3550 from markhamstra/SPARK-4498 and squashes the following commits:
8f543b1 [Mark Hamstra] Don't transition ExecutorInfo to RUNNING until Executor is added by Driver
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 1 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 2 |
2 files changed, 1 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 98a93d1fcb..4efebcaa35 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -134,6 +134,7 @@ private[spark] class AppClient( val fullId = appId + "/" + id logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores)) + master ! ExecutorStateChanged(appId, id, ExecutorState.RUNNING, None, None) listener.executorAdded(fullId, workerId, hostPort, cores, memory) case ExecutorUpdated(id, state, message, exitStatus) => diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 8ba6a01bbc..f4fedc6327 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -144,8 +144,6 @@ private[spark] class ExecutorRunner( Files.write(header, stderr, UTF_8) stderrAppender = FileAppender(process.getErrorStream, stderr, conf) - state = ExecutorState.RUNNING - worker ! ExecutorStateChanged(appId, execId, state, None, None) // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown) // or with nonzero exit code val exitCode = process.waitFor() |