aboutsummaryrefslogtreecommitdiff
path: root/network
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-11-19 11:57:50 -0800
committerReynold Xin <rxin@databricks.com>2015-11-19 11:57:50 -0800
commit72d150c271d2b206148fd0917a0def263445121b (patch)
treebae0d521a5e050c85ddf53e61cec513c82c3ce7d /network
parent962878843b611fa6229e3ee67bb22e2a4bc283cd (diff)
downloadspark-72d150c271d2b206148fd0917a0def263445121b.tar.gz
spark-72d150c271d2b206148fd0917a0def263445121b.tar.bz2
spark-72d150c271d2b206148fd0917a0def263445121b.zip
[SPARK-11830][CORE] Make NettyRpcEnv bind to the specified host
This PR includes the following change: 1. Bind NettyRpcEnv to the specified host 2. Fix the port information in the log for NettyRpcEnv. 3. Fix the service name of NettyRpcEnv. Author: zsxwing <zsxwing@gmail.com> Author: Shixiong Zhu <shixiong@databricks.com> Closes #9821 from zsxwing/SPARK-11830.
Diffstat (limited to 'network')
-rw-r--r--network/common/src/main/java/org/apache/spark/network/TransportContext.java8
-rw-r--r--network/common/src/main/java/org/apache/spark/network/server/TransportServer.java14
2 files changed, 17 insertions, 5 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/TransportContext.java b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
index 1b64b863a9..238710d172 100644
--- a/network/common/src/main/java/org/apache/spark/network/TransportContext.java
+++ b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -94,7 +94,13 @@ public class TransportContext {
/** Create a server which will attempt to bind to a specific port. */
public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
- return new TransportServer(this, port, rpcHandler, bootstraps);
+ return new TransportServer(this, null, port, rpcHandler, bootstraps);
+ }
+
+ /** Create a server which will attempt to bind to a specific host and port. */
+ public TransportServer createServer(
+ String host, int port, List<TransportServerBootstrap> bootstraps) {
+ return new TransportServer(this, host, port, rpcHandler, bootstraps);
}
/** Creates a new server, binding to any available ephemeral port. */
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index f4fadb1ee3..baae235e02 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -55,9 +55,13 @@ public class TransportServer implements Closeable {
private ChannelFuture channelFuture;
private int port = -1;
- /** Creates a TransportServer that binds to the given port, or to any available if 0. */
+ /**
+ * Creates a TransportServer that binds to the given host and the given port, or to any available
+ * if 0. If you don't want to bind to any special host, set "hostToBind" to null.
+ * */
public TransportServer(
TransportContext context,
+ String hostToBind,
int portToBind,
RpcHandler appRpcHandler,
List<TransportServerBootstrap> bootstraps) {
@@ -67,7 +71,7 @@ public class TransportServer implements Closeable {
this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
try {
- init(portToBind);
+ init(hostToBind, portToBind);
} catch (RuntimeException e) {
JavaUtils.closeQuietly(this);
throw e;
@@ -81,7 +85,7 @@ public class TransportServer implements Closeable {
return port;
}
- private void init(int portToBind) {
+ private void init(String hostToBind, int portToBind) {
IOMode ioMode = IOMode.valueOf(conf.ioMode());
EventLoopGroup bossGroup =
@@ -120,7 +124,9 @@ public class TransportServer implements Closeable {
}
});
- channelFuture = bootstrap.bind(new InetSocketAddress(portToBind));
+ InetSocketAddress address = hostToBind == null ?
+ new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
+ channelFuture = bootstrap.bind(address);
channelFuture.syncUninterruptibly();
port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();