aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-06-17 15:48:17 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-06-17 15:48:17 -0700
commit62d8fe2089659e8212753a622708517e0f4a77bc (patch)
tree23685264247e79089bfe89be171590de96d06f71 /core/src
parent298c4ae81520b6b39230a6b0bf733c2b7caea627 (diff)
downloadspark-62d8fe2089659e8212753a622708517e0f4a77bc.tar.gz
spark-62d8fe2089659e8212753a622708517e0f4a77bc.tar.bz2
spark-62d8fe2089659e8212753a622708517e0f4a77bc.zip
[SPARK-16017][CORE] Send hostname from CoarseGrainedExecutorBackend to driver
## What changes were proposed in this pull request? [SPARK-15395](https://issues.apache.org/jira/browse/SPARK-15395) changes the behavior that how the driver gets the executor host and the driver will get the executor IP address instead of the host name. This PR just sends the hostname from executors to driver so that driver can pass it to TaskScheduler. ## How was this patch tested? Existing unit tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #13741 from zsxwing/SPARK-16017.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala2
5 files changed, 12 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index e087295109..ccc6c36e9c 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -39,6 +39,7 @@ private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
+ hostname: String,
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv)
@@ -57,7 +58,7 @@ private[spark] class CoarseGrainedExecutorBackend(
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
- ref.ask[Boolean](RegisterExecutor(executorId, self, cores, extractLogUrls))
+ ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
@@ -75,7 +76,7 @@ private[spark] class CoarseGrainedExecutorBackend(
}
override def receive: PartialFunction[Any, Unit] = {
- case RegisteredExecutor(hostname) =>
+ case RegisteredExecutor =>
logInfo("Successfully registered with driver")
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
@@ -201,7 +202,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
driverConf, executorId, hostname, port, cores, isLocal = false)
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
- env.rpcEnv, driverUrl, executorId, cores, userClassPath, env))
+ env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 46a829114e..edc8aac5d1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -40,8 +40,7 @@ private[spark] object CoarseGrainedClusterMessages {
sealed trait RegisterExecutorResponse
- case class RegisteredExecutor(hostname: String) extends CoarseGrainedClusterMessage
- with RegisterExecutorResponse
+ case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
with RegisterExecutorResponse
@@ -50,6 +49,7 @@ private[spark] object CoarseGrainedClusterMessages {
case class RegisterExecutor(
executorId: String,
executorRef: RpcEndpointRef,
+ hostname: String,
cores: Int,
logUrls: Map[String, String])
extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index e84cb6346d..967c4d5325 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -148,7 +148,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case RegisterExecutor(executorId, executorRef, cores, logUrls) =>
+ case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
@@ -164,7 +164,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
addressToExecutorId(executorAddress) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
- val data = new ExecutorData(executorRef, executorRef.address, executorAddress.host,
+ val data = new ExecutorData(executorRef, executorRef.address, hostname,
cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
@@ -178,7 +178,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
- executorRef.send(RegisteredExecutor(executorAddress.host))
+ executorRef.send(RegisteredExecutor)
// Note: some tests expect the reply to come after we put the executor in the map
context.reply(true)
listenerBus.post(
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 81b94b5721..5e2ba311ee 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -174,9 +174,9 @@ class HeartbeatReceiverSuite
val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
- RegisterExecutor(executorId1, dummyExecutorEndpointRef1, 0, Map.empty))
+ RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
fakeSchedulerBackend.driverEndpoint.askWithRetry[Boolean](
- RegisterExecutor(executorId2, dummyExecutorEndpointRef2, 0, Map.empty))
+ RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "1.2.3.5", 0, Map.empty))
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 3d39bd4a74..814027076d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -559,7 +559,7 @@ class StandaloneDynamicAllocationSuite
val endpointRef = mock(classOf[RpcEndpointRef])
val mockAddress = mock(classOf[RpcAddress])
when(endpointRef.address).thenReturn(mockAddress)
- val message = RegisterExecutor(id, endpointRef, 10, Map.empty)
+ val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty)
val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
backend.driverEndpoint.askWithRetry[Boolean](message)
}