aboutsummaryrefslogtreecommitdiff
path: root/network/shuffle
diff options
context:
space:
mode:
Diffstat (limited to 'network/shuffle')
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java12
1 files changed, 8 insertions, 4 deletions
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index ea6d248d66..ef3a9dcc87 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -78,7 +78,7 @@ public class ExternalShuffleClient extends ShuffleClient {
@Override
public void init(String appId) {
this.appId = appId;
- TransportContext context = new TransportContext(conf, new NoOpRpcHandler());
+ TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true);
List<TransportClientBootstrap> bootstraps = Lists.newArrayList();
if (saslEnabled) {
bootstraps.add(new SaslClientBootstrap(conf, appId, secretKeyHolder, saslEncryptionEnabled));
@@ -137,9 +137,13 @@ public class ExternalShuffleClient extends ShuffleClient {
String execId,
ExecutorShuffleInfo executorInfo) throws IOException {
checkInit();
- TransportClient client = clientFactory.createClient(host, port);
- byte[] registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteArray();
- client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
+ TransportClient client = clientFactory.createUnmanagedClient(host, port);
+ try {
+ byte[] registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteArray();
+ client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
+ } finally {
+ client.close();
+ }
}
@Override