diff options
-rw-r--r-- | network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java | 7 |
1 files changed, 6 insertions, 1 deletions
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index a34aabe9e7..63b21222e7 100644 --- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -76,6 +76,9 @@ public class YarnShuffleService extends AuxiliaryService { // The actual server that serves shuffle files private TransportServer shuffleServer = null; + // Handles registering executors and opening shuffle blocks + private ExternalShuffleBlockHandler blockHandler; + public YarnShuffleService() { super("spark_shuffle"); logger.info("Initializing YARN shuffle service for Spark"); @@ -99,7 +102,8 @@ public class YarnShuffleService extends AuxiliaryService { // If authentication is enabled, set up the shuffle server to use a // special RPC handler that filters out unauthenticated fetch requests boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE); - RpcHandler rpcHandler = new ExternalShuffleBlockHandler(transportConf); + blockHandler = new ExternalShuffleBlockHandler(transportConf); + RpcHandler rpcHandler = blockHandler; if (authEnabled) { secretManager = new ShuffleSecretManager(); rpcHandler = new SaslRpcHandler(rpcHandler, secretManager); @@ -136,6 +140,7 @@ public class YarnShuffleService extends AuxiliaryService { if (isAuthenticationEnabled()) { secretManager.unregisterApp(appId); } + blockHandler.applicationRemoved(appId, false /* clean up local dirs */); } catch (Exception e) { logger.error("Exception when stopping application {}", appId, e); } |