aboutsummaryrefslogtreecommitdiff
path: root/network/shuffle/src
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2015-05-08 12:24:06 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-05-08 12:24:06 -0700
commit4b3bb0e43ca7e1a27308516608419487b6a844e6 (patch)
tree07327d1d8cf2c8a8bc34217c706d3d761e61c31e /network/shuffle/src
parent2d05f325dc3c70349bd17ed399897f22d967c687 (diff)
downloadspark-4b3bb0e43ca7e1a27308516608419487b6a844e6.tar.gz
spark-4b3bb0e43ca7e1a27308516608419487b6a844e6.tar.bz2
spark-4b3bb0e43ca7e1a27308516608419487b6a844e6.zip
[SPARK-6627] Finished rename to ShuffleBlockResolver
The previous cleanup-commit for SPARK-6627 renamed ShuffleBlockManager to ShuffleBlockResolver, but didn't rename the associated subclasses and variables; this commit does that. I'm unsure whether it's ok to rename ExternalShuffleBlockManager, since that's technically a public class? cc pwendell Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #5764 from kayousterhout/SPARK-6627 and squashes the following commits: 43add1e [Kay Ousterhout] Spacing fix 96080bf [Kay Ousterhout] Test fixes d8a5d36 [Kay Ousterhout] [SPARK-6627] Finished rename to ShuffleBlockResolver
Diffstat (limited to 'network/shuffle/src')
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java6
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java (renamed from network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java)16
-rw-r--r--network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java16
-rw-r--r--network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java (renamed from network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java)30
-rw-r--r--network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java37
-rw-r--r--network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java8
6 files changed, 58 insertions, 55 deletions
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 46ca970862..e4faaf8854 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -46,18 +46,18 @@ import org.apache.spark.network.shuffle.protocol.StreamHandle;
public class ExternalShuffleBlockHandler extends RpcHandler {
private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class);
- private final ExternalShuffleBlockManager blockManager;
+ private final ExternalShuffleBlockResolver blockManager;
private final OneForOneStreamManager streamManager;
public ExternalShuffleBlockHandler(TransportConf conf) {
- this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf));
+ this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(conf));
}
/** Enables mocking out the StreamManager and BlockManager. */
@VisibleForTesting
ExternalShuffleBlockHandler(
OneForOneStreamManager streamManager,
- ExternalShuffleBlockManager blockManager) {
+ ExternalShuffleBlockResolver blockManager) {
this.streamManager = streamManager;
this.blockManager = blockManager;
}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 93e6fdd716..dd08e24cad 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -44,13 +44,13 @@ import org.apache.spark.network.util.TransportConf;
* Manages converting shuffle BlockIds into physical segments of local files, from a process outside
* of Executors. Each Executor must register its own configuration about where it stores its files
* (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated
- * from Spark's FileShuffleBlockManager and IndexShuffleBlockManager.
+ * from Spark's FileShuffleBlockResolver and IndexShuffleBlockResolver.
*
* Executors with shuffle file consolidation are not currently supported, as the index is stored in
- * the Executor's memory, unlike the IndexShuffleBlockManager.
+ * the Executor's memory, unlike the IndexShuffleBlockResolver.
*/
-public class ExternalShuffleBlockManager {
- private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockManager.class);
+public class ExternalShuffleBlockResolver {
+ private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);
// Map containing all registered executors' metadata.
private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
@@ -60,7 +60,7 @@ public class ExternalShuffleBlockManager {
private final TransportConf conf;
- public ExternalShuffleBlockManager(TransportConf conf) {
+ public ExternalShuffleBlockResolver(TransportConf conf) {
this(conf, Executors.newSingleThreadExecutor(
// Add `spark` prefix because it will run in NM in Yarn mode.
NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
@@ -68,7 +68,7 @@ public class ExternalShuffleBlockManager {
// Allows tests to have more control over when directories are cleaned up.
@VisibleForTesting
- ExternalShuffleBlockManager(TransportConf conf, Executor directoryCleaner) {
+ ExternalShuffleBlockResolver(TransportConf conf, Executor directoryCleaner) {
this.conf = conf;
this.executors = Maps.newConcurrentMap();
this.directoryCleaner = directoryCleaner;
@@ -168,7 +168,7 @@ public class ExternalShuffleBlockManager {
/**
* Hash-based shuffle data is simply stored as one file per block.
- * This logic is from FileShuffleBlockManager.
+ * This logic is from FileShuffleBlockResolver.
*/
// TODO: Support consolidated hash shuffle files
private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
@@ -178,7 +178,7 @@ public class ExternalShuffleBlockManager {
/**
* Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
- * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockManager,
+ * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
* and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
*/
private ManagedBuffer getSortBasedShuffleBlockData(
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
index 3f9fe1681c..73374cdc77 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
@@ -45,14 +45,14 @@ public class ExternalShuffleBlockHandlerSuite {
TransportClient client = mock(TransportClient.class);
OneForOneStreamManager streamManager;
- ExternalShuffleBlockManager blockManager;
+ ExternalShuffleBlockResolver blockResolver;
RpcHandler handler;
@Before
public void beforeEach() {
streamManager = mock(OneForOneStreamManager.class);
- blockManager = mock(ExternalShuffleBlockManager.class);
- handler = new ExternalShuffleBlockHandler(streamManager, blockManager);
+ blockResolver = mock(ExternalShuffleBlockResolver.class);
+ handler = new ExternalShuffleBlockHandler(streamManager, blockResolver);
}
@Test
@@ -62,7 +62,7 @@ public class ExternalShuffleBlockHandlerSuite {
ExecutorShuffleInfo config = new ExecutorShuffleInfo(new String[] {"/a", "/b"}, 16, "sort");
byte[] registerMessage = new RegisterExecutor("app0", "exec1", config).toByteArray();
handler.receive(client, registerMessage, callback);
- verify(blockManager, times(1)).registerExecutor("app0", "exec1", config);
+ verify(blockResolver, times(1)).registerExecutor("app0", "exec1", config);
verify(callback, times(1)).onSuccess((byte[]) any());
verify(callback, never()).onFailure((Throwable) any());
@@ -75,12 +75,12 @@ public class ExternalShuffleBlockHandlerSuite {
ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
- when(blockManager.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
- when(blockManager.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
+ when(blockResolver.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
+ when(blockResolver.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
byte[] openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" }).toByteArray();
handler.receive(client, openBlocks, callback);
- verify(blockManager, times(1)).getBlockData("app0", "exec1", "b0");
- verify(blockManager, times(1)).getBlockData("app0", "exec1", "b1");
+ verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
+ verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");
ArgumentCaptor<byte[]> response = ArgumentCaptor.forClass(byte[].class);
verify(callback, times(1)).onSuccess(response.capture());
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index dad6428a83..d02f4f0fdb 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -30,7 +30,7 @@ import org.junit.Test;
import static org.junit.Assert.*;
-public class ExternalShuffleBlockManagerSuite {
+public class ExternalShuffleBlockResolverSuite {
static String sortBlock0 = "Hello!";
static String sortBlock1 = "World!";
@@ -60,29 +60,29 @@ public class ExternalShuffleBlockManagerSuite {
@Test
public void testBadRequests() {
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
+ ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf);
// Unregistered executor
try {
- manager.getBlockData("app0", "exec1", "shuffle_1_1_0");
+ resolver.getBlockData("app0", "exec1", "shuffle_1_1_0");
fail("Should have failed");
} catch (RuntimeException e) {
assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
}
// Invalid shuffle manager
- manager.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
+ resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
try {
- manager.getBlockData("app0", "exec2", "shuffle_1_1_0");
+ resolver.getBlockData("app0", "exec2", "shuffle_1_1_0");
fail("Should have failed");
} catch (UnsupportedOperationException e) {
// pass
}
// Nonexistent shuffle block
- manager.registerExecutor("app0", "exec3",
+ resolver.registerExecutor("app0", "exec3",
dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
try {
- manager.getBlockData("app0", "exec3", "shuffle_1_1_0");
+ resolver.getBlockData("app0", "exec3", "shuffle_1_1_0");
fail("Should have failed");
} catch (Exception e) {
// pass
@@ -91,18 +91,18 @@ public class ExternalShuffleBlockManagerSuite {
@Test
public void testSortShuffleBlocks() throws IOException {
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
- manager.registerExecutor("app0", "exec0",
+ ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf);
+ resolver.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
InputStream block0Stream =
- manager.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
+ resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
block0Stream.close();
assertEquals(sortBlock0, block0);
InputStream block1Stream =
- manager.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream();
+ resolver.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream();
String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
block1Stream.close();
assertEquals(sortBlock1, block1);
@@ -110,18 +110,18 @@ public class ExternalShuffleBlockManagerSuite {
@Test
public void testHashShuffleBlocks() throws IOException {
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
- manager.registerExecutor("app0", "exec0",
+ ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf);
+ resolver.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));
InputStream block0Stream =
- manager.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
+ resolver.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
block0Stream.close();
assertEquals(hashBlock0, block0);
InputStream block1Stream =
- manager.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream();
+ resolver.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream();
String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
block1Stream.close();
assertEquals(hashBlock1, block1);
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index 254e3a7a32..d9d9c1bf2f 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -41,14 +41,15 @@ public class ExternalShuffleCleanupSuite {
public void noCleanupAndCleanup() throws IOException {
TestShuffleDataContext dataContext = createSomeData();
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor);
- manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
- manager.applicationRemoved("app", false /* cleanup */);
+ ExternalShuffleBlockResolver resolver =
+ new ExternalShuffleBlockResolver(conf, sameThreadExecutor);
+ resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
+ resolver.applicationRemoved("app", false /* cleanup */);
assertStillThere(dataContext);
- manager.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr"));
- manager.applicationRemoved("app", true /* cleanup */);
+ resolver.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr"));
+ resolver.applicationRemoved("app", true /* cleanup */);
assertCleanedUp(dataContext);
}
@@ -64,7 +65,7 @@ public class ExternalShuffleCleanupSuite {
@Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
};
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, noThreadExecutor);
+ ExternalShuffleBlockResolver manager = new ExternalShuffleBlockResolver(conf, noThreadExecutor);
manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
manager.applicationRemoved("app", true);
@@ -81,11 +82,12 @@ public class ExternalShuffleCleanupSuite {
TestShuffleDataContext dataContext0 = createSomeData();
TestShuffleDataContext dataContext1 = createSomeData();
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor);
+ ExternalShuffleBlockResolver resolver =
+ new ExternalShuffleBlockResolver(conf, sameThreadExecutor);
- manager.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
- manager.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
- manager.applicationRemoved("app", true);
+ resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
+ resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
+ resolver.applicationRemoved("app", true);
assertCleanedUp(dataContext0);
assertCleanedUp(dataContext1);
@@ -96,25 +98,26 @@ public class ExternalShuffleCleanupSuite {
TestShuffleDataContext dataContext0 = createSomeData();
TestShuffleDataContext dataContext1 = createSomeData();
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor);
+ ExternalShuffleBlockResolver resolver =
+ new ExternalShuffleBlockResolver(conf, sameThreadExecutor);
- manager.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
- manager.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
+ resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
+ resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
- manager.applicationRemoved("app-nonexistent", true);
+ resolver.applicationRemoved("app-nonexistent", true);
assertStillThere(dataContext0);
assertStillThere(dataContext1);
- manager.applicationRemoved("app-0", true);
+ resolver.applicationRemoved("app-0", true);
assertCleanedUp(dataContext0);
assertStillThere(dataContext1);
- manager.applicationRemoved("app-1", true);
+ resolver.applicationRemoved("app-1", true);
assertCleanedUp(dataContext0);
assertCleanedUp(dataContext1);
// Make sure it's not an error to cleanup multiple times
- manager.applicationRemoved("app-1", true);
+ resolver.applicationRemoved("app-1", true);
assertCleanedUp(dataContext0);
assertCleanedUp(dataContext1);
}
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 76639114df..3fdde054ab 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -29,7 +29,7 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
/**
* Manages some sort- and hash-based shuffle data, including the creation
- * and cleanup of directories that can be read by the {@link ExternalShuffleBlockManager}.
+ * and cleanup of directories that can be read by the {@link ExternalShuffleBlockResolver}.
*/
public class TestShuffleDataContext {
public final String[] localDirs;
@@ -61,9 +61,9 @@ public class TestShuffleDataContext {
String blockId = "shuffle_" + shuffleId + "_" + mapId + "_0";
OutputStream dataStream = new FileOutputStream(
- ExternalShuffleBlockManager.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
+ ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
DataOutputStream indexStream = new DataOutputStream(new FileOutputStream(
- ExternalShuffleBlockManager.getFile(localDirs, subDirsPerLocalDir, blockId + ".index")));
+ ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".index")));
long offset = 0;
indexStream.writeLong(offset);
@@ -82,7 +82,7 @@ public class TestShuffleDataContext {
for (int i = 0; i < blocks.length; i ++) {
String blockId = "shuffle_" + shuffleId + "_" + mapId + "_" + i;
Files.write(blocks[i],
- ExternalShuffleBlockManager.getFile(localDirs, subDirsPerLocalDir, blockId));
+ ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId));
}
}