diff options
author | Aaron Davidson <aaron@databricks.com> | 2015-05-08 17:13:55 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-05-08 17:13:55 -0700 |
commit | ffdc40ce7a799f2564f57b958d0f32f1d1636488 (patch) | |
tree | 854316c85a71eecfedf3d44bd450123da9884d42 /network/common | |
parent | 1c78f6866ebbcfb41d9875bfa3c0b9fa23b188bf (diff) | |
download | spark-ffdc40ce7a799f2564f57b958d0f32f1d1636488.tar.gz spark-ffdc40ce7a799f2564f57b958d0f32f1d1636488.tar.bz2 spark-ffdc40ce7a799f2564f57b958d0f32f1d1636488.zip |
[SPARK-6955] Perform port retries at NettyBlockTransferService level
Currently we're doing port retries in the TransportServer level, but this is not specified by the TransportContext API and it has other further-reaching impacts like causing undesirable behavior for the Yarn and Standalone shuffle services.
Author: Aaron Davidson <aaron@databricks.com>
Closes #5575 from aarondav/port-bind and squashes the following commits:
3c2d6ed [Aaron Davidson] Oops, never do it.
a5d9432 [Aaron Davidson] Remove shouldHostShuffleServiceIfEnabled
e901eb2 [Aaron Davidson] fix local-cluster mode for ExternalShuffleServiceSuite
59e5e38 [Aaron Davidson] [SPARK-6955] Perform port retries at NettyBlockTransferService level
Diffstat (limited to 'network/common')
-rw-r--r-- | network/common/src/main/java/org/apache/spark/network/server/TransportServer.java | 45 |
1 files changed, 9 insertions, 36 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java index 941ef95772..f4fadb1ee3 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java @@ -31,6 +31,7 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; +import org.apache.spark.network.util.JavaUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +66,12 @@ public class TransportServer implements Closeable { this.appRpcHandler = appRpcHandler; this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps)); - init(portToBind); + try { + init(portToBind); + } catch (RuntimeException e) { + JavaUtils.closeQuietly(this); + throw e; + } } public int getPort() { @@ -114,7 +120,8 @@ public class TransportServer implements Closeable { } }); - bindRightPort(portToBind); + channelFuture = bootstrap.bind(new InetSocketAddress(portToBind)); + channelFuture.syncUninterruptibly(); port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort(); logger.debug("Shuffle server started on port :" + port); @@ -135,38 +142,4 @@ public class TransportServer implements Closeable { } bootstrap = null; } - - /** - * Attempt to bind to the specified port up to a fixed number of retries. - * If all attempts fail after the max number of retries, exit. - */ - private void bindRightPort(int portToBind) { - int maxPortRetries = conf.portMaxRetries(); - - for (int i = 0; i <= maxPortRetries; i++) { - int tryPort = -1; - if (0 == portToBind) { - // Do not increment port if tryPort is 0, which is treated as a special port - tryPort = 0; - } else { - // If the new port wraps around, do not try a privilege port - tryPort = ((portToBind + i - 1024) % (65536 - 1024)) + 1024; - } - try { - channelFuture = bootstrap.bind(new InetSocketAddress(tryPort)); - channelFuture.syncUninterruptibly(); - return; - } catch (Exception e) { - logger.warn("Netty service could not bind on port " + tryPort + - ". Attempting the next port."); - if (i >= maxPortRetries) { - logger.error(e.getMessage() + ": Netty server failed after " - + maxPortRetries + " retries."); - - // If it can't find a right port, it should exit directly. - System.exit(-1); - } - } - } - } } |