aboutsummaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-04-12 15:28:08 -0700
committerReynold Xin <rxin@databricks.com>2016-04-12 15:28:08 -0700
commitc439d88e99c35a5f29f071715addfee8cbb215dc (patch)
treef9395e0464a087f2771daaf633df40c40f93943d /common
parent1ef5f8cfa6d6b7c9ec58a96dc447ab56ef709381 (diff)
downloadspark-c439d88e99c35a5f29f071715addfee8cbb215dc.tar.gz
spark-c439d88e99c35a5f29f071715addfee8cbb215dc.tar.bz2
spark-c439d88e99c35a5f29f071715addfee8cbb215dc.zip
[SPARK-14547] Avoid DNS resolution for reusing connections
## What changes were proposed in this pull request? This patch changes the connection creation logic in the network client module to avoid DNS resolution when reusing connections. ## How was this patch tested? Testing in production. This is too difficult to test in isolation (for high fidelity unit tests, we'd need to change the DNS resolution behavior in the JVM). Author: Reynold Xin <rxin@databricks.com> Closes #12315 from rxin/SPARK-14547.
Diffstat (limited to 'common')
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java31
1 files changed, 20 insertions, 11 deletions
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index b5a9d6671f..a27aaf2b27 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -123,16 +123,15 @@ public class TransportClientFactory implements Closeable {
public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
// Get connection from the connection pool first.
// If it is not found or not active, create a new one.
- long preResolveHost = System.nanoTime();
- final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
- long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
- logger.info("Spent {} ms to resolve {}", hostResolveTimeMs, address);
+ // Use unresolved address here to avoid DNS resolution each time we creates a client.
+ final InetSocketAddress unresolvedAddress =
+ InetSocketAddress.createUnresolved(remoteHost, remotePort);
// Create the ClientPool if we don't have it yet.
- ClientPool clientPool = connectionPool.get(address);
+ ClientPool clientPool = connectionPool.get(unresolvedAddress);
if (clientPool == null) {
- connectionPool.putIfAbsent(address, new ClientPool(numConnectionsPerPeer));
- clientPool = connectionPool.get(address);
+ connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer));
+ clientPool = connectionPool.get(unresolvedAddress);
}
int clientIndex = rand.nextInt(numConnectionsPerPeer);
@@ -149,25 +148,35 @@ public class TransportClientFactory implements Closeable {
}
if (cachedClient.isActive()) {
- logger.trace("Returning cached connection to {}: {}", address, cachedClient);
+ logger.trace("Returning cached connection to {}: {}",
+ cachedClient.getSocketAddress(), cachedClient);
return cachedClient;
}
}
// If we reach here, we don't have an existing connection open. Let's create a new one.
// Multiple threads might race here to create new connections. Keep only one of them active.
+ final long preResolveHost = System.nanoTime();
+ final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort);
+ final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
+ if (hostResolveTimeMs > 2000) {
+ logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
+ } else {
+ logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
+ }
+
synchronized (clientPool.locks[clientIndex]) {
cachedClient = clientPool.clients[clientIndex];
if (cachedClient != null) {
if (cachedClient.isActive()) {
- logger.trace("Returning cached connection to {}: {}", address, cachedClient);
+ logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient);
return cachedClient;
} else {
- logger.info("Found inactive connection to {}, creating a new one.", address);
+ logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);
}
}
- clientPool.clients[clientIndex] = createClient(address);
+ clientPool.clients[clientIndex] = createClient(resolvedAddress);
return clientPool.clients[clientIndex];
}
}