diff options
author | huangzhaowei <carlmartinmax@gmail.com> | 2015-02-06 14:35:29 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-02-06 14:37:06 -0800 |
commit | caca15a4c0f3779f77fd23bb161b671d402359a8 (patch) | |
tree | ae9d339e219900a30291eee1f72172d226cdc258 | |
parent | 9fa29a629a03893c226c4d42228daaf13ee41e5c (diff) | |
download | spark-caca15a4c0f3779f77fd23bb161b671d402359a8.tar.gz spark-caca15a4c0f3779f77fd23bb161b671d402359a8.tar.bz2 spark-caca15a4c0f3779f77fd23bb161b671d402359a8.zip |
[SPARK-5444][Network] Add a retry to deal with port conflicts in the Netty server.
If the port configured by `spark.blockManager.port` is already in use, Spark throws an exception and exits.
So add a retry to avoid this situation.
Author: huangzhaowei <carlmartinmax@gmail.com>
Closes #4240 from SaintBacchus/NettyPortConflict and squashes the following commits:
cc926d2 [huangzhaowei] Add a retry to deal with the conflict port in netty server.
(cherry picked from commit 2bda1c1d376afd8abe6a04be345461752f3fb1b6)
Signed-off-by: Andrew Or <andrew@databricks.com>
-rw-r--r-- | network/common/src/main/java/org/apache/spark/network/server/TransportServer.java | 36 | ||||
-rw-r--r-- | network/common/src/main/java/org/apache/spark/network/util/TransportConf.java | 7 |
2 files changed, 41 insertions, 2 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java index 625c3257d7..ef20999180 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java @@ -100,8 +100,7 @@ public class TransportServer implements Closeable { } }); - channelFuture = bootstrap.bind(new InetSocketAddress(portToBind)); - channelFuture.syncUninterruptibly(); + bindRightPort(portToBind); port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort(); logger.debug("Shuffle server started on port :" + port); @@ -123,4 +122,37 @@ public class TransportServer implements Closeable { bootstrap = null; } + /** + * Attempt to bind to the specified port up to a fixed number of retries. + * If all attempts fail after the max number of retries, exit. + */ + private void bindRightPort(int portToBind) { + int maxPortRetries = conf.portMaxRetries(); + + for (int i = 0; i <= maxPortRetries; i++) { + int tryPort = -1; + if (0 == portToBind) { + // Do not increment port if tryPort is 0, which is treated as a special port + tryPort = 0; + } else { + // If the new port wraps around, do not try a privilege port + tryPort = ((portToBind + i - 1024) % (65536 - 1024)) + 1024; + } + try { + channelFuture = bootstrap.bind(new InetSocketAddress(tryPort)); + channelFuture.syncUninterruptibly(); + return; + } catch (Exception e) { + logger.warn("Netty service could not bind on port " + tryPort + + ". Attempting the next port."); + if (i >= maxPortRetries) { + logger.error(e.getMessage() + ": Netty server failed after " + + maxPortRetries + " retries."); + + // If it can't find a right port, it should exit directly. 
+ System.exit(-1); + } + } + } + } } diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java index 6c91786886..2eaf3b71d9 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -98,4 +98,11 @@ public class TransportConf { public boolean lazyFileDescriptor() { return conf.getBoolean("spark.shuffle.io.lazyFD", true); } + + /** + * Maximum number of retries when binding to a port before giving up. + */ + public int portMaxRetries() { + return conf.getInt("spark.port.maxRetries", 16); + } } |