aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-11-19 11:57:50 -0800
committerReynold Xin <rxin@databricks.com>2015-11-19 11:57:50 -0800
commit72d150c271d2b206148fd0917a0def263445121b (patch)
treebae0d521a5e050c85ddf53e61cec513c82c3ce7d
parent962878843b611fa6229e3ee67bb22e2a4bc283cd (diff)
downloadspark-72d150c271d2b206148fd0917a0def263445121b.tar.gz
spark-72d150c271d2b206148fd0917a0def263445121b.tar.bz2
spark-72d150c271d2b206148fd0917a0def263445121b.zip
[SPARK-11830][CORE] Make NettyRpcEnv bind to the specified host
This PR includes the following change: 1. Bind NettyRpcEnv to the specified host 2. Fix the port information in the log for NettyRpcEnv. 3. Fix the service name of NettyRpcEnv. Author: zsxwing <zsxwing@gmail.com> Author: Shixiong Zhu <shixiong@databricks.com> Closes #9821 from zsxwing/SPARK-11830.
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala7
-rw-r--r--network/common/src/main/java/org/apache/spark/network/TransportContext.java8
-rw-r--r--network/common/src/main/java/org/apache/spark/network/server/TransportServer.java14
4 files changed, 28 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 4474a83bed..88df27f733 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -258,8 +258,15 @@ object SparkEnv extends Logging {
if (rpcEnv.isInstanceOf[AkkaRpcEnv]) {
rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
} else {
+ val actorSystemPort = if (port == 0) 0 else rpcEnv.address.port + 1
// Create a ActorSystem for legacy codes
- AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)._1
+ AkkaUtils.createActorSystem(
+ actorSystemName + "ActorSystem",
+ hostname,
+ actorSystemPort,
+ conf,
+ securityManager
+ )._1
}
// Figure out which port Akka actually bound to in case the original port is 0 or occupied.
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 3e0c497969..3ce3598680 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -102,7 +102,7 @@ private[netty] class NettyRpcEnv(
} else {
java.util.Collections.emptyList()
}
- server = transportContext.createServer(port, bootstraps)
+ server = transportContext.createServer(host, port, bootstraps)
dispatcher.registerRpcEndpoint(
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
@@ -337,10 +337,10 @@ private[netty] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
if (!config.clientMode) {
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
nettyEnv.startServer(actualPort)
- (nettyEnv, actualPort)
+ (nettyEnv, nettyEnv.address.port)
}
try {
- Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, "NettyRpcEnv")._1
+ Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
} catch {
case NonFatal(e) =>
nettyEnv.shutdown()
@@ -370,7 +370,6 @@ private[netty] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
* @param conf Spark configuration.
* @param endpointAddress The address where the endpoint is listening.
* @param nettyEnv The RpcEnv associated with this ref.
- * @param local Whether the referenced endpoint lives in the same process.
*/
private[netty] class NettyRpcEndpointRef(
@transient private val conf: SparkConf,
diff --git a/network/common/src/main/java/org/apache/spark/network/TransportContext.java b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
index 1b64b863a9..238710d172 100644
--- a/network/common/src/main/java/org/apache/spark/network/TransportContext.java
+++ b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -94,7 +94,13 @@ public class TransportContext {
/** Create a server which will attempt to bind to a specific port. */
public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
- return new TransportServer(this, port, rpcHandler, bootstraps);
+ return new TransportServer(this, null, port, rpcHandler, bootstraps);
+ }
+
+ /** Create a server which will attempt to bind to a specific host and port. */
+ public TransportServer createServer(
+ String host, int port, List<TransportServerBootstrap> bootstraps) {
+ return new TransportServer(this, host, port, rpcHandler, bootstraps);
}
/** Creates a new server, binding to any available ephemeral port. */
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index f4fadb1ee3..baae235e02 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -55,9 +55,13 @@ public class TransportServer implements Closeable {
private ChannelFuture channelFuture;
private int port = -1;
- /** Creates a TransportServer that binds to the given port, or to any available if 0. */
+ /**
+ * Creates a TransportServer that binds to the given host and the given port, or to any available
+ * if 0. If you don't want to bind to any special host, set "hostToBind" to null.
+ * */
public TransportServer(
TransportContext context,
+ String hostToBind,
int portToBind,
RpcHandler appRpcHandler,
List<TransportServerBootstrap> bootstraps) {
@@ -67,7 +71,7 @@ public class TransportServer implements Closeable {
this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
try {
- init(portToBind);
+ init(hostToBind, portToBind);
} catch (RuntimeException e) {
JavaUtils.closeQuietly(this);
throw e;
@@ -81,7 +85,7 @@ public class TransportServer implements Closeable {
return port;
}
- private void init(int portToBind) {
+ private void init(String hostToBind, int portToBind) {
IOMode ioMode = IOMode.valueOf(conf.ioMode());
EventLoopGroup bossGroup =
@@ -120,7 +124,9 @@ public class TransportServer implements Closeable {
}
});
- channelFuture = bootstrap.bind(new InetSocketAddress(portToBind));
+ InetSocketAddress address = hostToBind == null ?
+ new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
+ channelFuture = bootstrap.bind(address);
channelFuture.syncUninterruptibly();
port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();