From 5e92583d38e11d39deb429a39725443111205a4a Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Mon, 18 Apr 2016 19:30:00 -0700
Subject: [SPARK-14667] Remove HashShuffleManager

## What changes were proposed in this pull request?

The sort shuffle manager has been the default since Spark 1.2. It is time to remove the old hash shuffle manager.

## How was this patch tested?

Removed some tests related to the old manager.

Author: Reynold Xin

Closes #12423 from rxin/SPARK-14667.
---
 .../shuffle/ExternalShuffleBlockResolver.java       | 13 +-------
 .../shuffle/protocol/ExecutorShuffleInfo.java       |  2 +-
 .../shuffle/ExternalShuffleBlockResolverSuite.java  | 33 ++-----------------
 .../shuffle/ExternalShuffleCleanupSuite.java        |  3 --
 .../shuffle/ExternalShuffleIntegrationSuite.java    | 38 +++-------------------
 .../network/shuffle/TestShuffleDataContext.java     | 11 +------
 6 files changed, 10 insertions(+), 90 deletions(-)
(limited to 'common/network-shuffle/src')

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index ce5c68e853..3071201266 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -49,7 +49,7 @@ import org.apache.spark.network.util.TransportConf;
  * Manages converting shuffle BlockIds into physical segments of local files, from a process outside
  * of Executors. Each Executor must register its own configuration about where it stores its files
  * (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated
- * from Spark's FileShuffleBlockResolver and IndexShuffleBlockResolver.
+ * from Spark's IndexShuffleBlockResolver.
  */
 public class ExternalShuffleBlockResolver {
   private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);
@@ -185,8 +185,6 @@ public class ExternalShuffleBlockResolver {
     if ("sort".equals(executor.shuffleManager)
         || "tungsten-sort".equals(executor.shuffleManager)) {
       return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
-    } else if ("hash".equals(executor.shuffleManager)) {
-      return getHashBasedShuffleBlockData(executor, blockId);
     } else {
       throw new UnsupportedOperationException(
         "Unsupported shuffle manager: " + executor.shuffleManager);
@@ -250,15 +248,6 @@ public class ExternalShuffleBlockResolver {
     }
   }
 
-  /**
-   * Hash-based shuffle data is simply stored as one file per block.
-   * This logic is from FileShuffleBlockResolver.
-   */
-  private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
-    File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
-    return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length());
-  }
-
   /**
    * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
    * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
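The sort-based path that remains is the one described in the comment above: each map task's output lives in a single `shuffle_<shuffleId>_<mapId>_0.data` file, and a companion `.index` file of long offsets records where each reducer's bytes start. A minimal sketch of that lookup, using a hypothetical standalone helper rather than the resolver's actual private method:

```java
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

/** Sketch: locate reducer `reduceId` inside a sort-shuffle map output, given its .index file. */
static long[] lookupSortShuffleSegment(File indexFile, int reduceId) throws IOException {
  try (DataInputStream in = new DataInputStream(new FileInputStream(indexFile))) {
    in.skipBytes(reduceId * 8);      // each index entry is one long; entry i = start of reducer i
    long offset = in.readLong();     // where this reducer's bytes begin in the .data file
    long nextOffset = in.readLong(); // where the next reducer's bytes begin
    return new long[] { offset, nextOffset - offset }; // {offset, length} into the .data file
  }
}
```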
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
index 102d4efb8b..93758bdc58 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
@@ -33,7 +33,7 @@ public class ExecutorShuffleInfo implements Encodable {
   public final String[] localDirs;
   /** Number of subdirectories created within each localDir. */
   public final int subDirsPerLocalDir;
-  /** Shuffle manager (SortShuffleManager or HashShuffleManager) that the executor is using. */
+  /** Shuffle manager (SortShuffleManager) that the executor is using. */
   public final String shuffleManager;
 
   @JsonCreator
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index d9b5f0261a..de4840a588 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -38,9 +38,6 @@ public class ExternalShuffleBlockResolverSuite {
   private static final String sortBlock0 = "Hello!";
   private static final String sortBlock1 = "World!";
 
-  private static final String hashBlock0 = "Elementary";
-  private static final String hashBlock1 = "Tabular";
-
   private static TestShuffleDataContext dataContext;
 
   private static final TransportConf conf =
@@ -51,13 +48,10 @@ public class ExternalShuffleBlockResolverSuite {
     dataContext = new TestShuffleDataContext(2, 5);
 
     dataContext.create();
-    // Write some sort and hash data.
+    // Write some sort data.
     dataContext.insertSortShuffleData(0, 0, new byte[][] {
         sortBlock0.getBytes(StandardCharsets.UTF_8),
         sortBlock1.getBytes(StandardCharsets.UTF_8)});
-    dataContext.insertHashShuffleData(1, 0, new byte[][] {
-        hashBlock0.getBytes(StandardCharsets.UTF_8),
-        hashBlock1.getBytes(StandardCharsets.UTF_8)});
   }
 
   @AfterClass
@@ -117,27 +111,6 @@ public class ExternalShuffleBlockResolverSuite {
     assertEquals(sortBlock1, block1);
   }
 
-  @Test
-  public void testHashShuffleBlocks() throws IOException {
-    ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
-    resolver.registerExecutor("app0", "exec0",
-      dataContext.createExecutorInfo("hash"));
-
-    InputStream block0Stream =
-      resolver.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
-    String block0 = CharStreams.toString(
-        new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
-    block0Stream.close();
-    assertEquals(hashBlock0, block0);
-
-    InputStream block1Stream =
-      resolver.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream();
-    String block1 = CharStreams.toString(
-        new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
-    block1Stream.close();
-    assertEquals(hashBlock1, block1);
-  }
-
   @Test
   public void jsonSerializationOfExecutorRegistration() throws IOException {
     ObjectMapper mapper = new ObjectMapper();
@@ -147,7 +120,7 @@ public class ExternalShuffleBlockResolverSuite {
     assertEquals(parsedAppId, appId);
 
     ExecutorShuffleInfo shuffleInfo =
-      new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "hash");
+      new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "sort");
     String shuffleJson = mapper.writeValueAsString(shuffleInfo);
     ExecutorShuffleInfo parsedShuffleInfo =
       mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
@@ -158,7 +131,7 @@ public class ExternalShuffleBlockResolverSuite {
     String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}";
     assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class));
     String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " +
-      "\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"hash\"}";
+      "\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"sort\"}";
     assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
   }
 }
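Both the removed hash path and the retained sort path locate files the same way: the `localDirs` and `subDirsPerLocalDir` registered in `ExecutorShuffleInfo` above map any file name to a deterministic directory, which is how the external service finds files written by an executor that has since exited. Roughly, as a simplified sketch of that placement (the real code uses a shared non-negative-hash utility; the helper name here is hypothetical):

```java
import java.io.File;

/** Sketch: deterministic placement of a shuffle file under an executor's registered local dirs. */
static File locateFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
  int hash = filename.hashCode() & Integer.MAX_VALUE;             // keep the hash non-negative
  String localDir = localDirs[hash % localDirs.length];           // choose one of the local dirs
  int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;  // then one of its subdirectories
  return new File(new File(localDir, String.format("%02x", subDirId)), filename);
}
```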
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index 43d0201405..fa5cd1398a 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -144,9 +144,6 @@ public class ExternalShuffleCleanupSuite {
     dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), new byte[][] {
         "ABC".getBytes(StandardCharsets.UTF_8),
         "DEF".getBytes(StandardCharsets.UTF_8)});
-    dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000, new byte[][] {
-        "GHI".getBytes(StandardCharsets.UTF_8),
-        "JKLMNOPQRSTUVWXYZ".getBytes(StandardCharsets.UTF_8)});
     return dataContext;
   }
 }
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index ecbbe7bfa3..067c815c30 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -50,12 +50,9 @@ public class ExternalShuffleIntegrationSuite {
 
   static String APP_ID = "app-id";
   static String SORT_MANAGER = "sort";
-  static String HASH_MANAGER = "hash";
 
   // Executor 0 is sort-based
   static TestShuffleDataContext dataContext0;
-  // Executor 1 is hash-based
-  static TestShuffleDataContext dataContext1;
 
   static ExternalShuffleBlockHandler handler;
   static TransportServer server;
@@ -87,10 +84,6 @@ public class ExternalShuffleIntegrationSuite {
     dataContext0.create();
     dataContext0.insertSortShuffleData(0, 0, exec0Blocks);
 
-    dataContext1 = new TestShuffleDataContext(6, 2);
-    dataContext1.create();
-    dataContext1.insertHashShuffleData(1, 0, exec1Blocks);
-
     conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     handler = new ExternalShuffleBlockHandler(conf, null);
     TransportContext transportContext = new TransportContext(conf, handler);
@@ -100,7 +93,6 @@ public class ExternalShuffleIntegrationSuite {
   @AfterClass
   public static void afterAll() {
     dataContext0.cleanup();
-    dataContext1.cleanup();
     server.close();
   }
 
@@ -192,40 +184,18 @@ public class ExternalShuffleIntegrationSuite {
     exec0Fetch.releaseBuffers();
   }
 
-  @Test
-  public void testFetchHash() throws Exception {
-    registerExecutor("exec-1", dataContext1.createExecutorInfo(HASH_MANAGER));
-    FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "shuffle_1_0_0", "shuffle_1_0_1" });
-    assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.successBlocks);
-    assertTrue(execFetch.failedBlocks.isEmpty());
-    assertBufferListsEqual(execFetch.buffers, Lists.newArrayList(exec1Blocks));
-    execFetch.releaseBuffers();
-  }
-
-  @Test
-  public void testFetchWrongShuffle() throws Exception {
-    registerExecutor("exec-1", dataContext1.createExecutorInfo(SORT_MANAGER /* wrong manager */));
-    FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "shuffle_1_0_0", "shuffle_1_0_1" });
-    assertTrue(execFetch.successBlocks.isEmpty());
-    assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
-  }
-
   @Test
   public void testFetchInvalidShuffle() throws Exception {
-    registerExecutor("exec-1", dataContext1.createExecutorInfo("unknown sort manager"));
-    FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "shuffle_1_0_0" });
+    registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager"));
+    FetchResult execFetch = fetchBlocks("exec-1", new String[] { "shuffle_1_0_0" });
     assertTrue(execFetch.successBlocks.isEmpty());
     assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
   }
 
   @Test
   public void testFetchWrongBlockId() throws Exception {
-    registerExecutor("exec-1", dataContext1.createExecutorInfo(SORT_MANAGER /* wrong manager */));
-    FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "rdd_1_0_0" });
+    registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
+    FetchResult execFetch = fetchBlocks("exec-1", new String[] { "rdd_1_0_0" });
     assertTrue(execFetch.successBlocks.isEmpty());
     assertEquals(Sets.newHashSet("rdd_1_0_0"), execFetch.failedBlocks);
   }
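The two remaining negative tests exercise the resolver's input validation: block IDs must have the form `shuffle_<shuffleId>_<mapId>_<reduceId>` (so `rdd_1_0_0` is rejected), and only a recognized sort shuffle manager is served (so "unknown sort manager" fails). A sketch of that parsing, with a hypothetical helper name:

```java
/** Sketch: validate and split a block id such as "shuffle_1_0_0" into {shuffleId, mapId, reduceId}. */
static int[] parseShuffleBlockId(String blockId) {
  String[] parts = blockId.split("_");
  if (parts.length < 4) {
    throw new IllegalArgumentException("Unexpected block id format: " + blockId);
  }
  if (!parts[0].equals("shuffle")) {
    // e.g. "rdd_1_0_0" ends up here and the fetch is reported as failed
    throw new UnsupportedOperationException("Unsupported shuffle block: " + blockId);
  }
  return new int[] {
    Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Integer.parseInt(parts[3])
  };
}
```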
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 7ac1ca128a..62a1fb42b0 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -29,7 +29,7 @@ import com.google.common.io.Files;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
 
 /**
- * Manages some sort- and hash-based shuffle data, including the creation
+ * Manages some sort-shuffle data, including the creation
  * and cleanup of directories that can be read by the {@link ExternalShuffleBlockResolver}.
  */
 public class TestShuffleDataContext {
@@ -85,15 +85,6 @@ public class TestShuffleDataContext {
     }
   }
 
-  /** Creates reducer blocks in a hash-based data format within our local dirs. */
-  public void insertHashShuffleData(int shuffleId, int mapId, byte[][] blocks) throws IOException {
-    for (int i = 0; i < blocks.length; i ++) {
-      String blockId = "shuffle_" + shuffleId + "_" + mapId + "_" + i;
-      Files.write(blocks[i],
-        ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId));
-    }
-  }
-
   /**
    * Creates an ExecutorShuffleInfo object based on the given shuffle manager which targets this
    * context's directories.
-- 
cgit v1.2.3
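With `insertHashShuffleData` gone, test data only needs to be written in the sort format the resolver reads: one concatenated data file plus an index file of cumulative offsets, named as in the resolver's comment. A minimal sketch of such a writer, assuming a single target directory instead of the hashed sub-directory layout (this is not the exact `TestShuffleDataContext` code):

```java
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

/** Sketch: write map output blocks in the sort-shuffle on-disk format (.data + .index). */
static void writeSortShuffleData(File dir, int shuffleId, int mapId, byte[][] blocks) throws IOException {
  String base = "shuffle_" + shuffleId + "_" + mapId + "_0";
  try (FileOutputStream data = new FileOutputStream(new File(dir, base + ".data"));
       DataOutputStream index = new DataOutputStream(new FileOutputStream(new File(dir, base + ".index")))) {
    long offset = 0;
    index.writeLong(offset);       // entry 0: start of reducer 0
    for (byte[] block : blocks) {
      data.write(block);           // reducer i's bytes, appended in order
      offset += block.length;
      index.writeLong(offset);     // entry i + 1: end of reducer i / start of reducer i + 1
    }
  }
}
```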