aboutsummaryrefslogtreecommitdiff
path: root/network
diff options
context:
space:
mode:
Diffstat (limited to 'network')
-rw-r--r--network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java59
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java10
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java118
-rw-r--r--network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java142
-rw-r--r--network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java2
-rw-r--r--network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java4
6 files changed, 315 insertions, 20 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index 2856d1c8c9..75c4a3981a 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -22,16 +22,22 @@ import java.nio.ByteBuffer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
+import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
import com.google.common.base.Charsets;
import io.netty.buffer.Unpooled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * General utilities available in the network package. Many of these are sourced from Spark's
+ * own Utils, just accessible within this package.
+ */
public class JavaUtils {
private static final Logger logger = LoggerFactory.getLogger(JavaUtils.class);
@@ -93,4 +99,57 @@ public class JavaUtils {
public static String bytesToString(ByteBuffer b) {
return Unpooled.wrappedBuffer(b).toString(Charsets.UTF_8);
}
+
+ /*
+ * Delete a file or directory and its contents recursively.
+ * Don't follow directories if they are symlinks.
+ * Throws an exception if deletion is unsuccessful.
+ */
+ public static void deleteRecursively(File file) throws IOException {
+ if (file == null) { return; }
+
+ if (file.isDirectory() && !isSymlink(file)) {
+ IOException savedIOException = null;
+ for (File child : listFilesSafely(file)) {
+ try {
+ deleteRecursively(child);
+ } catch (IOException e) {
+ // In case of multiple exceptions, only last one will be thrown
+ savedIOException = e;
+ }
+ }
+ if (savedIOException != null) {
+ throw savedIOException;
+ }
+ }
+
+ boolean deleted = file.delete();
+ // Delete can also fail if the file simply did not exist.
+ if (!deleted && file.exists()) {
+ throw new IOException("Failed to delete: " + file.getAbsolutePath());
+ }
+ }
+
+ private static File[] listFilesSafely(File file) throws IOException {
+ if (file.exists()) {
+ File[] files = file.listFiles();
+ if (files == null) {
+ throw new IOException("Failed to list files for dir: " + file);
+ }
+ return files;
+ } else {
+ return new File[0];
+ }
+ }
+
+ private static boolean isSymlink(File file) throws IOException {
+ Preconditions.checkNotNull(file);
+ File fileInCanonicalDir = null;
+ if (file.getParent() == null) {
+ fileInCanonicalDir = file;
+ } else {
+ fileInCanonicalDir = new File(file.getParentFile().getCanonicalFile(), file.getName());
+ }
+ return !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile());
+ }
}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index cd3fea85b1..75ebf8c7b0 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -94,9 +94,11 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
return streamManager;
}
- /** For testing, clears all executors registered with "RegisterExecutor". */
- @VisibleForTesting
- public void clearRegisteredExecutors() {
- blockManager.clearRegisteredExecutors();
+ /**
+ * Removes an application (once it has been terminated), and optionally will clean up any
+ * local directories associated with the executors of that application in a separate thread.
+ */
+ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+ blockManager.applicationRemoved(appId, cleanupLocalDirs);
}
}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
index 6589889fe1..98fcfb82aa 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
@@ -21,9 +21,15 @@ import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,13 +49,22 @@ import org.apache.spark.network.util.JavaUtils;
public class ExternalShuffleBlockManager {
private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockManager.class);
- // Map from "appId-execId" to the executor's configuration.
- private final ConcurrentHashMap<String, ExecutorShuffleInfo> executors =
- new ConcurrentHashMap<String, ExecutorShuffleInfo>();
+ // Map containing all registered executors' metadata.
+ private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
- // Returns an id suitable for a single executor within a single application.
- private String getAppExecId(String appId, String execId) {
- return appId + "-" + execId;
+ // Single-threaded Java executor used to perform expensive recursive directory deletion.
+ private final Executor directoryCleaner;
+
+ public ExternalShuffleBlockManager() {
+ // TODO: Give this thread a name.
+ this(Executors.newSingleThreadExecutor());
+ }
+
+ // Allows tests to have more control over when directories are cleaned up.
+ @VisibleForTesting
+ ExternalShuffleBlockManager(Executor directoryCleaner) {
+ this.executors = Maps.newConcurrentMap();
+ this.directoryCleaner = directoryCleaner;
}
/** Registers a new Executor with all the configuration we need to find its shuffle files. */
@@ -57,7 +72,7 @@ public class ExternalShuffleBlockManager {
String appId,
String execId,
ExecutorShuffleInfo executorInfo) {
- String fullId = getAppExecId(appId, execId);
+ AppExecId fullId = new AppExecId(appId, execId);
logger.info("Registered executor {} with {}", fullId, executorInfo);
executors.put(fullId, executorInfo);
}
@@ -78,7 +93,7 @@ public class ExternalShuffleBlockManager {
int mapId = Integer.parseInt(blockIdParts[2]);
int reduceId = Integer.parseInt(blockIdParts[3]);
- ExecutorShuffleInfo executor = executors.get(getAppExecId(appId, execId));
+ ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
if (executor == null) {
throw new RuntimeException(
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
@@ -95,6 +110,56 @@ public class ExternalShuffleBlockManager {
}
/**
+ * Removes our metadata of all executors registered for the given application, and optionally
+ * also deletes the local directories associated with the executors of that application in a
+ * separate thread.
+ *
+ * It is not valid to call registerExecutor() for an executor with this appId after invoking
+ * this method.
+ */
+ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+ logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
+ Iterator<Map.Entry<AppExecId, ExecutorShuffleInfo>> it = executors.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<AppExecId, ExecutorShuffleInfo> entry = it.next();
+ AppExecId fullId = entry.getKey();
+ final ExecutorShuffleInfo executor = entry.getValue();
+
+ // Only touch executors associated with the appId that was removed.
+ if (appId.equals(fullId.appId)) {
+ it.remove();
+
+ if (cleanupLocalDirs) {
+ logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
+
+ // Execute the actual deletion in a different thread, as it may take some time.
+ directoryCleaner.execute(new Runnable() {
+ @Override
+ public void run() {
+ deleteExecutorDirs(executor.localDirs);
+ }
+ });
+ }
+ }
+ }
+ }
+
+ /**
+ * Synchronously deletes each directory one at a time.
+ * Should be executed in its own thread, as this may take a long time.
+ */
+ private void deleteExecutorDirs(String[] dirs) {
+ for (String localDir : dirs) {
+ try {
+ JavaUtils.deleteRecursively(new File(localDir));
+ logger.debug("Successfully cleaned up directory: " + localDir);
+ } catch (Exception e) {
+ logger.error("Failed to delete directory: " + localDir, e);
+ }
+ }
+ }
+
+ /**
* Hash-based shuffle data is simply stored as one file per block.
* This logic is from FileShuffleBlockManager.
*/
@@ -146,9 +211,36 @@ public class ExternalShuffleBlockManager {
return new File(new File(localDir, String.format("%02x", subDirId)), filename);
}
- /** For testing, clears all registered executors. */
- @VisibleForTesting
- void clearRegisteredExecutors() {
- executors.clear();
+ /** Simply encodes an executor's full ID, which is appId + execId. */
+ private static class AppExecId {
+ final String appId;
+ final String execId;
+
+ private AppExecId(String appId, String execId) {
+ this.appId = appId;
+ this.execId = execId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ AppExecId appExecId = (AppExecId) o;
+ return Objects.equal(appId, appExecId.appId) && Objects.equal(execId, appExecId.execId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(appId, execId);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("appId", appId)
+ .add("execId", execId)
+ .toString();
+ }
}
}
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
new file mode 100644
index 0000000000..c8ece3bc53
--- /dev/null
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ExternalShuffleCleanupSuite {
+
+ // Same-thread Executor used to ensure cleanup happens synchronously in test thread.
+ Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
+
+ @Test
+ public void noCleanupAndCleanup() throws IOException {
+ TestShuffleDataContext dataContext = createSomeData();
+
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+ manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
+ manager.applicationRemoved("app", false /* cleanup */);
+
+ assertStillThere(dataContext);
+
+ manager.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr"));
+ manager.applicationRemoved("app", true /* cleanup */);
+
+ assertCleanedUp(dataContext);
+ }
+
+ @Test
+ public void cleanupUsesExecutor() throws IOException {
+ TestShuffleDataContext dataContext = createSomeData();
+
+ final AtomicBoolean cleanupCalled = new AtomicBoolean(false);
+
+ // Executor which does nothing to ensure we're actually using it.
+ Executor noThreadExecutor = new Executor() {
+ @Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
+ };
+
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(noThreadExecutor);
+
+ manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
+ manager.applicationRemoved("app", true);
+
+ assertTrue(cleanupCalled.get());
+ assertStillThere(dataContext);
+
+ dataContext.cleanup();
+ assertCleanedUp(dataContext);
+ }
+
+ @Test
+ public void cleanupMultipleExecutors() throws IOException {
+ TestShuffleDataContext dataContext0 = createSomeData();
+ TestShuffleDataContext dataContext1 = createSomeData();
+
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+
+ manager.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
+ manager.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
+ manager.applicationRemoved("app", true);
+
+ assertCleanedUp(dataContext0);
+ assertCleanedUp(dataContext1);
+ }
+
+ @Test
+ public void cleanupOnlyRemovedApp() throws IOException {
+ TestShuffleDataContext dataContext0 = createSomeData();
+ TestShuffleDataContext dataContext1 = createSomeData();
+
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+
+ manager.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
+ manager.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
+
+ manager.applicationRemoved("app-nonexistent", true);
+ assertStillThere(dataContext0);
+ assertStillThere(dataContext1);
+
+ manager.applicationRemoved("app-0", true);
+ assertCleanedUp(dataContext0);
+ assertStillThere(dataContext1);
+
+ manager.applicationRemoved("app-1", true);
+ assertCleanedUp(dataContext0);
+ assertCleanedUp(dataContext1);
+
+ // Make sure it's not an error to cleanup multiple times
+ manager.applicationRemoved("app-1", true);
+ assertCleanedUp(dataContext0);
+ assertCleanedUp(dataContext1);
+ }
+
+ private void assertStillThere(TestShuffleDataContext dataContext) {
+ for (String localDir : dataContext.localDirs) {
+ assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists());
+ }
+ }
+
+ private void assertCleanedUp(TestShuffleDataContext dataContext) {
+ for (String localDir : dataContext.localDirs) {
+ assertFalse(localDir + " wasn't cleaned up", new File(localDir).exists());
+ }
+ }
+
+ private TestShuffleDataContext createSomeData() throws IOException {
+ Random rand = new Random(123);
+ TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5);
+
+ dataContext.create();
+ dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000),
+ new byte[][] { "ABC".getBytes(), "DEF".getBytes() } );
+ dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000,
+ new byte[][] { "GHI".getBytes(), "JKLMNOPQRSTUVWXYZ".getBytes() } );
+ return dataContext;
+ }
+}
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 06294fef19..3bea5b0f25 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -105,7 +105,7 @@ public class ExternalShuffleIntegrationSuite {
@After
public void afterEach() {
- handler.clearRegisteredExecutors();
+ handler.applicationRemoved(APP_ID, false /* cleanupLocalDirs */);
}
class FetchResult {
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 442b756467..337b5c7bdb 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -30,8 +30,8 @@ import com.google.common.io.Files;
* and cleanup of directories that can be read by the {@link ExternalShuffleBlockManager}.
*/
public class TestShuffleDataContext {
- private final String[] localDirs;
- private final int subDirsPerLocalDir;
+ public final String[] localDirs;
+ public final int subDirsPerLocalDir;
public TestShuffleDataContext(int numLocalDirs, int subDirsPerLocalDir) {
this.localDirs = new String[numLocalDirs];