aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java31
1 files changed, 20 insertions, 11 deletions
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index b5a9d6671f..a27aaf2b27 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -123,16 +123,15 @@ public class TransportClientFactory implements Closeable {
public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
// Get connection from the connection pool first.
// If it is not found or not active, create a new one.
- long preResolveHost = System.nanoTime();
- final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
- long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
- logger.info("Spent {} ms to resolve {}", hostResolveTimeMs, address);
+ // Use unresolved address here to avoid DNS resolution each time we creates a client.
+ final InetSocketAddress unresolvedAddress =
+ InetSocketAddress.createUnresolved(remoteHost, remotePort);
// Create the ClientPool if we don't have it yet.
- ClientPool clientPool = connectionPool.get(address);
+ ClientPool clientPool = connectionPool.get(unresolvedAddress);
if (clientPool == null) {
- connectionPool.putIfAbsent(address, new ClientPool(numConnectionsPerPeer));
- clientPool = connectionPool.get(address);
+ connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer));
+ clientPool = connectionPool.get(unresolvedAddress);
}
int clientIndex = rand.nextInt(numConnectionsPerPeer);
@@ -149,25 +148,35 @@ public class TransportClientFactory implements Closeable {
}
if (cachedClient.isActive()) {
- logger.trace("Returning cached connection to {}: {}", address, cachedClient);
+ logger.trace("Returning cached connection to {}: {}",
+ cachedClient.getSocketAddress(), cachedClient);
return cachedClient;
}
}
// If we reach here, we don't have an existing connection open. Let's create a new one.
// Multiple threads might race here to create new connections. Keep only one of them active.
+ final long preResolveHost = System.nanoTime();
+ final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort);
+ final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
+ if (hostResolveTimeMs > 2000) {
+ logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
+ } else {
+ logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
+ }
+
synchronized (clientPool.locks[clientIndex]) {
cachedClient = clientPool.clients[clientIndex];
if (cachedClient != null) {
if (cachedClient.isActive()) {
- logger.trace("Returning cached connection to {}: {}", address, cachedClient);
+ logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient);
return cachedClient;
} else {
- logger.info("Found inactive connection to {}, creating a new one.", address);
+ logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);
}
}
- clientPool.clients[clientIndex] = createClient(address);
+ clientPool.clients[clientIndex] = createClient(resolvedAddress);
return clientPool.clients[clientIndex];
}
}