aboutsummaryrefslogtreecommitdiff
path: root/network
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2015-05-08 17:13:55 -0700
committerAndrew Or <andrew@databricks.com>2015-05-08 17:13:55 -0700
commitffdc40ce7a799f2564f57b958d0f32f1d1636488 (patch)
tree854316c85a71eecfedf3d44bd450123da9884d42 /network
parent1c78f6866ebbcfb41d9875bfa3c0b9fa23b188bf (diff)
downloadspark-ffdc40ce7a799f2564f57b958d0f32f1d1636488.tar.gz
spark-ffdc40ce7a799f2564f57b958d0f32f1d1636488.tar.bz2
spark-ffdc40ce7a799f2564f57b958d0f32f1d1636488.zip
[SPARK-6955] Perform port retries at NettyBlockTransferService level
Currently we're doing port retries in the TransportServer level, but this is not specified by the TransportContext API and it has other further-reaching impacts like causing undesirable behavior for the Yarn and Standalone shuffle services. Author: Aaron Davidson <aaron@databricks.com> Closes #5575 from aarondav/port-bind and squashes the following commits: 3c2d6ed [Aaron Davidson] Oops, never do it. a5d9432 [Aaron Davidson] Remove shouldHostShuffleServiceIfEnabled e901eb2 [Aaron Davidson] fix local-cluster mode for ExternalShuffleServiceSuite 59e5e38 [Aaron Davidson] [SPARK-6955] Perform port retries at NettyBlockTransferService level
Diffstat (limited to 'network')
-rw-r--r--network/common/src/main/java/org/apache/spark/network/server/TransportServer.java45
1 files changed, 9 insertions, 36 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index 941ef95772..f4fadb1ee3 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -31,6 +31,7 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
+import org.apache.spark.network.util.JavaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +66,12 @@ public class TransportServer implements Closeable {
this.appRpcHandler = appRpcHandler;
this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
- init(portToBind);
+ try {
+ init(portToBind);
+ } catch (RuntimeException e) {
+ JavaUtils.closeQuietly(this);
+ throw e;
+ }
}
public int getPort() {
@@ -114,7 +120,8 @@ public class TransportServer implements Closeable {
}
});
- bindRightPort(portToBind);
+ channelFuture = bootstrap.bind(new InetSocketAddress(portToBind));
+ channelFuture.syncUninterruptibly();
port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
logger.debug("Shuffle server started on port :" + port);
@@ -135,38 +142,4 @@ public class TransportServer implements Closeable {
}
bootstrap = null;
}
-
- /**
- * Attempt to bind to the specified port up to a fixed number of retries.
- * If all attempts fail after the max number of retries, exit.
- */
- private void bindRightPort(int portToBind) {
- int maxPortRetries = conf.portMaxRetries();
-
- for (int i = 0; i <= maxPortRetries; i++) {
- int tryPort = -1;
- if (0 == portToBind) {
- // Do not increment port if tryPort is 0, which is treated as a special port
- tryPort = 0;
- } else {
- // If the new port wraps around, do not try a privilege port
- tryPort = ((portToBind + i - 1024) % (65536 - 1024)) + 1024;
- }
- try {
- channelFuture = bootstrap.bind(new InetSocketAddress(tryPort));
- channelFuture.syncUninterruptibly();
- return;
- } catch (Exception e) {
- logger.warn("Netty service could not bind on port " + tryPort +
- ". Attempting the next port.");
- if (i >= maxPortRetries) {
- logger.error(e.getMessage() + ": Netty server failed after "
- + maxPortRetries + " retries.");
-
- // If it can't find a right port, it should exit directly.
- System.exit(-1);
- }
- }
- }
- }
}