author    Shixiong Zhu <shixiong@databricks.com>    2016-04-06 16:18:04 -0700
committer Andrew Or <andrew@databricks.com>         2016-04-06 16:18:04 -0700
commit    f1def573f4c1c757f727476521a1509b5285051d (patch)
tree      d4e025a195aee3c1bd3ab394ea63e53090b55eec /core
parent    c4bb02abf2c5b1724f2f848c79da5ebbf2584e45 (diff)
[SPARK-13112][CORE] Make sure RegisterExecutorResponse arrives before LaunchTask
## What changes were proposed in this pull request?

Send `RegisterExecutorResponse` using `executorRef` so that `RegisterExecutorResponse` and `LaunchTask` are both sent over the same channel. `RegisterExecutorResponse` will then always arrive before `LaunchTask`.

## How was this patch tested?

Existing unit tests.

Closes #12078

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12211 from zsxwing/SPARK-13112.
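To make the ordering issue concrete, here is a minimal, self-contained Scala sketch; the `Channel` and `Inbox` types and the latencies below are hypothetical stand-ins for Spark's RPC internals, not the real classes. Before this patch, the registration ack travelled on the ask/reply path while `LaunchTask` travelled on the executor-ref path, so the two could race; after it, both share the executor-ref path, whose FIFO delivery preserves their order.

```scala
import java.util.concurrent.LinkedBlockingQueue

object OrderingSketch {
  sealed trait Msg
  case object RegisteredExecutor extends Msg
  case object LaunchTask extends Msg

  // The executor's inbox: records messages in arrival order.
  type Inbox = LinkedBlockingQueue[Msg]

  // A channel delivers its own messages in FIFO order after some latency;
  // two distinct channels give no cross-channel ordering guarantee.
  final class Channel(inbox: Inbox, latencyMs: Long) {
    private val pending = new LinkedBlockingQueue[Msg]()
    private val worker = new Thread(new Runnable {
      def run(): Unit = while (true) {
        val m = pending.take(); Thread.sleep(latencyMs); inbox.put(m)
      }
    })
    worker.setDaemon(true)
    worker.start()
    def send(m: Msg): Unit = pending.put(m)
  }

  def main(args: Array[String]): Unit = {
    // Before: the ack rides the ask/reply channel, LaunchTask rides the
    // executor-ref channel, and the slower ack loses the race.
    val before = new Inbox()
    new Channel(before, latencyMs = 50).send(RegisteredExecutor) // context.reply(...)
    new Channel(before, latencyMs = 1).send(LaunchTask)          // executorRef.send(...)
    println(Seq(before.take(), before.take())) // typically List(LaunchTask, RegisteredExecutor)

    // After: both messages share the executor-ref channel, so FIFO delivery
    // guarantees the executor sees RegisteredExecutor first.
    val after = new Inbox()
    val refChannel = new Channel(after, latencyMs = 1)
    refChannel.send(RegisteredExecutor)
    refChannel.send(LaunchTask)
    println(Seq(after.take(), after.take())) // List(RegisteredExecutor, LaunchTask)
  }
}
```

The same reasoning explains the driver-side change in the diff below: `context.reply(true)` now only completes the `ask`, while the actual `RegisteredExecutor` or `RegisterExecutorFailed` message is pushed through `executorRef.send`, the same path that `LaunchTask` later takes.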
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala           |  7
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala |  6
-rw-r--r-- core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala                          | 11
-rw-r--r-- core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala         |  2
4 files changed, 16 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 81e41e6fa7..d4ed5845e7 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -57,12 +57,11 @@ private[spark] class CoarseGrainedExecutorBackend(
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
- ref.ask[RegisterExecutorResponse](RegisterExecutor(executorId, self, cores, extractLogUrls))
+ ref.ask[Boolean](RegisterExecutor(executorId, self, cores, extractLogUrls))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
- case Success(msg) => Utils.tryLogNonFatalError {
- Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
- }
+ case Success(msg) =>
+   // The driver always replies `true`; just ignore it.
case Failure(e) => {
logError(s"Cannot register with driver: $driverUrl", e)
System.exit(1)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index e5abf0e150..8896391f97 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -150,7 +150,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
case RegisterExecutor(executorId, executorRef, cores, logUrls) =>
if (executorDataMap.contains(executorId)) {
- context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
+ executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
+ context.reply(true)
} else {
// If the executor's rpc env is not listening for incoming connections, `hostPort`
// will be null, and the client connection should be used to contact the executor.
@@ -177,8 +178,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
+ executorRef.send(RegisteredExecutor(executorAddress.host))
// Note: some tests expect the reply to come after we put the executor in the map
- context.reply(RegisteredExecutor(executorAddress.host))
+ context.reply(true)
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
makeOffers()
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 3777d77f8f..713d5e58b4 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -174,9 +174,9 @@ class HeartbeatReceiverSuite
val dummyExecutorEndpoint2 = new FakeExecutorEndpoint(rpcEnv)
val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
- fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
+ fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
RegisterExecutor(executorId1, dummyExecutorEndpointRef1, 0, Map.empty))
- fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
+ fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
RegisterExecutor(executorId2, dummyExecutorEndpointRef2, 0, Map.empty))
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
addExecutorAndVerify(executorId1)
@@ -255,7 +255,12 @@ class HeartbeatReceiverSuite
/**
* Dummy RPC endpoint to simulate executors.
*/
-private class FakeExecutorEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint
+private class FakeExecutorEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
+
+ override def receive: PartialFunction[Any, Unit] = {
+ case _ =>
+ }
+}
/**
* Dummy scheduler backend to simulate executor allocation requests to the cluster manager.
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index d2e24912b5..3d39bd4a74 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -561,7 +561,7 @@ class StandaloneDynamicAllocationSuite
when(endpointRef.address).thenReturn(mockAddress)
val message = RegisterExecutor(id, endpointRef, 10, Map.empty)
val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
- backend.driverEndpoint.askWithRetry[CoarseGrainedClusterMessage](message)
+ backend.driverEndpoint.askWithRetry[Boolean](message)
}
}