diff options
author | zsxwing <zsxwing@gmail.com> | 2015-11-19 11:57:50 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-11-19 11:57:50 -0800 |
commit | 72d150c271d2b206148fd0917a0def263445121b (patch) | |
tree | bae0d521a5e050c85ddf53e61cec513c82c3ce7d /core | |
parent | 962878843b611fa6229e3ee67bb22e2a4bc283cd (diff) | |
download | spark-72d150c271d2b206148fd0917a0def263445121b.tar.gz spark-72d150c271d2b206148fd0917a0def263445121b.tar.bz2 spark-72d150c271d2b206148fd0917a0def263445121b.zip |
[SPARK-11830][CORE] Make NettyRpcEnv bind to the specified host
This PR includes the following change:
1. Bind NettyRpcEnv to the specified host
2. Fix the port information in the log for NettyRpcEnv.
3. Fix the service name of NettyRpcEnv.
Author: zsxwing <zsxwing@gmail.com>
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #9821 from zsxwing/SPARK-11830.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkEnv.scala | 9 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala | 7 |
2 files changed, 11 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 4474a83bed..88df27f733 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -258,8 +258,15 @@ object SparkEnv extends Logging { if (rpcEnv.isInstanceOf[AkkaRpcEnv]) { rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem } else { + val actorSystemPort = if (port == 0) 0 else rpcEnv.address.port + 1 // Create a ActorSystem for legacy codes - AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)._1 + AkkaUtils.createActorSystem( + actorSystemName + "ActorSystem", + hostname, + actorSystemPort, + conf, + securityManager + )._1 } // Figure out which port Akka actually bound to in case the original port is 0 or occupied. diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 3e0c497969..3ce3598680 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -102,7 +102,7 @@ private[netty] class NettyRpcEnv( } else { java.util.Collections.emptyList() } - server = transportContext.createServer(port, bootstraps) + server = transportContext.createServer(host, port, bootstraps) dispatcher.registerRpcEndpoint( RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher)) } @@ -337,10 +337,10 @@ private[netty] class NettyRpcEnvFactory extends RpcEnvFactory with Logging { if (!config.clientMode) { val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort => nettyEnv.startServer(actualPort) - (nettyEnv, actualPort) + (nettyEnv, nettyEnv.address.port) } try { - Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, "NettyRpcEnv")._1 + Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1 } catch { case NonFatal(e) => nettyEnv.shutdown() @@ -370,7 +370,6 @@ private[netty] class NettyRpcEnvFactory extends RpcEnvFactory with Logging { * @param conf Spark configuration. * @param endpointAddress The address where the endpoint is listening. * @param nettyEnv The RpcEnv associated with this ref. - * @param local Whether the referenced endpoint lives in the same process. */ private[netty] class NettyRpcEndpointRef( @transient private val conf: SparkConf, |