diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-04-06 16:18:04 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-04-06 16:18:04 -0700 |
commit | f1def573f4c1c757f727476521a1509b5285051d (patch) | |
tree | d4e025a195aee3c1bd3ab394ea63e53090b55eec /core/src/main/scala/org/apache | |
parent | c4bb02abf2c5b1724f2f848c79da5ebbf2584e45 (diff) | |
download | spark-f1def573f4c1c757f727476521a1509b5285051d.tar.gz spark-f1def573f4c1c757f727476521a1509b5285051d.tar.bz2 spark-f1def573f4c1c757f727476521a1509b5285051d.zip |
[SPARK-13112][CORE] Make sure RegisterExecutorResponse arrives before LaunchTask
## What changes were proposed in this pull request?
Send `RegisterExecutorResponse` using `executorRef` so that `RegisterExecutorResponse` and `LaunchTask` are both sent over the same channel. `RegisterExecutorResponse` will then always arrive before `LaunchTask`.
## How was this patch tested?
Existing unit tests
Closes #12078
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #12211 from zsxwing/SPARK-13112.
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r-- | core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 7 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 6 |
2 files changed, 7 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 81e41e6fa7..d4ed5845e7 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -57,12 +57,11 @@ private[spark] class CoarseGrainedExecutorBackend( rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref => // This is a very fast action so we can use "ThreadUtils.sameThread" driver = Some(ref) - ref.ask[RegisterExecutorResponse](RegisterExecutor(executorId, self, cores, extractLogUrls)) + ref.ask[Boolean](RegisterExecutor(executorId, self, cores, extractLogUrls)) }(ThreadUtils.sameThread).onComplete { // This is a very fast action so we can use "ThreadUtils.sameThread" - case Success(msg) => Utils.tryLogNonFatalError { - Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse - } + case Success(msg) => + // Always receive `true`. 
Just ignore it case Failure(e) => { logError(s"Cannot register with driver: $driverUrl", e) System.exit(1) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index e5abf0e150..8896391f97 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -150,7 +150,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case RegisterExecutor(executorId, executorRef, cores, logUrls) => if (executorDataMap.contains(executorId)) { - context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) + executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) + context.reply(true) } else { // If the executor's rpc env is not listening for incoming connections, `hostPort` // will be null, and the client connection should be used to contact the executor. @@ -177,8 +178,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp logDebug(s"Decremented number of pending executors ($numPendingExecutors left)") } } + executorRef.send(RegisteredExecutor(executorAddress.host)) // Note: some tests expect the reply to come after we put the executor in the map - context.reply(RegisteredExecutor(executorAddress.host)) + context.reply(true) listenerBus.post( SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data)) makeOffers() |