author     Aaron Davidson <aaron@databricks.com>	2014-11-06 19:54:32 -0800
committer  Andrew Or <andrew@databricks.com>	2014-11-06 19:54:32 -0800
commit     48a19a6dba896f7d0b637f84e114b7efbb814e51 (patch)
tree       70fb3c87345730f36e3b9c2f0d94c606b6d8279f /network/shuffle
parent     f165b2bbf5d4acf34d826fa55b900f5bbc295654 (diff)
[SPARK-4236] Cleanup removed applications' files in shuffle service
This relies on a hook from whoever is hosting the shuffle service to invoke removeApplication() when the application is completed. Once invoked, we will clean up all the executors' shuffle directories we know about.

Author: Aaron Davidson <aaron@databricks.com>

Closes #3126 from aarondav/cleanup and squashes the following commits:

33a64a9 [Aaron Davidson] Missing brace
e6e428f [Aaron Davidson] Address comments
16a0d27 [Aaron Davidson] Cleanup
e4df3e7 [Aaron Davidson] [SPARK-4236] Cleanup removed applications' files in shuffle service
Diffstat (limited to 'network/shuffle')
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java      10
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java      118
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java      142
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java  2
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java           4
5 files changed, 256 insertions, 20 deletions
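
The change in a nutshell: the handler now exposes applicationRemoved(appId, cleanupLocalDirs), and whoever hosts the shuffle service is expected to call it when an application terminates. A minimal host-side sketch follows; only applicationRemoved() comes from this patch, and the class and method names around it are hypothetical illustration.

import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;

/** Hypothetical host-side glue; only applicationRemoved() is introduced by this patch. */
public class ShuffleServiceAppLifecycleHook {
  private final ExternalShuffleBlockHandler blockHandler;

  public ShuffleServiceAppLifecycleHook(ExternalShuffleBlockHandler blockHandler) {
    this.blockHandler = blockHandler;
  }

  /** Expected to be invoked by the hosting cluster manager once the application has ended. */
  public void onApplicationEnd(String appId, boolean deleteShuffleFiles) {
    // Drops all executor metadata for appId and, if requested, deletes the executors'
    // local shuffle directories on a background thread.
    blockHandler.applicationRemoved(appId, deleteShuffleFiles);
  }
}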
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index cd3fea85b1..75ebf8c7b0 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -94,9 +94,11 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
return streamManager;
}
- /** For testing, clears all executors registered with "RegisterExecutor". */
- @VisibleForTesting
- public void clearRegisteredExecutors() {
- blockManager.clearRegisteredExecutors();
+ /**
+ * Removes an application (once it has been terminated), and optionally will clean up any
+ * local directories associated with the executors of that application in a separate thread.
+ */
+ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+ blockManager.applicationRemoved(appId, cleanupLocalDirs);
}
}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
index 6589889fe1..98fcfb82aa 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
@@ -21,9 +21,15 @@ import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,13 +49,22 @@ import org.apache.spark.network.util.JavaUtils;
public class ExternalShuffleBlockManager {
private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockManager.class);
- // Map from "appId-execId" to the executor's configuration.
- private final ConcurrentHashMap<String, ExecutorShuffleInfo> executors =
- new ConcurrentHashMap<String, ExecutorShuffleInfo>();
+ // Map containing all registered executors' metadata.
+ private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
- // Returns an id suitable for a single executor within a single application.
- private String getAppExecId(String appId, String execId) {
- return appId + "-" + execId;
+ // Single-threaded Java executor used to perform expensive recursive directory deletion.
+ private final Executor directoryCleaner;
+
+ public ExternalShuffleBlockManager() {
+ // TODO: Give this thread a name.
+ this(Executors.newSingleThreadExecutor());
+ }
+
+ // Allows tests to have more control over when directories are cleaned up.
+ @VisibleForTesting
+ ExternalShuffleBlockManager(Executor directoryCleaner) {
+ this.executors = Maps.newConcurrentMap();
+ this.directoryCleaner = directoryCleaner;
}
/** Registers a new Executor with all the configuration we need to find its shuffle files. */
@@ -57,7 +72,7 @@ public class ExternalShuffleBlockManager {
String appId,
String execId,
ExecutorShuffleInfo executorInfo) {
- String fullId = getAppExecId(appId, execId);
+ AppExecId fullId = new AppExecId(appId, execId);
logger.info("Registered executor {} with {}", fullId, executorInfo);
executors.put(fullId, executorInfo);
}
@@ -78,7 +93,7 @@ public class ExternalShuffleBlockManager {
int mapId = Integer.parseInt(blockIdParts[2]);
int reduceId = Integer.parseInt(blockIdParts[3]);
- ExecutorShuffleInfo executor = executors.get(getAppExecId(appId, execId));
+ ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
if (executor == null) {
throw new RuntimeException(
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
@@ -95,6 +110,56 @@ public class ExternalShuffleBlockManager {
}
/**
+ * Removes our metadata of all executors registered for the given application, and optionally
+ * also deletes the local directories associated with the executors of that application in a
+ * separate thread.
+ *
+ * It is not valid to call registerExecutor() for an executor with this appId after invoking
+ * this method.
+ */
+ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+ logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
+ Iterator<Map.Entry<AppExecId, ExecutorShuffleInfo>> it = executors.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<AppExecId, ExecutorShuffleInfo> entry = it.next();
+ AppExecId fullId = entry.getKey();
+ final ExecutorShuffleInfo executor = entry.getValue();
+
+ // Only touch executors associated with the appId that was removed.
+ if (appId.equals(fullId.appId)) {
+ it.remove();
+
+ if (cleanupLocalDirs) {
+ logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
+
+ // Execute the actual deletion in a different thread, as it may take some time.
+ directoryCleaner.execute(new Runnable() {
+ @Override
+ public void run() {
+ deleteExecutorDirs(executor.localDirs);
+ }
+ });
+ }
+ }
+ }
+ }
+
+ /**
+ * Synchronously deletes each directory one at a time.
+ * Should be executed in its own thread, as this may take a long time.
+ */
+ private void deleteExecutorDirs(String[] dirs) {
+ for (String localDir : dirs) {
+ try {
+ JavaUtils.deleteRecursively(new File(localDir));
+ logger.debug("Successfully cleaned up directory: " + localDir);
+ } catch (Exception e) {
+ logger.error("Failed to delete directory: " + localDir, e);
+ }
+ }
+ }
+
+ /**
* Hash-based shuffle data is simply stored as one file per block.
* This logic is from FileShuffleBlockManager.
*/
@@ -146,9 +211,36 @@ public class ExternalShuffleBlockManager {
return new File(new File(localDir, String.format("%02x", subDirId)), filename);
}
- /** For testing, clears all registered executors. */
- @VisibleForTesting
- void clearRegisteredExecutors() {
- executors.clear();
+ /** Simply encodes an executor's full ID, which is appId + execId. */
+ private static class AppExecId {
+ final String appId;
+ final String execId;
+
+ private AppExecId(String appId, String execId) {
+ this.appId = appId;
+ this.execId = execId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ AppExecId appExecId = (AppExecId) o;
+ return Objects.equal(appId, appExecId.appId) && Objects.equal(execId, appExecId.execId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(appId, execId);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("appId", appId)
+ .add("execId", execId)
+ .toString();
+ }
}
}
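
The new ExternalShuffleBlockManager code above removes matching entries through the ConcurrentMap's entry-set iterator and hands the expensive recursive deletion to a single-threaded Executor so the caller is never blocked. A standalone sketch of that pattern, with hypothetical names and a simplified recursive delete standing in for JavaUtils.deleteRecursively:

import java.io.File;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/** Standalone sketch of the cleanup pattern; names are illustrative, not Spark APIs. */
public class AsyncDirCleanupSketch {
  private final ConcurrentMap<String, File> dirsByOwner = new ConcurrentHashMap<String, File>();
  // One background thread serializes the expensive recursive deletions.
  private final Executor cleaner = Executors.newSingleThreadExecutor();

  public void register(String owner, File dir) {
    dirsByOwner.put(owner, dir);
  }

  /** Removes every entry belonging to `owner` and deletes its directory off the caller's thread. */
  public void ownerRemoved(String owner) {
    Iterator<Map.Entry<String, File>> it = dirsByOwner.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, File> entry = it.next();
      if (owner.equals(entry.getKey())) {
        final File dir = entry.getValue();
        it.remove(); // safe on a ConcurrentMap's entry-set iterator
        cleaner.execute(new Runnable() {
          @Override public void run() { deleteRecursively(dir); }
        });
      }
    }
  }

  private void deleteRecursively(File f) {
    File[] children = f.listFiles();
    if (children != null) {
      for (File child : children) {
        deleteRecursively(child);
      }
    }
    f.delete();
  }
}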
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
new file mode 100644
index 0000000000..c8ece3bc53
--- /dev/null
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ExternalShuffleCleanupSuite {
+
+ // Same-thread Executor used to ensure cleanup happens synchronously in test thread.
+ Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
+
+ @Test
+ public void noCleanupAndCleanup() throws IOException {
+ TestShuffleDataContext dataContext = createSomeData();
+
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+ manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
+ manager.applicationRemoved("app", false /* cleanup */);
+
+ assertStillThere(dataContext);
+
+ manager.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr"));
+ manager.applicationRemoved("app", true /* cleanup */);
+
+ assertCleanedUp(dataContext);
+ }
+
+ @Test
+ public void cleanupUsesExecutor() throws IOException {
+ TestShuffleDataContext dataContext = createSomeData();
+
+ final AtomicBoolean cleanupCalled = new AtomicBoolean(false);
+
+ // Executor which does nothing to ensure we're actually using it.
+ Executor noThreadExecutor = new Executor() {
+ @Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
+ };
+
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(noThreadExecutor);
+
+ manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
+ manager.applicationRemoved("app", true);
+
+ assertTrue(cleanupCalled.get());
+ assertStillThere(dataContext);
+
+ dataContext.cleanup();
+ assertCleanedUp(dataContext);
+ }
+
+ @Test
+ public void cleanupMultipleExecutors() throws IOException {
+ TestShuffleDataContext dataContext0 = createSomeData();
+ TestShuffleDataContext dataContext1 = createSomeData();
+
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+
+ manager.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
+ manager.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
+ manager.applicationRemoved("app", true);
+
+ assertCleanedUp(dataContext0);
+ assertCleanedUp(dataContext1);
+ }
+
+ @Test
+ public void cleanupOnlyRemovedApp() throws IOException {
+ TestShuffleDataContext dataContext0 = createSomeData();
+ TestShuffleDataContext dataContext1 = createSomeData();
+
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+
+ manager.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
+ manager.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
+
+ manager.applicationRemoved("app-nonexistent", true);
+ assertStillThere(dataContext0);
+ assertStillThere(dataContext1);
+
+ manager.applicationRemoved("app-0", true);
+ assertCleanedUp(dataContext0);
+ assertStillThere(dataContext1);
+
+ manager.applicationRemoved("app-1", true);
+ assertCleanedUp(dataContext0);
+ assertCleanedUp(dataContext1);
+
+ // Make sure it's not an error to cleanup multiple times
+ manager.applicationRemoved("app-1", true);
+ assertCleanedUp(dataContext0);
+ assertCleanedUp(dataContext1);
+ }
+
+ private void assertStillThere(TestShuffleDataContext dataContext) {
+ for (String localDir : dataContext.localDirs) {
+ assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists());
+ }
+ }
+
+ private void assertCleanedUp(TestShuffleDataContext dataContext) {
+ for (String localDir : dataContext.localDirs) {
+ assertFalse(localDir + " wasn't cleaned up", new File(localDir).exists());
+ }
+ }
+
+ private TestShuffleDataContext createSomeData() throws IOException {
+ Random rand = new Random(123);
+ TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5);
+
+ dataContext.create();
+ dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000),
+ new byte[][] { "ABC".getBytes(), "DEF".getBytes() } );
+ dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000,
+ new byte[][] { "GHI".getBytes(), "JKLMNOPQRSTUVWXYZ".getBytes() } );
+ return dataContext;
+ }
+}
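
The suite above injects Guava's MoreExecutors.sameThreadExecutor() so the otherwise asynchronous cleanup runs inline on the test thread. An equivalent hand-rolled Executor, shown only to illustrate why the assertions can inspect the filesystem immediately after applicationRemoved() returns:

import java.util.concurrent.Executor;

/** Illustrative only: an inline Executor equivalent to Guava's sameThreadExecutor(). */
public class InlineExecutor implements Executor {
  // Runs each task on the calling thread, so directory deletion completes before
  // applicationRemoved() returns and the test can assert on the filesystem right away.
  @Override
  public void execute(Runnable command) {
    command.run();
  }
}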
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 06294fef19..3bea5b0f25 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -105,7 +105,7 @@ public class ExternalShuffleIntegrationSuite {
@After
public void afterEach() {
- handler.clearRegisteredExecutors();
+ handler.applicationRemoved(APP_ID, false /* cleanupLocalDirs */);
}
class FetchResult {
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 442b756467..337b5c7bdb 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -30,8 +30,8 @@ import com.google.common.io.Files;
* and cleanup of directories that can be read by the {@link ExternalShuffleBlockManager}.
*/
public class TestShuffleDataContext {
- private final String[] localDirs;
- private final int subDirsPerLocalDir;
+ public final String[] localDirs;
+ public final int subDirsPerLocalDir;
public TestShuffleDataContext(int numLocalDirs, int subDirsPerLocalDir) {
this.localDirs = new String[numLocalDirs];