diff options
Diffstat (limited to 'network/shuffle/src')
6 files changed, 29 insertions, 16 deletions
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index a6db4b2abd..46ca970862 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -21,6 +21,7 @@ import java.util.List; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import org.apache.spark.network.util.TransportConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +49,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler { private final ExternalShuffleBlockManager blockManager; private final OneForOneStreamManager streamManager; - public ExternalShuffleBlockHandler() { - this(new OneForOneStreamManager(), new ExternalShuffleBlockManager()); + public ExternalShuffleBlockHandler(TransportConf conf) { + this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf)); } /** Enables mocking out the StreamManager and BlockManager. */ diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java index ffb7faa3db..dfe0ba0595 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java @@ -37,6 +37,7 @@ import org.apache.spark.network.buffer.FileSegmentManagedBuffer; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo; import org.apache.spark.network.util.JavaUtils; +import org.apache.spark.network.util.TransportConf; /** * Manages converting shuffle BlockIds into physical segments of local files, from a process outside @@ -56,14 +57,17 @@ public class ExternalShuffleBlockManager { // Single-threaded Java executor used to perform expensive recursive directory deletion. private final Executor directoryCleaner; - public ExternalShuffleBlockManager() { + private final TransportConf conf; + + public ExternalShuffleBlockManager(TransportConf conf) { // TODO: Give this thread a name. - this(Executors.newSingleThreadExecutor()); + this(conf, Executors.newSingleThreadExecutor()); } // Allows tests to have more control over when directories are cleaned up. @VisibleForTesting - ExternalShuffleBlockManager(Executor directoryCleaner) { + ExternalShuffleBlockManager(TransportConf conf, Executor directoryCleaner) { + this.conf = conf; this.executors = Maps.newConcurrentMap(); this.directoryCleaner = directoryCleaner; } @@ -167,7 +171,7 @@ public class ExternalShuffleBlockManager { // TODO: Support consolidated hash shuffle files private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) { File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId); - return new FileSegmentManagedBuffer(shuffleFile, 0, shuffleFile.length()); + return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length()); } /** @@ -187,6 +191,7 @@ public class ExternalShuffleBlockManager { long offset = in.readLong(); long nextOffset = in.readLong(); return new FileSegmentManagedBuffer( + conf, getFile(executor.localDirs, executor.subDirsPerLocalDir, "shuffle_" + shuffleId + "_" + mapId + "_0.data"), offset, diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java index da54797e89..dad6428a83 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java @@ -22,6 +22,8 @@ import java.io.InputStream; import java.io.InputStreamReader; import com.google.common.io.CharStreams; +import org.apache.spark.network.util.SystemPropertyConfigProvider; +import org.apache.spark.network.util.TransportConf; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -37,6 +39,8 @@ public class ExternalShuffleBlockManagerSuite { static TestShuffleDataContext dataContext; + static TransportConf conf = new TransportConf(new SystemPropertyConfigProvider()); + @BeforeClass public static void beforeAll() throws IOException { dataContext = new TestShuffleDataContext(2, 5); @@ -56,7 +60,7 @@ public class ExternalShuffleBlockManagerSuite { @Test public void testBadRequests() { - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf); // Unregistered executor try { manager.getBlockData("app0", "exec1", "shuffle_1_1_0"); @@ -87,7 +91,7 @@ public class ExternalShuffleBlockManagerSuite { @Test public void testSortShuffleBlocks() throws IOException { - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf); manager.registerExecutor("app0", "exec0", dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager")); @@ -106,7 +110,7 @@ public class ExternalShuffleBlockManagerSuite { @Test public void testHashShuffleBlocks() throws IOException { - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf); manager.registerExecutor("app0", "exec0", dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager")); diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java index c8ece3bc53..254e3a7a32 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java @@ -25,20 +25,23 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.util.concurrent.MoreExecutors; import org.junit.Test; - import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import org.apache.spark.network.util.SystemPropertyConfigProvider; +import org.apache.spark.network.util.TransportConf; + public class ExternalShuffleCleanupSuite { // Same-thread Executor used to ensure cleanup happens synchronously in test thread. Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor(); + TransportConf conf = new TransportConf(new SystemPropertyConfigProvider()); @Test public void noCleanupAndCleanup() throws IOException { TestShuffleDataContext dataContext = createSomeData(); - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor); manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr")); manager.applicationRemoved("app", false /* cleanup */); @@ -61,7 +64,7 @@ public class ExternalShuffleCleanupSuite { @Override public void execute(Runnable runnable) { cleanupCalled.set(true); } }; - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(noThreadExecutor); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, noThreadExecutor); manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr")); manager.applicationRemoved("app", true); @@ -78,7 +81,7 @@ public class ExternalShuffleCleanupSuite { TestShuffleDataContext dataContext0 = createSomeData(); TestShuffleDataContext dataContext1 = createSomeData(); - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor); manager.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr")); manager.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr")); @@ -93,7 +96,7 @@ public class ExternalShuffleCleanupSuite { TestShuffleDataContext dataContext0 = createSomeData(); TestShuffleDataContext dataContext1 = createSomeData(); - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor); manager.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr")); manager.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr")); diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java index 687bde59fd..02c10bcb7b 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java @@ -92,7 +92,7 @@ public class ExternalShuffleIntegrationSuite { dataContext1.insertHashShuffleData(1, 0, exec1Blocks); conf = new TransportConf(new SystemPropertyConfigProvider()); - handler = new ExternalShuffleBlockHandler(); + handler = new ExternalShuffleBlockHandler(conf); TransportContext transportContext = new TransportContext(conf, handler); server = transportContext.createServer(); } diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java index 8afceab1d5..759a12910c 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java @@ -42,7 +42,7 @@ public class ExternalShuffleSecuritySuite { @Before public void beforeEach() { - RpcHandler handler = new SaslRpcHandler(new ExternalShuffleBlockHandler(), + RpcHandler handler = new SaslRpcHandler(new ExternalShuffleBlockHandler(conf), new TestSecretKeyHolder("my-app-id", "secret")); TransportContext context = new TransportContext(conf, handler); this.server = context.createServer(); |