From ef29a9a9aa85468869eb67ca67b66c65f508d0ee Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 11 Nov 2014 00:25:31 -0800 Subject: [SPARK-4307] Initialize FileDescriptor lazily in FileRegion. Netty's DefaultFileRegion requires a FileDescriptor in its constructor, which means we need to have a opened file handle. In super large workloads, this could lead to too many open files due to the way these file descriptors are cleaned. This pull request creates a new LazyFileRegion that initializes the FileDescriptor when we are sending data for the first time. Author: Reynold Xin Author: Reynold Xin Closes #3172 from rxin/lazyFD and squashes the following commits: 0bdcdc6 [Reynold Xin] Added reference to Netty's DefaultFileRegion d4564ae [Reynold Xin] Added SparkConf to the ctor argument of IndexShuffleBlockManager. 6ed369e [Reynold Xin] Code review feedback. 04cddc8 [Reynold Xin] [SPARK-4307] Initialize FileDescriptor lazily in FileRegion. --- .../network/buffer/FileSegmentManagedBuffer.java | 23 ++--- .../spark/network/buffer/LazyFileRegion.java | 111 +++++++++++++++++++++ .../apache/spark/network/util/TransportConf.java | 17 ++++ .../spark/network/ChunkFetchIntegrationSuite.java | 9 +- .../shuffle/ExternalShuffleBlockHandler.java | 5 +- .../shuffle/ExternalShuffleBlockManager.java | 13 ++- .../shuffle/ExternalShuffleBlockManagerSuite.java | 10 +- .../shuffle/ExternalShuffleCleanupSuite.java | 13 ++- .../shuffle/ExternalShuffleIntegrationSuite.java | 2 +- .../shuffle/ExternalShuffleSecuritySuite.java | 2 +- .../spark/network/yarn/YarnShuffleService.java | 4 +- 11 files changed, 176 insertions(+), 33 deletions(-) create mode 100644 network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java (limited to 'network') diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java index 5fa1527ddf..844eff4f4c 100644 --- a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java +++ b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java @@ -31,24 +31,19 @@ import io.netty.channel.DefaultFileRegion; import org.apache.spark.network.util.JavaUtils; import org.apache.spark.network.util.LimitedInputStream; +import org.apache.spark.network.util.TransportConf; /** * A {@link ManagedBuffer} backed by a segment in a file. */ public final class FileSegmentManagedBuffer extends ManagedBuffer { - - /** - * Memory mapping is expensive and can destabilize the JVM (SPARK-1145, SPARK-3889). - * Avoid unless there's a good reason not to. - */ - // TODO: Make this configurable - private static final long MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024; - + private final TransportConf conf; private final File file; private final long offset; private final long length; - public FileSegmentManagedBuffer(File file, long offset, long length) { + public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) { + this.conf = conf; this.file = file; this.offset = offset; this.length = length; @@ -65,7 +60,7 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer { try { channel = new RandomAccessFile(file, "r").getChannel(); // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead. - if (length < MIN_MEMORY_MAP_BYTES) { + if (length < conf.memoryMapBytes()) { ByteBuffer buf = ByteBuffer.allocate((int) length); channel.position(offset); while (buf.remaining() != 0) { @@ -134,8 +129,12 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer { @Override public Object convertToNetty() throws IOException { - FileChannel fileChannel = new FileInputStream(file).getChannel(); - return new DefaultFileRegion(fileChannel, offset, length); + if (conf.lazyFileDescriptor()) { + return new LazyFileRegion(file, offset, length); + } else { + FileChannel fileChannel = new FileInputStream(file).getChannel(); + return new DefaultFileRegion(fileChannel, offset, length); + } } public File getFile() { return file; } diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java b/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java new file mode 100644 index 0000000000..81bc8ec40f --- /dev/null +++ b/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.buffer; + +import java.io.FileInputStream; +import java.io.File; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.WritableByteChannel; + +import com.google.common.base.Objects; +import io.netty.channel.FileRegion; +import io.netty.util.AbstractReferenceCounted; + +import org.apache.spark.network.util.JavaUtils; + +/** + * A FileRegion implementation that only creates the file descriptor when the region is being + * transferred. This cannot be used with Epoll because there is no native support for it. + * + * This is mostly copied from DefaultFileRegion implementation in Netty. In the future, we + * should push this into Netty so the native Epoll transport can support this feature. + */ +public final class LazyFileRegion extends AbstractReferenceCounted implements FileRegion { + + private final File file; + private final long position; + private final long count; + + private FileChannel channel; + + private long numBytesTransferred = 0L; + + /** + * @param file file to transfer. + * @param position start position for the transfer. + * @param count number of bytes to transfer starting from position. + */ + public LazyFileRegion(File file, long position, long count) { + this.file = file; + this.position = position; + this.count = count; + } + + @Override + protected void deallocate() { + JavaUtils.closeQuietly(channel); + } + + @Override + public long position() { + return position; + } + + @Override + public long transfered() { + return numBytesTransferred; + } + + @Override + public long count() { + return count; + } + + @Override + public long transferTo(WritableByteChannel target, long position) throws IOException { + if (channel == null) { + channel = new FileInputStream(file).getChannel(); + } + + long count = this.count - position; + if (count < 0 || position < 0) { + throw new IllegalArgumentException( + "position out of range: " + position + " (expected: 0 - " + (count - 1) + ')'); + } + + if (count == 0) { + return 0L; + } + + long written = channel.transferTo(this.position + position, count, target); + if (written > 0) { + numBytesTransferred += written; + } + return written; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("file", file) + .add("position", position) + .add("count", count) + .toString(); + } +} diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java index 787a8f0031..621427d8cb 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -75,4 +75,21 @@ public class TransportConf { * Only relevant if maxIORetries > 0. */ public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.retryWaitMs", 5000); } + + /** + * Minimum size of a block that we should start using memory map rather than reading in through + * normal IO operations. This prevents Spark from memory mapping very small blocks. In general, + * memory mapping has high overhead for blocks close to or below the page size of the OS. + */ + public int memoryMapBytes() { + return conf.getInt("spark.storage.memoryMapThreshold", 2 * 1024 * 1024); + } + + /** + * Whether to initialize shuffle FileDescriptor lazily or not. If true, file descriptors are + * created only when data is going to be transferred. This can reduce the number of open files. + */ + public boolean lazyFileDescriptor() { + return conf.getBoolean("spark.shuffle.io.lazyFD", true); + } } diff --git a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java index c415883397..dfb7740344 100644 --- a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java +++ b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java @@ -63,6 +63,8 @@ public class ChunkFetchIntegrationSuite { static ManagedBuffer bufferChunk; static ManagedBuffer fileChunk; + private TransportConf transportConf; + @BeforeClass public static void setUp() throws Exception { int bufSize = 100000; @@ -80,9 +82,10 @@ public class ChunkFetchIntegrationSuite { new Random().nextBytes(fileContent); fp.write(fileContent); fp.close(); - fileChunk = new FileSegmentManagedBuffer(testFile, 10, testFile.length() - 25); - TransportConf conf = new TransportConf(new SystemPropertyConfigProvider()); + final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider()); + fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25); + streamManager = new StreamManager() { @Override public ManagedBuffer getChunk(long streamId, int chunkIndex) { @@ -90,7 +93,7 @@ public class ChunkFetchIntegrationSuite { if (chunkIndex == BUFFER_CHUNK_INDEX) { return new NioManagedBuffer(buf); } else if (chunkIndex == FILE_CHUNK_INDEX) { - return new FileSegmentManagedBuffer(testFile, 10, testFile.length() - 25); + return new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25); } else { throw new IllegalArgumentException("Invalid chunk index: " + chunkIndex); } diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index a6db4b2abd..46ca970862 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -21,6 +21,7 @@ import java.util.List; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import org.apache.spark.network.util.TransportConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +49,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler { private final ExternalShuffleBlockManager blockManager; private final OneForOneStreamManager streamManager; - public ExternalShuffleBlockHandler() { - this(new OneForOneStreamManager(), new ExternalShuffleBlockManager()); + public ExternalShuffleBlockHandler(TransportConf conf) { + this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf)); } /** Enables mocking out the StreamManager and BlockManager. */ diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java index ffb7faa3db..dfe0ba0595 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java @@ -37,6 +37,7 @@ import org.apache.spark.network.buffer.FileSegmentManagedBuffer; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo; import org.apache.spark.network.util.JavaUtils; +import org.apache.spark.network.util.TransportConf; /** * Manages converting shuffle BlockIds into physical segments of local files, from a process outside @@ -56,14 +57,17 @@ public class ExternalShuffleBlockManager { // Single-threaded Java executor used to perform expensive recursive directory deletion. private final Executor directoryCleaner; - public ExternalShuffleBlockManager() { + private final TransportConf conf; + + public ExternalShuffleBlockManager(TransportConf conf) { // TODO: Give this thread a name. - this(Executors.newSingleThreadExecutor()); + this(conf, Executors.newSingleThreadExecutor()); } // Allows tests to have more control over when directories are cleaned up. @VisibleForTesting - ExternalShuffleBlockManager(Executor directoryCleaner) { + ExternalShuffleBlockManager(TransportConf conf, Executor directoryCleaner) { + this.conf = conf; this.executors = Maps.newConcurrentMap(); this.directoryCleaner = directoryCleaner; } @@ -167,7 +171,7 @@ public class ExternalShuffleBlockManager { // TODO: Support consolidated hash shuffle files private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) { File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId); - return new FileSegmentManagedBuffer(shuffleFile, 0, shuffleFile.length()); + return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length()); } /** @@ -187,6 +191,7 @@ public class ExternalShuffleBlockManager { long offset = in.readLong(); long nextOffset = in.readLong(); return new FileSegmentManagedBuffer( + conf, getFile(executor.localDirs, executor.subDirsPerLocalDir, "shuffle_" + shuffleId + "_" + mapId + "_0.data"), offset, diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java index da54797e89..dad6428a83 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java @@ -22,6 +22,8 @@ import java.io.InputStream; import java.io.InputStreamReader; import com.google.common.io.CharStreams; +import org.apache.spark.network.util.SystemPropertyConfigProvider; +import org.apache.spark.network.util.TransportConf; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -37,6 +39,8 @@ public class ExternalShuffleBlockManagerSuite { static TestShuffleDataContext dataContext; + static TransportConf conf = new TransportConf(new SystemPropertyConfigProvider()); + @BeforeClass public static void beforeAll() throws IOException { dataContext = new TestShuffleDataContext(2, 5); @@ -56,7 +60,7 @@ public class ExternalShuffleBlockManagerSuite { @Test public void testBadRequests() { - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf); // Unregistered executor try { manager.getBlockData("app0", "exec1", "shuffle_1_1_0"); @@ -87,7 +91,7 @@ public class ExternalShuffleBlockManagerSuite { @Test public void testSortShuffleBlocks() throws IOException { - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf); manager.registerExecutor("app0", "exec0", dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager")); @@ -106,7 +110,7 @@ public class ExternalShuffleBlockManagerSuite { @Test public void testHashShuffleBlocks() throws IOException { - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf); manager.registerExecutor("app0", "exec0", dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager")); diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java index c8ece3bc53..254e3a7a32 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java @@ -25,20 +25,23 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.util.concurrent.MoreExecutors; import org.junit.Test; - import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import org.apache.spark.network.util.SystemPropertyConfigProvider; +import org.apache.spark.network.util.TransportConf; + public class ExternalShuffleCleanupSuite { // Same-thread Executor used to ensure cleanup happens synchronously in test thread. Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor(); + TransportConf conf = new TransportConf(new SystemPropertyConfigProvider()); @Test public void noCleanupAndCleanup() throws IOException { TestShuffleDataContext dataContext = createSomeData(); - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor); manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr")); manager.applicationRemoved("app", false /* cleanup */); @@ -61,7 +64,7 @@ public class ExternalShuffleCleanupSuite { @Override public void execute(Runnable runnable) { cleanupCalled.set(true); } }; - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(noThreadExecutor); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, noThreadExecutor); manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr")); manager.applicationRemoved("app", true); @@ -78,7 +81,7 @@ public class ExternalShuffleCleanupSuite { TestShuffleDataContext dataContext0 = createSomeData(); TestShuffleDataContext dataContext1 = createSomeData(); - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor); manager.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr")); manager.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr")); @@ -93,7 +96,7 @@ public class ExternalShuffleCleanupSuite { TestShuffleDataContext dataContext0 = createSomeData(); TestShuffleDataContext dataContext1 = createSomeData(); - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor); + ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor); manager.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr")); manager.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr")); diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java index 687bde59fd..02c10bcb7b 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java @@ -92,7 +92,7 @@ public class ExternalShuffleIntegrationSuite { dataContext1.insertHashShuffleData(1, 0, exec1Blocks); conf = new TransportConf(new SystemPropertyConfigProvider()); - handler = new ExternalShuffleBlockHandler(); + handler = new ExternalShuffleBlockHandler(conf); TransportContext transportContext = new TransportContext(conf, handler); server = transportContext.createServer(); } diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java index 8afceab1d5..759a12910c 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java @@ -42,7 +42,7 @@ public class ExternalShuffleSecuritySuite { @Before public void beforeEach() { - RpcHandler handler = new SaslRpcHandler(new ExternalShuffleBlockHandler(), + RpcHandler handler = new SaslRpcHandler(new ExternalShuffleBlockHandler(conf), new TestSecretKeyHolder("my-app-id", "secret")); TransportContext context = new TransportContext(conf, handler); this.server = context.createServer(); diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index bb0b8f7e6c..a34aabe9e7 100644 --- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -95,10 +95,11 @@ public class YarnShuffleService extends AuxiliaryService { */ @Override protected void serviceInit(Configuration conf) { + TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf)); // If authentication is enabled, set up the shuffle server to use a // special RPC handler that filters out unauthenticated fetch requests boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE); - RpcHandler rpcHandler = new ExternalShuffleBlockHandler(); + RpcHandler rpcHandler = new ExternalShuffleBlockHandler(transportConf); if (authEnabled) { secretManager = new ShuffleSecretManager(); rpcHandler = new SaslRpcHandler(rpcHandler, secretManager); @@ -106,7 +107,6 @@ public class YarnShuffleService extends AuxiliaryService { int port = conf.getInt( SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT); - TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf)); TransportContext transportContext = new TransportContext(transportConf, rpcHandler); shuffleServer = transportContext.createServer(port); String authEnabledString = authEnabled ? "enabled" : "not enabled"; -- cgit v1.2.3