diff options
author | zsxwing <zsxwing@gmail.com> | 2015-11-19 11:57:50 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-11-19 11:57:50 -0800 |
commit | 72d150c271d2b206148fd0917a0def263445121b (patch) | |
tree | bae0d521a5e050c85ddf53e61cec513c82c3ce7d /network | |
parent | 962878843b611fa6229e3ee67bb22e2a4bc283cd (diff) | |
download | spark-72d150c271d2b206148fd0917a0def263445121b.tar.gz spark-72d150c271d2b206148fd0917a0def263445121b.tar.bz2 spark-72d150c271d2b206148fd0917a0def263445121b.zip |
[SPARK-11830][CORE] Make NettyRpcEnv bind to the specified host
This PR includes the following change:
1. Bind NettyRpcEnv to the specified host
2. Fix the port information in the log for NettyRpcEnv.
3. Fix the service name of NettyRpcEnv.
Author: zsxwing <zsxwing@gmail.com>
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #9821 from zsxwing/SPARK-11830.
Diffstat (limited to 'network')
-rw-r--r-- | network/common/src/main/java/org/apache/spark/network/TransportContext.java | 8 | ||||
-rw-r--r-- | network/common/src/main/java/org/apache/spark/network/server/TransportServer.java | 14 |
2 files changed, 17 insertions, 5 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/TransportContext.java b/network/common/src/main/java/org/apache/spark/network/TransportContext.java index 1b64b863a9..238710d172 100644 --- a/network/common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/network/common/src/main/java/org/apache/spark/network/TransportContext.java @@ -94,7 +94,13 @@ public class TransportContext { /** Create a server which will attempt to bind to a specific port. */ public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) { - return new TransportServer(this, port, rpcHandler, bootstraps); + return new TransportServer(this, null, port, rpcHandler, bootstraps); + } + + /** Create a server which will attempt to bind to a specific host and port. */ + public TransportServer createServer( + String host, int port, List<TransportServerBootstrap> bootstraps) { + return new TransportServer(this, host, port, rpcHandler, bootstraps); } /** Creates a new server, binding to any available ephemeral port. */ diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java index f4fadb1ee3..baae235e02 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java @@ -55,9 +55,13 @@ public class TransportServer implements Closeable { private ChannelFuture channelFuture; private int port = -1; - /** Creates a TransportServer that binds to the given port, or to any available if 0. */ + /** + * Creates a TransportServer that binds to the given host and the given port, or to any available + * if 0. If you don't want to bind to any special host, set "hostToBind" to null. + * */ public TransportServer( TransportContext context, + String hostToBind, int portToBind, RpcHandler appRpcHandler, List<TransportServerBootstrap> bootstraps) { @@ -67,7 +71,7 @@ public class TransportServer implements Closeable { this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps)); try { - init(portToBind); + init(hostToBind, portToBind); } catch (RuntimeException e) { JavaUtils.closeQuietly(this); throw e; @@ -81,7 +85,7 @@ public class TransportServer implements Closeable { return port; } - private void init(int portToBind) { + private void init(String hostToBind, int portToBind) { IOMode ioMode = IOMode.valueOf(conf.ioMode()); EventLoopGroup bossGroup = @@ -120,7 +124,9 @@ public class TransportServer implements Closeable { } }); - channelFuture = bootstrap.bind(new InetSocketAddress(portToBind)); + InetSocketAddress address = hostToBind == null ? + new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind); + channelFuture = bootstrap.bind(address); channelFuture.syncUninterruptibly(); port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort(); |