aboutsummaryrefslogtreecommitdiff
path: root/network
diff options
context:
space:
mode:
Diffstat (limited to 'network')
-rw-r--r--network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java7
1 files changed, 6 insertions, 1 deletions
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index a34aabe9e7..63b21222e7 100644
--- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -76,6 +76,9 @@ public class YarnShuffleService extends AuxiliaryService {
// The actual server that serves shuffle files
private TransportServer shuffleServer = null;
+ // Handles registering executors and opening shuffle blocks
+ private ExternalShuffleBlockHandler blockHandler;
+
public YarnShuffleService() {
super("spark_shuffle");
logger.info("Initializing YARN shuffle service for Spark");
@@ -99,7 +102,8 @@ public class YarnShuffleService extends AuxiliaryService {
// If authentication is enabled, set up the shuffle server to use a
// special RPC handler that filters out unauthenticated fetch requests
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
- RpcHandler rpcHandler = new ExternalShuffleBlockHandler(transportConf);
+ blockHandler = new ExternalShuffleBlockHandler(transportConf);
+ RpcHandler rpcHandler = blockHandler;
if (authEnabled) {
secretManager = new ShuffleSecretManager();
rpcHandler = new SaslRpcHandler(rpcHandler, secretManager);
@@ -136,6 +140,7 @@ public class YarnShuffleService extends AuxiliaryService {
if (isAuthenticationEnabled()) {
secretManager.unregisterApp(appId);
}
+ blockHandler.applicationRemoved(appId, false /* clean up local dirs */);
} catch (Exception e) {
logger.error("Exception when stopping application {}", appId, e);
}