aboutsummaryrefslogtreecommitdiff
path: root/network/shuffle
diff options
context:
space:
mode:
authorLianhui Wang <lianhuiwang09@gmail.com>2015-11-10 10:40:08 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-11-10 10:40:08 -0800
commit6e5fc37883ed81c3ee2338145a48de3036d19399 (patch)
tree67c33f795a8764706e7598f4afcae3eb00f42c41 /network/shuffle
parent689386b1c60997e4505749915f7005a52c207de2 (diff)
downloadspark-6e5fc37883ed81c3ee2338145a48de3036d19399.tar.gz
spark-6e5fc37883ed81c3ee2338145a48de3036d19399.tar.bz2
spark-6e5fc37883ed81c3ee2338145a48de3036d19399.zip
[SPARK-11252][NETWORK] ShuffleClient should release connection after fetching blocks had been completed for external shuffle
with yarn's external shuffle, ExternalShuffleClient of executors reserve its connections for yarn's NodeManager until application has been completed. so it will make NodeManager and executors have many socket connections. in order to reduce network pressure of NodeManager's shuffleService, after registerWithShuffleServer or fetchBlocks have been completed in ExternalShuffleClient, connection for NM's shuffleService needs to be closed.andrewor14 rxin vanzin Author: Lianhui Wang <lianhuiwang09@gmail.com> Closes #9227 from lianhuiwang/spark-11252.
Diffstat (limited to 'network/shuffle')
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java12
1 files changed, 8 insertions, 4 deletions
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index ea6d248d66..ef3a9dcc87 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -78,7 +78,7 @@ public class ExternalShuffleClient extends ShuffleClient {
@Override
public void init(String appId) {
this.appId = appId;
- TransportContext context = new TransportContext(conf, new NoOpRpcHandler());
+ TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true);
List<TransportClientBootstrap> bootstraps = Lists.newArrayList();
if (saslEnabled) {
bootstraps.add(new SaslClientBootstrap(conf, appId, secretKeyHolder, saslEncryptionEnabled));
@@ -137,9 +137,13 @@ public class ExternalShuffleClient extends ShuffleClient {
String execId,
ExecutorShuffleInfo executorInfo) throws IOException {
checkInit();
- TransportClient client = clientFactory.createClient(host, port);
- byte[] registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteArray();
- client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
+ TransportClient client = clientFactory.createUnmanagedClient(host, port);
+ try {
+ byte[] registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteArray();
+ client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
+ } finally {
+ client.close();
+ }
}
@Override