aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author: jerryshao <sshao@hortonworks.com> 2015-11-25 11:42:53 -0800
committer: Andrew Or <andrew@databricks.com> 2015-11-25 11:42:53 -0800
commit: 88875d9413ec7d64a88d40857ffcf97b5853a7f2 (patch)
tree: 80b00ce434f823ae9209198e8bb8840a6110791e /core
parent: 9f3e59a16822fb61d60cf103bd4f7823552939c6 (diff)
download: spark-88875d9413ec7d64a88d40857ffcf97b5853a7f2.tar.gz
spark-88875d9413ec7d64a88d40857ffcf97b5853a7f2.tar.bz2
spark-88875d9413ec7d64a88d40857ffcf97b5853a7f2.zip
[SPARK-10558][CORE] Fix wrong executor state in Master
`ExecutorAdded` can only be sent to `AppClient` when the worker reports the executor state back as `LOADING`; otherwise, because of a concurrency issue, `AppClient` may receive `ExecutorAdded` first and then `ExecutorStateUpdated` with the `LOADING` state. Also, Master changes the executor state from `LAUNCHING` to `RUNNING` (when `AppClient` reports the state back as `RUNNING`) and then to `LOADING` (when the worker reports the state back as `LOADING`), whereas the order should be `LAUNCHING` -> `LOADING` -> `RUNNING`. The state is also shown wrongly in the master UI; the executor state should be `RUNNING` rather than `LOADING`: ![screen shot 2015-09-11 at 2 30 28 pm](https://cloud.githubusercontent.com/assets/850797/9809254/3155d840-5899-11e5-8cdf-ad06fef75762.png) Author: jerryshao <sshao@hortonworks.com> Closes #8714 from jerryshao/SPARK-10558.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala 2
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala 3
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/master/Master.scala 14
-rwxr-xr-x core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 2
4 files changed, 13 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
index efa88c62e1..69c98e2893 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy
private[deploy] object ExecutorState extends Enumeration {
- val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
+ val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
type ExecutorState = Value
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index afab362e21..df6ba7d669 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -178,9 +178,6 @@ private[spark] class AppClient(
val fullId = appId + "/" + id
logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
cores))
- // FIXME if changing master and `ExecutorAdded` happen at the same time (the order is not
- // guaranteed), `ExecutorStateChanged` may be sent to a dead master.
- sendToMaster(ExecutorStateChanged(appId.get, id, ExecutorState.RUNNING, None, None))
listener.executorAdded(fullId, workerId, hostPort, cores, memory)
case ExecutorUpdated(id, state, message, exitStatus) =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index b25a487806..9952c97dbd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -253,9 +253,17 @@ private[deploy] class Master(
execOption match {
case Some(exec) => {
val appInfo = idToApp(appId)
+ val oldState = exec.state
exec.state = state
- if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
+
+ if (state == ExecutorState.RUNNING) {
+ assert(oldState == ExecutorState.LAUNCHING,
+ s"executor $execId state transfer from $oldState to RUNNING is illegal")
+ appInfo.resetRetryCount()
+ }
+
exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))
+
if (ExecutorState.isFinished(state)) {
// Remove this executor from the worker and app
logInfo(s"Removing executor ${exec.fullId} because it is $state")
@@ -702,8 +710,8 @@ private[deploy] class Master(
worker.addExecutor(exec)
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
- exec.application.driver.send(ExecutorAdded(
- exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
+ exec.application.driver.send(
+ ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
private def registerWorker(worker: WorkerInfo): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index a45867e768..418faf8fc9 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -469,7 +469,7 @@ private[deploy] class Worker(
executorDir,
workerUri,
conf,
- appLocalDirs, ExecutorState.LOADING)
+ appLocalDirs, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_