author     Lianhui Wang <lianhuiwang09@gmail.com>    2015-11-10 10:40:08 -0800
committer  Marcelo Vanzin <vanzin@cloudera.com>      2015-11-10 10:40:08 -0800
commit     6e5fc37883ed81c3ee2338145a48de3036d19399 (patch)
tree       67c33f795a8764706e7598f4afcae3eb00f42c41 /network
parent     689386b1c60997e4505749915f7005a52c207de2 (diff)
[SPARK-11252][NETWORK] ShuffleClient should release connections after block fetches have completed for the external shuffle service
With YARN's external shuffle service, each executor's ExternalShuffleClient keeps its connections to YARN's NodeManager open until the application completes, so the NodeManager and the executors end up holding many socket connections. To reduce the network pressure on the NodeManager's shuffle service, the connection to the NodeManager's shuffle service should be closed once registerWithShuffleServer or fetchBlocks has completed in ExternalShuffleClient. cc andrewor14 rxin vanzin

Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes #9227 from lianhuiwang/spark-11252.
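The resulting pattern is sketched below: open a dedicated (unpooled) connection for a one-off RPC to the shuffle service and close it as soon as the call returns, instead of parking it in the connection pool for the lifetime of the application. This is a minimal sketch mirroring the registerWithShuffleServer change further down; clientFactory, appId, execId, executorInfo, host and port are assumed to be in scope as they are in ExternalShuffleClient.

    // Sketch only: release the NodeManager-side connection right after the one-off RPC.
    TransportClient client = clientFactory.createUnmanagedClient(host, port);
    try {
      byte[] registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteArray();
      client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
    } finally {
      client.close();
    }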
Diffstat (limited to 'network')
-rw-r--r--  network/common/src/main/java/org/apache/spark/network/TransportContext.java               | 11
-rw-r--r--  network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java  | 10
-rw-r--r--  network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java | 26
-rw-r--r--  network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java    | 34
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java | 12
5 files changed, 79 insertions(+), 14 deletions(-)
diff --git a/network/common/src/main/java/org/apache/spark/network/TransportContext.java b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
index 43900e6f2c..1b64b863a9 100644
--- a/network/common/src/main/java/org/apache/spark/network/TransportContext.java
+++ b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -59,15 +59,24 @@ public class TransportContext {
private final TransportConf conf;
private final RpcHandler rpcHandler;
+ private final boolean closeIdleConnections;
private final MessageEncoder encoder;
private final MessageDecoder decoder;
public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
+ this(conf, rpcHandler, false);
+ }
+
+ public TransportContext(
+ TransportConf conf,
+ RpcHandler rpcHandler,
+ boolean closeIdleConnections) {
this.conf = conf;
this.rpcHandler = rpcHandler;
this.encoder = new MessageEncoder();
this.decoder = new MessageDecoder();
+ this.closeIdleConnections = closeIdleConnections;
}
/**
@@ -144,7 +153,7 @@ public class TransportContext {
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
rpcHandler);
return new TransportChannelHandler(client, responseHandler, requestHandler,
- conf.connectionTimeoutMs());
+ conf.connectionTimeoutMs(), closeIdleConnections);
}
public TransportConf getConf() { return conf; }
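For callers, the new three-argument constructor opts the resulting channel handlers into idle-connection reaping, while the existing two-argument constructor keeps the previous behavior. A minimal usage sketch, assuming a TransportConf named conf and an RpcHandler named rpcHandler are already available:

    // Previous behavior: idle connections stay open until explicitly closed.
    TransportContext defaultContext = new TransportContext(conf, rpcHandler);

    // New behavior: channels that stay quiet past the connection timeout with no
    // outstanding requests are closed by TransportChannelHandler.
    TransportContext reapingContext = new TransportContext(conf, rpcHandler, true);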
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 4952ffb44b..42a4f664e6 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -158,6 +158,16 @@ public class TransportClientFactory implements Closeable {
}
}
+ /**
+ * Create a completely new {@link TransportClient} to the given remote host / port.
+ * This connection is not pooled.
+ */
+ public TransportClient createUnmanagedClient(String remoteHost, int remotePort)
+ throws IOException {
+ final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
+ return createClient(address);
+ }
+
/** Create a completely new {@link TransportClient} to the remote address. */
private TransportClient createClient(InetSocketAddress address) throws IOException {
logger.debug("Creating new connection to " + address);
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index 8e0ee709e3..f8fcd1c3d7 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -55,16 +55,19 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
private final TransportResponseHandler responseHandler;
private final TransportRequestHandler requestHandler;
private final long requestTimeoutNs;
+ private final boolean closeIdleConnections;
public TransportChannelHandler(
TransportClient client,
TransportResponseHandler responseHandler,
TransportRequestHandler requestHandler,
- long requestTimeoutMs) {
+ long requestTimeoutMs,
+ boolean closeIdleConnections) {
this.client = client;
this.responseHandler = responseHandler;
this.requestHandler = requestHandler;
this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000;
+ this.closeIdleConnections = closeIdleConnections;
}
public TransportClient getClient() {
@@ -111,16 +114,21 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
IdleStateEvent e = (IdleStateEvent) evt;
// See class comment for timeout semantics. In addition to ensuring we only timeout while
// there are outstanding requests, we also do a secondary consistency check to ensure
- // there's no race between the idle timeout and incrementing the numOutstandingRequests.
- boolean hasInFlightRequests = responseHandler.numOutstandingRequests() > 0;
+ // there's no race between the idle timeout and incrementing the numOutstandingRequests
+ // (see SPARK-7003).
boolean isActuallyOverdue =
System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
- if (e.state() == IdleState.ALL_IDLE && hasInFlightRequests && isActuallyOverdue) {
- String address = NettyUtils.getRemoteAddress(ctx.channel());
- logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
- "requests. Assuming connection is dead; please adjust spark.network.timeout if this " +
- "is wrong.", address, requestTimeoutNs / 1000 / 1000);
- ctx.close();
+ if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
+ if (responseHandler.numOutstandingRequests() > 0) {
+ String address = NettyUtils.getRemoteAddress(ctx.channel());
+ logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
+ "requests. Assuming connection is dead; please adjust spark.network.timeout if this " +
+ "is wrong.", address, requestTimeoutNs / 1000 / 1000);
+ ctx.close();
+ } else if (closeIdleConnections) {
+ // When closeIdleConnections is enabled, also close idle connections
+ ctx.close();
+ }
}
}
}
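For context, the ALL_IDLE event handled above is produced by a Netty IdleStateHandler installed earlier in the channel pipeline; the exact wiring lives in TransportContext.initializePipeline and may differ, so the snippet below is only an illustration of how such a handler is typically registered.

    // Illustrative only: fire an ALL_IDLE IdleStateEvent when the channel has seen
    // neither reads nor writes for the configured connection timeout.
    pipeline.addLast("idleStateHandler",
      new IdleStateHandler(0, 0, (int) (conf.connectionTimeoutMs() / 1000)));
    pipeline.addLast("handler", channelHandler); // the TransportChannelHandler above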
diff --git a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
index 35de5e57cc..f447137419 100644
--- a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -37,6 +38,7 @@ import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.MapConfigProvider;
@@ -177,4 +179,36 @@ public class TransportClientFactorySuite {
assertFalse(c1.isActive());
assertFalse(c2.isActive());
}
+
+ @Test
+ public void closeIdleConnectionForRequestTimeOut() throws IOException, InterruptedException {
+ TransportConf conf = new TransportConf(new ConfigProvider() {
+
+ @Override
+ public String get(String name) {
+ if ("spark.shuffle.io.connectionTimeout".equals(name)) {
+ // We should make sure there is enough time for us to observe the channel is active
+ return "1s";
+ }
+ String value = System.getProperty(name);
+ if (value == null) {
+ throw new NoSuchElementException(name);
+ }
+ return value;
+ }
+ });
+ TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true);
+ TransportClientFactory factory = context.createClientFactory();
+ try {
+ TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
+ assertTrue(c1.isActive());
+ long expiredTime = System.currentTimeMillis() + 10000; // 10 seconds
+ while (c1.isActive() && System.currentTimeMillis() < expiredTime) {
+ Thread.sleep(10);
+ }
+ assertFalse(c1.isActive());
+ } finally {
+ factory.close();
+ }
+ }
}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index ea6d248d66..ef3a9dcc87 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -78,7 +78,7 @@ public class ExternalShuffleClient extends ShuffleClient {
@Override
public void init(String appId) {
this.appId = appId;
- TransportContext context = new TransportContext(conf, new NoOpRpcHandler());
+ TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true);
List<TransportClientBootstrap> bootstraps = Lists.newArrayList();
if (saslEnabled) {
bootstraps.add(new SaslClientBootstrap(conf, appId, secretKeyHolder, saslEncryptionEnabled));
@@ -137,9 +137,13 @@ public class ExternalShuffleClient extends ShuffleClient {
String execId,
ExecutorShuffleInfo executorInfo) throws IOException {
checkInit();
- TransportClient client = clientFactory.createClient(host, port);
- byte[] registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteArray();
- client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
+ TransportClient client = clientFactory.createUnmanagedClient(host, port);
+ try {
+ byte[] registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteArray();
+ client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
+ } finally {
+ client.close();
+ }
}
@Override