aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-11-19 11:57:50 -0800
committerReynold Xin <rxin@databricks.com>2015-11-19 11:57:50 -0800
commit72d150c271d2b206148fd0917a0def263445121b (patch)
treebae0d521a5e050c85ddf53e61cec513c82c3ce7d /core
parent962878843b611fa6229e3ee67bb22e2a4bc283cd (diff)
downloadspark-72d150c271d2b206148fd0917a0def263445121b.tar.gz
spark-72d150c271d2b206148fd0917a0def263445121b.tar.bz2
spark-72d150c271d2b206148fd0917a0def263445121b.zip
[SPARK-11830][CORE] Make NettyRpcEnv bind to the specified host
This PR includes the following change: 1. Bind NettyRpcEnv to the specified host 2. Fix the port information in the log for NettyRpcEnv. 3. Fix the service name of NettyRpcEnv. Author: zsxwing <zsxwing@gmail.com> Author: Shixiong Zhu <shixiong@databricks.com> Closes #9821 from zsxwing/SPARK-11830.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala7
2 files changed, 11 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 4474a83bed..88df27f733 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -258,8 +258,15 @@ object SparkEnv extends Logging {
if (rpcEnv.isInstanceOf[AkkaRpcEnv]) {
rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
} else {
+ val actorSystemPort = if (port == 0) 0 else rpcEnv.address.port + 1
// Create a ActorSystem for legacy codes
- AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)._1
+ AkkaUtils.createActorSystem(
+ actorSystemName + "ActorSystem",
+ hostname,
+ actorSystemPort,
+ conf,
+ securityManager
+ )._1
}
// Figure out which port Akka actually bound to in case the original port is 0 or occupied.
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 3e0c497969..3ce3598680 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -102,7 +102,7 @@ private[netty] class NettyRpcEnv(
} else {
java.util.Collections.emptyList()
}
- server = transportContext.createServer(port, bootstraps)
+ server = transportContext.createServer(host, port, bootstraps)
dispatcher.registerRpcEndpoint(
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
@@ -337,10 +337,10 @@ private[netty] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
if (!config.clientMode) {
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
nettyEnv.startServer(actualPort)
- (nettyEnv, actualPort)
+ (nettyEnv, nettyEnv.address.port)
}
try {
- Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, "NettyRpcEnv")._1
+ Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
} catch {
case NonFatal(e) =>
nettyEnv.shutdown()
@@ -370,7 +370,6 @@ private[netty] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
* @param conf Spark configuration.
* @param endpointAddress The address where the endpoint is listening.
* @param nettyEnv The RpcEnv associated with this ref.
- * @param local Whether the referenced endpoint lives in the same process.
*/
private[netty] class NettyRpcEndpointRef(
@transient private val conf: SparkConf,