aboutsummaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
author Reynold Xin <rxin@databricks.com> 2016-04-18 19:30:00 -0700
committer Reynold Xin <rxin@databricks.com> 2016-04-18 19:30:00 -0700
commit 5e92583d38e11d39deb429a39725443111205a4a (patch)
tree 3ca0408257968f37a7e5e4d0c35ed05f449c145d /common
parent 4b3d1294aeecc0001a7fa48c92796e6075d34540 (diff)
download spark-5e92583d38e11d39deb429a39725443111205a4a.tar.gz
spark-5e92583d38e11d39deb429a39725443111205a4a.tar.bz2
spark-5e92583d38e11d39deb429a39725443111205a4a.zip
[SPARK-14667] Remove HashShuffleManager
## What changes were proposed in this pull request?

The sort shuffle manager has been the default since Spark 1.2. It is time to remove the old hash shuffle manager.

## How was this patch tested?

Removed some tests related to the old manager.

Author: Reynold Xin <rxin@databricks.com>

Closes #12423 from rxin/SPARK-14667.
Diffstat (limited to 'common')
-rw-r--r-- common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java 13
-rw-r--r-- common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java 2
-rw-r--r-- common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java 33
-rw-r--r-- common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java 3
-rw-r--r-- common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java 38
-rw-r--r-- common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java 11
6 files changed, 10 insertions, 90 deletions
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index ce5c68e853..3071201266 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -49,7 +49,7 @@ import org.apache.spark.network.util.TransportConf;
* Manages converting shuffle BlockIds into physical segments of local files, from a process outside
* of Executors. Each Executor must register its own configuration about where it stores its files
* (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated
- * from Spark's FileShuffleBlockResolver and IndexShuffleBlockResolver.
+ * from Spark's IndexShuffleBlockResolver.
*/
public class ExternalShuffleBlockResolver {
private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);
@@ -185,8 +185,6 @@ public class ExternalShuffleBlockResolver {
if ("sort".equals(executor.shuffleManager) || "tungsten-sort".equals(executor.shuffleManager)) {
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
- } else if ("hash".equals(executor.shuffleManager)) {
- return getHashBasedShuffleBlockData(executor, blockId);
} else {
throw new UnsupportedOperationException(
"Unsupported shuffle manager: " + executor.shuffleManager);
@@ -251,15 +249,6 @@ public class ExternalShuffleBlockResolver {
}
/**
- * Hash-based shuffle data is simply stored as one file per block.
- * This logic is from FileShuffleBlockResolver.
- */
- private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
- File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
- return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length());
- }
-
- /**
* Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
* called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
* and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
index 102d4efb8b..93758bdc58 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
@@ -33,7 +33,7 @@ public class ExecutorShuffleInfo implements Encodable {
public final String[] localDirs;
/** Number of subdirectories created within each localDir. */
public final int subDirsPerLocalDir;
- /** Shuffle manager (SortShuffleManager or HashShuffleManager) that the executor is using. */
+ /** Shuffle manager (SortShuffleManager) that the executor is using. */
public final String shuffleManager;
@JsonCreator
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index d9b5f0261a..de4840a588 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -38,9 +38,6 @@ public class ExternalShuffleBlockResolverSuite {
private static final String sortBlock0 = "Hello!";
private static final String sortBlock1 = "World!";
- private static final String hashBlock0 = "Elementary";
- private static final String hashBlock1 = "Tabular";
-
private static TestShuffleDataContext dataContext;
private static final TransportConf conf =
@@ -51,13 +48,10 @@ public class ExternalShuffleBlockResolverSuite {
dataContext = new TestShuffleDataContext(2, 5);
dataContext.create();
- // Write some sort and hash data.
+ // Write some sort data.
dataContext.insertSortShuffleData(0, 0, new byte[][] {
sortBlock0.getBytes(StandardCharsets.UTF_8),
sortBlock1.getBytes(StandardCharsets.UTF_8)});
- dataContext.insertHashShuffleData(1, 0, new byte[][] {
- hashBlock0.getBytes(StandardCharsets.UTF_8),
- hashBlock1.getBytes(StandardCharsets.UTF_8)});
}
@AfterClass
@@ -118,27 +112,6 @@ public class ExternalShuffleBlockResolverSuite {
}
@Test
- public void testHashShuffleBlocks() throws IOException {
- ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
- resolver.registerExecutor("app0", "exec0",
- dataContext.createExecutorInfo("hash"));
-
- InputStream block0Stream =
- resolver.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
- String block0 = CharStreams.toString(
- new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
- block0Stream.close();
- assertEquals(hashBlock0, block0);
-
- InputStream block1Stream =
- resolver.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream();
- String block1 = CharStreams.toString(
- new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
- block1Stream.close();
- assertEquals(hashBlock1, block1);
- }
-
- @Test
public void jsonSerializationOfExecutorRegistration() throws IOException {
ObjectMapper mapper = new ObjectMapper();
AppExecId appId = new AppExecId("foo", "bar");
@@ -147,7 +120,7 @@ public class ExternalShuffleBlockResolverSuite {
assertEquals(parsedAppId, appId);
ExecutorShuffleInfo shuffleInfo =
- new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "hash");
+ new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "sort");
String shuffleJson = mapper.writeValueAsString(shuffleInfo);
ExecutorShuffleInfo parsedShuffleInfo =
mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
@@ -158,7 +131,7 @@ public class ExternalShuffleBlockResolverSuite {
String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}";
assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class));
String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " +
- "\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"hash\"}";
+ "\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"sort\"}";
assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
}
}
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index 43d0201405..fa5cd1398a 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -144,9 +144,6 @@ public class ExternalShuffleCleanupSuite {
dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), new byte[][] {
"ABC".getBytes(StandardCharsets.UTF_8),
"DEF".getBytes(StandardCharsets.UTF_8)});
- dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000, new byte[][] {
- "GHI".getBytes(StandardCharsets.UTF_8),
- "JKLMNOPQRSTUVWXYZ".getBytes(StandardCharsets.UTF_8)});
return dataContext;
}
}
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index ecbbe7bfa3..067c815c30 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -50,12 +50,9 @@ public class ExternalShuffleIntegrationSuite {
static String APP_ID = "app-id";
static String SORT_MANAGER = "sort";
- static String HASH_MANAGER = "hash";
// Executor 0 is sort-based
static TestShuffleDataContext dataContext0;
- // Executor 1 is hash-based
- static TestShuffleDataContext dataContext1;
static ExternalShuffleBlockHandler handler;
static TransportServer server;
@@ -87,10 +84,6 @@ public class ExternalShuffleIntegrationSuite {
dataContext0.create();
dataContext0.insertSortShuffleData(0, 0, exec0Blocks);
- dataContext1 = new TestShuffleDataContext(6, 2);
- dataContext1.create();
- dataContext1.insertHashShuffleData(1, 0, exec1Blocks);
-
conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
handler = new ExternalShuffleBlockHandler(conf, null);
TransportContext transportContext = new TransportContext(conf, handler);
@@ -100,7 +93,6 @@ public class ExternalShuffleIntegrationSuite {
@AfterClass
public static void afterAll() {
dataContext0.cleanup();
- dataContext1.cleanup();
server.close();
}
@@ -193,39 +185,17 @@ public class ExternalShuffleIntegrationSuite {
}
@Test
- public void testFetchHash() throws Exception {
- registerExecutor("exec-1", dataContext1.createExecutorInfo(HASH_MANAGER));
- FetchResult execFetch = fetchBlocks("exec-1",
- new String[] { "shuffle_1_0_0", "shuffle_1_0_1" });
- assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.successBlocks);
- assertTrue(execFetch.failedBlocks.isEmpty());
- assertBufferListsEqual(execFetch.buffers, Lists.newArrayList(exec1Blocks));
- execFetch.releaseBuffers();
- }
-
- @Test
- public void testFetchWrongShuffle() throws Exception {
- registerExecutor("exec-1", dataContext1.createExecutorInfo(SORT_MANAGER /* wrong manager */));
- FetchResult execFetch = fetchBlocks("exec-1",
- new String[] { "shuffle_1_0_0", "shuffle_1_0_1" });
- assertTrue(execFetch.successBlocks.isEmpty());
- assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
- }
-
- @Test
public void testFetchInvalidShuffle() throws Exception {
- registerExecutor("exec-1", dataContext1.createExecutorInfo("unknown sort manager"));
- FetchResult execFetch = fetchBlocks("exec-1",
- new String[] { "shuffle_1_0_0" });
+ registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager"));
+ FetchResult execFetch = fetchBlocks("exec-1", new String[] { "shuffle_1_0_0" });
assertTrue(execFetch.successBlocks.isEmpty());
assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
}
@Test
public void testFetchWrongBlockId() throws Exception {
- registerExecutor("exec-1", dataContext1.createExecutorInfo(SORT_MANAGER /* wrong manager */));
- FetchResult execFetch = fetchBlocks("exec-1",
- new String[] { "rdd_1_0_0" });
+ registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
+ FetchResult execFetch = fetchBlocks("exec-1", new String[] { "rdd_1_0_0" });
assertTrue(execFetch.successBlocks.isEmpty());
assertEquals(Sets.newHashSet("rdd_1_0_0"), execFetch.failedBlocks);
}
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 7ac1ca128a..62a1fb42b0 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -29,7 +29,7 @@ import com.google.common.io.Files;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
/**
- * Manages some sort- and hash-based shuffle data, including the creation
+ * Manages some sort-shuffle data, including the creation
* and cleanup of directories that can be read by the {@link ExternalShuffleBlockResolver}.
*/
public class TestShuffleDataContext {
@@ -85,15 +85,6 @@ public class TestShuffleDataContext {
}
}
- /** Creates reducer blocks in a hash-based data format within our local dirs. */
- public void insertHashShuffleData(int shuffleId, int mapId, byte[][] blocks) throws IOException {
- for (int i = 0; i < blocks.length; i ++) {
- String blockId = "shuffle_" + shuffleId + "_" + mapId + "_" + i;
- Files.write(blocks[i],
- ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId));
- }
- }
-
/**
* Creates an ExecutorShuffleInfo object based on the given shuffle manager which targets this
* context's directories.