From 6bfe42a3be4fbf8bc6f93a4709038fda8ad0610b Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Mon, 25 Apr 2016 12:33:32 -0700 Subject: [SPARK-14731][shuffle]Revert SPARK-12130 to make 2.0 shuffle service compatible with 1.x ## What changes were proposed in this pull request? SPARK-12130 make 2.0 shuffle service incompatible with 1.x. So from discussion: [http://apache-spark-developers-list.1001551.n3.nabble.com/YARN-Shuffle-service-and-its-compatibility-td17222.html](url) we should maintain compatibility between Spark 1.x and Spark 2.x's shuffle service. I put string comparison into executor's register at first avoid string comparison in getBlockData every time. ## How was this patch tested? N/A Author: Lianhui Wang Closes #12568 from lianhuiwang/SPARK-14731. --- .../network/shuffle/ExternalShuffleBlockResolver.java | 15 +++++++++------ .../apache/spark/network/sasl/SaslIntegrationSuite.java | 3 ++- .../shuffle/ExternalShuffleBlockResolverSuite.java | 11 ++++++----- .../network/shuffle/ExternalShuffleCleanupSuite.java | 15 ++++++++------- .../network/shuffle/ExternalShuffleIntegrationSuite.java | 11 ++++------- .../network/shuffle/ExternalShuffleSecuritySuite.java | 3 ++- 6 files changed, 31 insertions(+), 27 deletions(-) (limited to 'common/network-shuffle/src') diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 3071201266..54e870a9b5 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -76,6 +76,10 @@ public class ExternalShuffleBlockResolver { @VisibleForTesting final DB db; + private final List knownManagers = Arrays.asList( + "org.apache.spark.shuffle.sort.SortShuffleManager", + "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager"); + public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile) throws IOException { this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor( @@ -149,6 +153,10 @@ public class ExternalShuffleBlockResolver { ExecutorShuffleInfo executorInfo) { AppExecId fullId = new AppExecId(appId, execId); logger.info("Registered executor {} with {}", fullId, executorInfo); + if (!knownManagers.contains(executorInfo.shuffleManager)) { + throw new UnsupportedOperationException( + "Unsupported shuffle manager of executor: " + executorInfo); + } try { if (db != null) { byte[] key = dbAppExecKey(fullId); @@ -183,12 +191,7 @@ public class ExternalShuffleBlockResolver { String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId)); } - if ("sort".equals(executor.shuffleManager) || "tungsten-sort".equals(executor.shuffleManager)) { - return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId); - } else { - throw new UnsupportedOperationException( - "Unsupported shuffle manager: " + executor.shuffleManager); - } + return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId); } /** diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java index 5bf9924185..6ba937dddb 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java @@ -220,7 +220,8 @@ public class SaslIntegrationSuite { // Register an executor so that the next steps work. ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo( - new String[] { System.getProperty("java.io.tmpdir") }, 1, "sort"); + new String[] { System.getProperty("java.io.tmpdir") }, 1, + "org.apache.spark.shuffle.sort.SortShuffleManager"); RegisterExecutor regmsg = new RegisterExecutor("app-1", "0", executorInfo); client1.sendRpcSync(regmsg.toByteBuffer(), TIMEOUT_MS); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java index de4840a588..35d6346474 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java @@ -37,6 +37,7 @@ import static org.junit.Assert.*; public class ExternalShuffleBlockResolverSuite { private static final String sortBlock0 = "Hello!"; private static final String sortBlock1 = "World!"; + private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"; private static TestShuffleDataContext dataContext; @@ -71,8 +72,8 @@ public class ExternalShuffleBlockResolverSuite { } // Invalid shuffle manager - resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar")); try { + resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar")); resolver.getBlockData("app0", "exec2", "shuffle_1_1_0"); fail("Should have failed"); } catch (UnsupportedOperationException e) { @@ -81,7 +82,7 @@ public class ExternalShuffleBlockResolverSuite { // Nonexistent shuffle block resolver.registerExecutor("app0", "exec3", - dataContext.createExecutorInfo("sort")); + dataContext.createExecutorInfo(SORT_MANAGER)); try { resolver.getBlockData("app0", "exec3", "shuffle_1_1_0"); fail("Should have failed"); @@ -94,7 +95,7 @@ public class ExternalShuffleBlockResolverSuite { public void testSortShuffleBlocks() throws IOException { ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null); resolver.registerExecutor("app0", "exec0", - dataContext.createExecutorInfo("sort")); + dataContext.createExecutorInfo(SORT_MANAGER)); InputStream block0Stream = resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream(); @@ -120,7 +121,7 @@ public class ExternalShuffleBlockResolverSuite { assertEquals(parsedAppId, appId); ExecutorShuffleInfo shuffleInfo = - new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "sort"); + new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, SORT_MANAGER); String shuffleJson = mapper.writeValueAsString(shuffleInfo); ExecutorShuffleInfo parsedShuffleInfo = mapper.readValue(shuffleJson, ExecutorShuffleInfo.class); @@ -131,7 +132,7 @@ public class ExternalShuffleBlockResolverSuite { String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}"; assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class)); String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " + - "\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"sort\"}"; + "\"subDirsPerLocalDir\": 7, \"shuffleManager\": " + "\"" + SORT_MANAGER + "\"}"; assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class)); } } diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java index fa5cd1398a..bdd218db69 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java @@ -37,6 +37,7 @@ public class ExternalShuffleCleanupSuite { // Same-thread Executor used to ensure cleanup happens synchronously in test thread. private Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor(); private TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider()); + private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"; @Test public void noCleanupAndCleanup() throws IOException { @@ -44,12 +45,12 @@ public class ExternalShuffleCleanupSuite { ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor); - resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr")); + resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER)); resolver.applicationRemoved("app", false /* cleanup */); assertStillThere(dataContext); - resolver.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr")); + resolver.registerExecutor("app", "exec1", dataContext.createExecutorInfo(SORT_MANAGER)); resolver.applicationRemoved("app", true /* cleanup */); assertCleanedUp(dataContext); @@ -69,7 +70,7 @@ public class ExternalShuffleCleanupSuite { ExternalShuffleBlockResolver manager = new ExternalShuffleBlockResolver(conf, null, noThreadExecutor); - manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr")); + manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER)); manager.applicationRemoved("app", true); assertTrue(cleanupCalled.get()); @@ -87,8 +88,8 @@ public class ExternalShuffleCleanupSuite { ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor); - resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr")); - resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr")); + resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo(SORT_MANAGER)); + resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo(SORT_MANAGER)); resolver.applicationRemoved("app", true); assertCleanedUp(dataContext0); @@ -103,8 +104,8 @@ public class ExternalShuffleCleanupSuite { ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor); - resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr")); - resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr")); + resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo(SORT_MANAGER)); + resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo(SORT_MANAGER)); resolver.applicationRemoved("app-nonexistent", true); assertStillThere(dataContext0); diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java index 067c815c30..552b5366c5 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java @@ -48,8 +48,8 @@ import org.apache.spark.network.util.TransportConf; public class ExternalShuffleIntegrationSuite { - static String APP_ID = "app-id"; - static String SORT_MANAGER = "sort"; + private static final String APP_ID = "app-id"; + private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"; // Executor 0 is sort-based static TestShuffleDataContext dataContext0; @@ -184,12 +184,9 @@ public class ExternalShuffleIntegrationSuite { exec0Fetch.releaseBuffers(); } - @Test - public void testFetchInvalidShuffle() throws Exception { + @Test (expected = RuntimeException.class) + public void testRegisterInvalidExecutor() throws Exception { registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager")); - FetchResult execFetch = fetchBlocks("exec-1", new String[] { "shuffle_1_0_0" }); - assertTrue(execFetch.successBlocks.isEmpty()); - assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks); } @Test diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java index acc1168f83..a0f69ca29a 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java @@ -93,7 +93,8 @@ public class ExternalShuffleSecuritySuite { client.init(appId); // Registration either succeeds or throws an exception. client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0", - new ExecutorShuffleInfo(new String[0], 0, "")); + new ExecutorShuffleInfo(new String[0], 0, + "org.apache.spark.shuffle.sort.SortShuffleManager")); client.close(); } -- cgit v1.2.3