aboutsummaryrefslogtreecommitdiff
path: root/network
diff options
context:
space:
mode:
authorlianhuiwang <lianhuiwang09@gmail.com>2015-02-06 14:47:52 -0800
committerAndrew Or <andrew@databricks.com>2015-02-06 14:48:30 -0800
commit61073f832128845a76469fc37376483b784c927b (patch)
treec6677734bf254a773572255e0f0a3caa7932db3a /network
parent2bda1c1d376afd8abe6a04be345461752f3fb1b6 (diff)
downloadspark-61073f832128845a76469fc37376483b784c927b.tar.gz
spark-61073f832128845a76469fc37376483b784c927b.tar.bz2
spark-61073f832128845a76469fc37376483b784c927b.zip
[SPARK-4994][network]Cleanup removed executors' ShuffleInfo in yarn shuffle service
when the application is completed, yarn's nodemanager can remove application's local-dirs.but all executors' metadata of completed application havenot be removed. now it lets yarn ShuffleService to have much more memory to store Executors' ShuffleInfo. so these metadata need to be removed. Author: lianhuiwang <lianhuiwang09@gmail.com> Closes #3828 from lianhuiwang/SPARK-4994 and squashes the following commits: f3ba1d2 [lianhuiwang] Cleanup removed executors' ShuffleInfo
Diffstat (limited to 'network')
-rw-r--r--network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java7
1 files changed, 6 insertions, 1 deletions
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index a34aabe9e7..63b21222e7 100644
--- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -76,6 +76,9 @@ public class YarnShuffleService extends AuxiliaryService {
// The actual server that serves shuffle files
private TransportServer shuffleServer = null;
+ // Handles registering executors and opening shuffle blocks
+ private ExternalShuffleBlockHandler blockHandler;
+
public YarnShuffleService() {
super("spark_shuffle");
logger.info("Initializing YARN shuffle service for Spark");
@@ -99,7 +102,8 @@ public class YarnShuffleService extends AuxiliaryService {
// If authentication is enabled, set up the shuffle server to use a
// special RPC handler that filters out unauthenticated fetch requests
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
- RpcHandler rpcHandler = new ExternalShuffleBlockHandler(transportConf);
+ blockHandler = new ExternalShuffleBlockHandler(transportConf);
+ RpcHandler rpcHandler = blockHandler;
if (authEnabled) {
secretManager = new ShuffleSecretManager();
rpcHandler = new SaslRpcHandler(rpcHandler, secretManager);
@@ -136,6 +140,7 @@ public class YarnShuffleService extends AuxiliaryService {
if (isAuthenticationEnabled()) {
secretManager.unregisterApp(appId);
}
+ blockHandler.applicationRemoved(appId, false /* clean up local dirs */);
} catch (Exception e) {
logger.error("Exception when stopping application {}", appId, e);
}