about | summary | refs | log | tree | commit | diff
path: root/network/shuffle/src/main
diff options
context:
space:
mode:
author: Aaron Davidson <aaron@databricks.com> 2014-11-06 19:54:32 -0800
committer: Andrew Or <andrew@databricks.com> 2014-11-06 19:54:32 -0800
commit: 48a19a6dba896f7d0b637f84e114b7efbb814e51 (patch)
tree: 70fb3c87345730f36e3b9c2f0d94c606b6d8279f /network/shuffle/src/main
parent: f165b2bbf5d4acf34d826fa55b900f5bbc295654 (diff)
download: spark-48a19a6dba896f7d0b637f84e114b7efbb814e51.tar.gz
spark-48a19a6dba896f7d0b637f84e114b7efbb814e51.tar.bz2
spark-48a19a6dba896f7d0b637f84e114b7efbb814e51.zip
[SPARK-4236] Cleanup removed applications' files in shuffle service
This relies on a hook from whoever is hosting the shuffle service to invoke applicationRemoved() when the application is completed. Once invoked, we will clean up all the executors' shuffle directories we know about.

Author: Aaron Davidson <aaron@databricks.com>

Closes #3126 from aarondav/cleanup and squashes the following commits:

33a64a9 [Aaron Davidson] Missing brace
e6e428f [Aaron Davidson] Address comments
16a0d27 [Aaron Davidson] Cleanup
e4df3e7 [Aaron Davidson] [SPARK-4236] Cleanup removed applications' files in shuffle service
Diffstat (limited to 'network/shuffle/src/main')
-rw-r--r-- network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java 10
-rw-r--r-- network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java 118
2 files changed, 111 insertions, 17 deletions
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index cd3fea85b1..75ebf8c7b0 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -94,9 +94,11 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
return streamManager;
}
- /** For testing, clears all executors registered with "RegisterExecutor". */
- @VisibleForTesting
- public void clearRegisteredExecutors() {
- blockManager.clearRegisteredExecutors();
+ /**
+ * Removes an application (once it has been terminated), and optionally will clean up any
+ * local directories associated with the executors of that application in a separate thread.
+ */
+ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+ blockManager.applicationRemoved(appId, cleanupLocalDirs);
}
}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
index 6589889fe1..98fcfb82aa 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
@@ -21,9 +21,15 @@ import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,13 +49,22 @@ import org.apache.spark.network.util.JavaUtils;
public class ExternalShuffleBlockManager {
private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockManager.class);
- // Map from "appId-execId" to the executor's configuration.
- private final ConcurrentHashMap<String, ExecutorShuffleInfo> executors =
- new ConcurrentHashMap<String, ExecutorShuffleInfo>();
+ // Map containing all registered executors' metadata.
+ private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
- // Returns an id suitable for a single executor within a single application.
- private String getAppExecId(String appId, String execId) {
- return appId + "-" + execId;
+ // Single-threaded Java executor used to perform expensive recursive directory deletion.
+ private final Executor directoryCleaner;
+
+ public ExternalShuffleBlockManager() {
+ // TODO: Give this thread a name.
+ this(Executors.newSingleThreadExecutor());
+ }
+
+ // Allows tests to have more control over when directories are cleaned up.
+ @VisibleForTesting
+ ExternalShuffleBlockManager(Executor directoryCleaner) {
+ this.executors = Maps.newConcurrentMap();
+ this.directoryCleaner = directoryCleaner;
}
/** Registers a new Executor with all the configuration we need to find its shuffle files. */
@@ -57,7 +72,7 @@ public class ExternalShuffleBlockManager {
String appId,
String execId,
ExecutorShuffleInfo executorInfo) {
- String fullId = getAppExecId(appId, execId);
+ AppExecId fullId = new AppExecId(appId, execId);
logger.info("Registered executor {} with {}", fullId, executorInfo);
executors.put(fullId, executorInfo);
}
@@ -78,7 +93,7 @@ public class ExternalShuffleBlockManager {
int mapId = Integer.parseInt(blockIdParts[2]);
int reduceId = Integer.parseInt(blockIdParts[3]);
- ExecutorShuffleInfo executor = executors.get(getAppExecId(appId, execId));
+ ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
if (executor == null) {
throw new RuntimeException(
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
@@ -95,6 +110,56 @@ public class ExternalShuffleBlockManager {
}
/**
+ * Removes our metadata of all executors registered for the given application, and optionally
+ * also deletes the local directories associated with the executors of that application in a
+ * separate thread.
+ *
+ * It is not valid to call registerExecutor() for an executor with this appId after invoking
+ * this method.
+ */
+ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+ logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
+ Iterator<Map.Entry<AppExecId, ExecutorShuffleInfo>> it = executors.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<AppExecId, ExecutorShuffleInfo> entry = it.next();
+ AppExecId fullId = entry.getKey();
+ final ExecutorShuffleInfo executor = entry.getValue();
+
+ // Only touch executors associated with the appId that was removed.
+ if (appId.equals(fullId.appId)) {
+ it.remove();
+
+ if (cleanupLocalDirs) {
+ logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
+
+ // Execute the actual deletion in a different thread, as it may take some time.
+ directoryCleaner.execute(new Runnable() {
+ @Override
+ public void run() {
+ deleteExecutorDirs(executor.localDirs);
+ }
+ });
+ }
+ }
+ }
+ }
+
+ /**
+ * Synchronously deletes each directory one at a time.
+ * Should be executed in its own thread, as this may take a long time.
+ */
+ private void deleteExecutorDirs(String[] dirs) {
+ for (String localDir : dirs) {
+ try {
+ JavaUtils.deleteRecursively(new File(localDir));
+ logger.debug("Successfully cleaned up directory: " + localDir);
+ } catch (Exception e) {
+ logger.error("Failed to delete directory: " + localDir, e);
+ }
+ }
+ }
+
+ /**
* Hash-based shuffle data is simply stored as one file per block.
* This logic is from FileShuffleBlockManager.
*/
@@ -146,9 +211,36 @@ public class ExternalShuffleBlockManager {
return new File(new File(localDir, String.format("%02x", subDirId)), filename);
}
- /** For testing, clears all registered executors. */
- @VisibleForTesting
- void clearRegisteredExecutors() {
- executors.clear();
+ /** Simply encodes an executor's full ID, which is appId + execId. */
+ private static class AppExecId {
+ final String appId;
+ final String execId;
+
+ private AppExecId(String appId, String execId) {
+ this.appId = appId;
+ this.execId = execId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ AppExecId appExecId = (AppExecId) o;
+ return Objects.equal(appId, appExecId.appId) && Objects.equal(execId, appExecId.execId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(appId, execId);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("appId", appId)
+ .add("execId", execId)
+ .toString();
+ }
}
}