aboutsummaryrefslogtreecommitdiff
path: root/network
diff options
context:
space:
mode:
authorhuangzhaowei <carlmartinmax@gmail.com>2015-02-06 14:35:29 -0800
committerAndrew Or <andrew@databricks.com>2015-02-06 14:36:58 -0800
commit2bda1c1d376afd8abe6a04be345461752f3fb1b6 (patch)
treea3993c6167ebb56082a9a5e0929c482252523bed /network
parentdcd1e42d6b6ac08d2c0736bf61a15f515a1f222b (diff)
downloadspark-2bda1c1d376afd8abe6a04be345461752f3fb1b6.tar.gz
spark-2bda1c1d376afd8abe6a04be345461752f3fb1b6.tar.bz2
spark-2bda1c1d376afd8abe6a04be345461752f3fb1b6.zip
[SPARK-5444][Network]Add a retry to deal with the conflict port in netty server.
If the `spark.blockMnager.port` had conflicted with a specific port, Spark will throw an exception and exit. So add a retry to avoid this situation. Author: huangzhaowei <carlmartinmax@gmail.com> Closes #4240 from SaintBacchus/NettyPortConflict and squashes the following commits: cc926d2 [huangzhaowei] Add a retry to deal with the conflict port in netty server.
Diffstat (limited to 'network')
-rw-r--r--network/common/src/main/java/org/apache/spark/network/server/TransportServer.java36
-rw-r--r--network/common/src/main/java/org/apache/spark/network/util/TransportConf.java7
2 files changed, 41 insertions, 2 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index 625c3257d7..ef20999180 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -100,8 +100,7 @@ public class TransportServer implements Closeable {
}
});
- channelFuture = bootstrap.bind(new InetSocketAddress(portToBind));
- channelFuture.syncUninterruptibly();
+ bindRightPort(portToBind);
port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
logger.debug("Shuffle server started on port :" + port);
@@ -123,4 +122,37 @@ public class TransportServer implements Closeable {
bootstrap = null;
}
+ /**
+ * Attempt to bind to the specified port up to a fixed number of retries.
+ * If all attempts fail after the max number of retries, exit.
+ */
+ private void bindRightPort(int portToBind) {
+ int maxPortRetries = conf.portMaxRetries();
+
+ for (int i = 0; i <= maxPortRetries; i++) {
+ int tryPort = -1;
+ if (0 == portToBind) {
+ // Do not increment port if tryPort is 0, which is treated as a special port
+ tryPort = 0;
+ } else {
+ // If the new port wraps around, do not try a privilege port
+ tryPort = ((portToBind + i - 1024) % (65536 - 1024)) + 1024;
+ }
+ try {
+ channelFuture = bootstrap.bind(new InetSocketAddress(tryPort));
+ channelFuture.syncUninterruptibly();
+ return;
+ } catch (Exception e) {
+ logger.warn("Netty service could not bind on port " + tryPort +
+ ". Attempting the next port.");
+ if (i >= maxPortRetries) {
+ logger.error(e.getMessage() + ": Netty server failed after "
+ + maxPortRetries + " retries.");
+
+ // If it can't find a right port, it should exit directly.
+ System.exit(-1);
+ }
+ }
+ }
+ }
}
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 6c91786886..2eaf3b71d9 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -98,4 +98,11 @@ public class TransportConf {
public boolean lazyFileDescriptor() {
return conf.getBoolean("spark.shuffle.io.lazyFD", true);
}
+
+ /**
+ * Maximum number of retries when binding to a port before giving up.
+ */
+ public int portMaxRetries() {
+ return conf.getInt("spark.port.maxRetries", 16);
+ }
}