about | summary | refs | log | tree | commit | diff
path: root/network/shuffle/src/test
diff options
context:
space:
mode:
author: Aaron Davidson <aaron@databricks.com>, 2014-11-06 19:54:32 -0800
committer: Andrew Or <andrew@databricks.com>, 2014-11-06 19:54:32 -0800
commit: 48a19a6dba896f7d0b637f84e114b7efbb814e51 (patch)
tree: 70fb3c87345730f36e3b9c2f0d94c606b6d8279f /network/shuffle/src/test
parent: f165b2bbf5d4acf34d826fa55b900f5bbc295654 (diff)
download: spark-48a19a6dba896f7d0b637f84e114b7efbb814e51.tar.gz
spark-48a19a6dba896f7d0b637f84e114b7efbb814e51.tar.bz2
spark-48a19a6dba896f7d0b637f84e114b7efbb814e51.zip
[SPARK-4236] Cleanup removed applications' files in shuffle service
This relies on a hook from whoever is hosting the shuffle service to invoke applicationRemoved() when the application is completed. Once invoked, we will clean up all the executors' shuffle directories we know about.

Author: Aaron Davidson <aaron@databricks.com>

Closes #3126 from aarondav/cleanup and squashes the following commits:

33a64a9 [Aaron Davidson] Missing brace
e6e428f [Aaron Davidson] Address comments
16a0d27 [Aaron Davidson] Cleanup
e4df3e7 [Aaron Davidson] [SPARK-4236] Cleanup removed applications' files in shuffle service
Diffstat (limited to 'network/shuffle/src/test')
-rw-r--r-- network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java | 142
-rw-r--r-- network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java | 2
-rw-r--r-- network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java | 4
3 files changed, 145 insertions, 3 deletions
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
new file mode 100644
index 0000000000..c8ece3bc53
--- /dev/null
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ExternalShuffleCleanupSuite {
+
+ // Same-thread Executor used to ensure cleanup happens synchronously in test thread.
+ Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
+
+ @Test
+ public void noCleanupAndCleanup() throws IOException {
+ TestShuffleDataContext dataContext = createSomeData();
+
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+ manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
+ manager.applicationRemoved("app", false /* cleanup */);
+
+ assertStillThere(dataContext);
+
+ manager.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr"));
+ manager.applicationRemoved("app", true /* cleanup */);
+
+ assertCleanedUp(dataContext);
+ }
+
+ @Test
+ public void cleanupUsesExecutor() throws IOException {
+ TestShuffleDataContext dataContext = createSomeData();
+
+ final AtomicBoolean cleanupCalled = new AtomicBoolean(false);
+
+ // Executor which does nothing to ensure we're actually using it.
+ Executor noThreadExecutor = new Executor() {
+ @Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
+ };
+
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(noThreadExecutor);
+
+ manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
+ manager.applicationRemoved("app", true);
+
+ assertTrue(cleanupCalled.get());
+ assertStillThere(dataContext);
+
+ dataContext.cleanup();
+ assertCleanedUp(dataContext);
+ }
+
+ @Test
+ public void cleanupMultipleExecutors() throws IOException {
+ TestShuffleDataContext dataContext0 = createSomeData();
+ TestShuffleDataContext dataContext1 = createSomeData();
+
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+
+ manager.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
+ manager.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
+ manager.applicationRemoved("app", true);
+
+ assertCleanedUp(dataContext0);
+ assertCleanedUp(dataContext1);
+ }
+
+ @Test
+ public void cleanupOnlyRemovedApp() throws IOException {
+ TestShuffleDataContext dataContext0 = createSomeData();
+ TestShuffleDataContext dataContext1 = createSomeData();
+
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+
+ manager.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
+ manager.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
+
+ manager.applicationRemoved("app-nonexistent", true);
+ assertStillThere(dataContext0);
+ assertStillThere(dataContext1);
+
+ manager.applicationRemoved("app-0", true);
+ assertCleanedUp(dataContext0);
+ assertStillThere(dataContext1);
+
+ manager.applicationRemoved("app-1", true);
+ assertCleanedUp(dataContext0);
+ assertCleanedUp(dataContext1);
+
+ // Make sure it's not an error to cleanup multiple times
+ manager.applicationRemoved("app-1", true);
+ assertCleanedUp(dataContext0);
+ assertCleanedUp(dataContext1);
+ }
+
+ private void assertStillThere(TestShuffleDataContext dataContext) {
+ for (String localDir : dataContext.localDirs) {
+ assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists());
+ }
+ }
+
+ private void assertCleanedUp(TestShuffleDataContext dataContext) {
+ for (String localDir : dataContext.localDirs) {
+ assertFalse(localDir + " wasn't cleaned up", new File(localDir).exists());
+ }
+ }
+
+ private TestShuffleDataContext createSomeData() throws IOException {
+ Random rand = new Random(123);
+ TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5);
+
+ dataContext.create();
+ dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000),
+ new byte[][] { "ABC".getBytes(), "DEF".getBytes() } );
+ dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000,
+ new byte[][] { "GHI".getBytes(), "JKLMNOPQRSTUVWXYZ".getBytes() } );
+ return dataContext;
+ }
+}
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 06294fef19..3bea5b0f25 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -105,7 +105,7 @@ public class ExternalShuffleIntegrationSuite {
@After
public void afterEach() {
- handler.clearRegisteredExecutors();
+ handler.applicationRemoved(APP_ID, false /* cleanupLocalDirs */);
}
class FetchResult {
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 442b756467..337b5c7bdb 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -30,8 +30,8 @@ import com.google.common.io.Files;
* and cleanup of directories that can be read by the {@link ExternalShuffleBlockManager}.
*/
public class TestShuffleDataContext {
- private final String[] localDirs;
- private final int subDirsPerLocalDir;
+ public final String[] localDirs;
+ public final int subDirsPerLocalDir;
public TestShuffleDataContext(int numLocalDirs, int subDirsPerLocalDir) {
this.localDirs = new String[numLocalDirs];