From ef29a9a9aa85468869eb67ca67b66c65f508d0ee Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 11 Nov 2014 00:25:31 -0800 Subject: [SPARK-4307] Initialize FileDescriptor lazily in FileRegion. Netty's DefaultFileRegion requires a FileDescriptor in its constructor, which means we need to have a opened file handle. In super large workloads, this could lead to too many open files due to the way these file descriptors are cleaned. This pull request creates a new LazyFileRegion that initializes the FileDescriptor when we are sending data for the first time. Author: Reynold Xin Author: Reynold Xin Closes #3172 from rxin/lazyFD and squashes the following commits: 0bdcdc6 [Reynold Xin] Added reference to Netty's DefaultFileRegion d4564ae [Reynold Xin] Added SparkConf to the ctor argument of IndexShuffleBlockManager. 6ed369e [Reynold Xin] Code review feedback. 04cddc8 [Reynold Xin] [SPARK-4307] Initialize FileDescriptor lazily in FileRegion. --- .../spark/network/shuffle/ExternalShuffleBlockHandler.java | 5 +++-- .../spark/network/shuffle/ExternalShuffleBlockManager.java | 13 +++++++++---- .../network/shuffle/ExternalShuffleBlockManagerSuite.java | 10 +++++++--- .../spark/network/shuffle/ExternalShuffleCleanupSuite.java | 13 ++++++++----- .../network/shuffle/ExternalShuffleIntegrationSuite.java | 2 +- .../spark/network/shuffle/ExternalShuffleSecuritySuite.java | 2 +- 6 files changed, 29 insertions(+), 16 deletions(-) (limited to 'network/shuffle') diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index a6db4b2abd..46ca970862 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -21,6 +21,7 @@ import java.util.List; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import org.apache.spark.network.util.TransportConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +49,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler { private final ExternalShuffleBlockManager blockManager; private final OneForOneStreamManager streamManager; - public ExternalShuffleBlockHandler() { - this(new OneForOneStreamManager(), new ExternalShuffleBlockManager()); + public ExternalShuffleBlockHandler(TransportConf conf) { + this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf)); } /** Enables mocking out the StreamManager and BlockManager. */ diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java index ffb7faa3db..dfe0ba0595 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java @@ -37,6 +37,7 @@ import org.apache.spark.network.buffer.FileSegmentManagedBuffer; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo; import org.apache.spark.network.util.JavaUtils; +import org.apache.spark.network.util.TransportConf; /** * Manages converting shuffle BlockIds into physical segments of local files, from a process outside @@ -56,14 +57,17 @@ public class ExternalShuffleBlockManager { // Single-threaded Java executor used to perform expensive recursive directory deletion. private final Executor directoryCleaner; - public ExternalShuffleBlockManager() { + private final TransportConf conf; + + public ExternalShuffleBlockManager(TransportConf conf) { // TODO: Give this thread a name. - this(Executors.newSingleThreadExecutor()); + this(conf, Executors.newSingleThreadExecutor()); } // Allows tests to have more control over when directories are cleaned up. @VisibleForTesting - ExternalShuffleBlockManager(Executor directoryCleaner) { + ExternalShuffleBlockManager(TransportConf conf, Executor directoryCleaner) { + this.conf = conf; this.executors = Maps.newConcurrentMap(); this.directoryCleaner = directoryCleaner; } @@ -167,7 +171,7 @@ public class ExternalShuffleBlockManager { // TODO: Support consolidated hash shuffle files private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) { File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId); - return new FileSegmentManagedBuffer(shuffleFile, 0, shuffleFile.length()); + return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length()); } /** @@ -187,6 +191,7 @@ public class ExternalShuffleBlockManager { long offset = in.readLong(); long nextOffset = in.readLong(); return new FileSegmentManagedBuffer( + conf, getFile(executor.localDirs, executor.subDirsPerLocalDir, "shuffle_" + shuffleId + "_" + mapId + "_0.data"), offset, diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java index da54797e89..dad6428a83 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java @@ -22,6 +22,8 @@ import java.io.InputStream; import java.io.InputStreamReader; import com.google.common.io.CharStreams; +import org.apache.spark.network.util.SystemPropertyConfigProvider; +import org.apache.spark.network.util.TransportConf; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -37,6 +39,8 @@ public class ExternalShuffleBlockManagerSuite { static TestShuffleDataContext dataContext; + static TransportConf conf = new TransportConf(new SystemPropertyConfigProvider()); + @BeforeClass public static void beforeAll() throws IOException { dataContext = new TestShuffleDataContext(2, 5); @@ -56,7 +60,7 @@ public class ExternalShuffleBlockManagerSuite { @Test public void testBadRequests() { - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf); // Unregistered executor try { manager.getBlockData("app0", "exec1", "shuffle_1_1_0"); @@ -87,7 +91,7 @@ public class ExternalShuffleBlockManagerSuite { @Test public void testSortShuffleBlocks() throws IOException { - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf); manager.registerExecutor("app0", "exec0", dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager")); @@ -106,7 +110,7 @@ public class ExternalShuffleBlockManagerSuite { @Test public void testHashShuffleBlocks() throws IOException { - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf); manager.registerExecutor("app0", "exec0", dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager")); diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java index c8ece3bc53..254e3a7a32 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java @@ -25,20 +25,23 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.util.concurrent.MoreExecutors; import org.junit.Test; - import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import org.apache.spark.network.util.SystemPropertyConfigProvider; +import org.apache.spark.network.util.TransportConf; + public class ExternalShuffleCleanupSuite { // Same-thread Executor used to ensure cleanup happens synchronously in test thread. Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor(); + TransportConf conf = new TransportConf(new SystemPropertyConfigProvider()); @Test public void noCleanupAndCleanup() throws IOException { TestShuffleDataContext dataContext = createSomeData(); - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor); manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr")); manager.applicationRemoved("app", false /* cleanup */); @@ -61,7 +64,7 @@ public class ExternalShuffleCleanupSuite { @Override public void execute(Runnable runnable) { cleanupCalled.set(true); } }; - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(noThreadExecutor); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, noThreadExecutor); manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr")); manager.applicationRemoved("app", true); @@ -78,7 +81,7 @@ public class ExternalShuffleCleanupSuite { TestShuffleDataContext dataContext0 = createSomeData(); TestShuffleDataContext dataContext1 = createSomeData(); - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor); manager.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr")); manager.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr")); @@ -93,7 +96,7 @@ public class ExternalShuffleCleanupSuite { TestShuffleDataContext dataContext0 = createSomeData(); TestShuffleDataContext dataContext1 = createSomeData(); - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor); manager.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr")); manager.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr")); diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java index 687bde59fd..02c10bcb7b 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java @@ -92,7 +92,7 @@ public class ExternalShuffleIntegrationSuite { dataContext1.insertHashShuffleData(1, 0, exec1Blocks); conf = new TransportConf(new SystemPropertyConfigProvider()); - handler = new ExternalShuffleBlockHandler(); + handler = new ExternalShuffleBlockHandler(conf); TransportContext transportContext = new TransportContext(conf, handler); server = transportContext.createServer(); } diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java index 8afceab1d5..759a12910c 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java @@ -42,7 +42,7 @@ public class ExternalShuffleSecuritySuite { @Before public void beforeEach() { - RpcHandler handler = new SaslRpcHandler(new ExternalShuffleBlockHandler(), + RpcHandler handler = new SaslRpcHandler(new ExternalShuffleBlockHandler(conf), new TestSecretKeyHolder("my-app-id", "secret")); TransportContext context = new TransportContext(conf, handler); this.server = context.createServer(); -- cgit v1.2.3