author     Reynold Xin <rxin@databricks.com>      2014-11-11 00:25:31 -0800
committer  Aaron Davidson <aaron@databricks.com>  2014-11-11 00:25:31 -0800
commit     ef29a9a9aa85468869eb67ca67b66c65f508d0ee (patch)
tree       e669d33eeba5033c22acd29a0c8d7690db61abfe /network/common
parent     65083e93ddd552b7d3e4eb09f87c091ef2ae83a2 (diff)
[SPARK-4307] Initialize FileDescriptor lazily in FileRegion.
Netty's DefaultFileRegion requires a FileDescriptor in its constructor, which means we need to have an open file handle. In super large workloads, this could lead to too many open files due to the way these file descriptors are cleaned up. This pull request creates a new LazyFileRegion that initializes the FileDescriptor only when we are sending data for the first time.

Author: Reynold Xin <rxin@databricks.com>
Author: Reynold Xin <rxin@apache.org>

Closes #3172 from rxin/lazyFD and squashes the following commits:

0bdcdc6 [Reynold Xin] Added reference to Netty's DefaultFileRegion
d4564ae [Reynold Xin] Added SparkConf to the ctor argument of IndexShuffleBlockManager.
6ed369e [Reynold Xin] Code review feedback.
04cddc8 [Reynold Xin] [SPARK-4307] Initialize FileDescriptor lazily in FileRegion.
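The gist of the change, condensed from the diff below: FileSegmentManagedBuffer.convertToNetty() previously opened a FileChannel as soon as the message object was created, so every chunk queued for transfer pinned an open descriptor; it can now return a region that defers the open until Netty actually starts sending.

    // Before: descriptor opened eagerly when the message object is created.
    FileChannel fileChannel = new FileInputStream(file).getChannel();
    return new DefaultFileRegion(fileChannel, offset, length);

    // After (with spark.shuffle.io.lazyFD=true, the default): the descriptor
    // is opened inside transferTo(), i.e. only once data is actually sent.
    return new LazyFileRegion(file, offset, length);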
Diffstat (limited to 'network/common')
 network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java |  23
 network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java (new)     | 111
 network/common/src/main/java/org/apache/spark/network/util/TransportConf.java              |  17
 network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java      |   9
 4 files changed, 145 insertions(+), 15 deletions(-)
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
index 5fa1527ddf..844eff4f4c 100644
--- a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -31,24 +31,19 @@ import io.netty.channel.DefaultFileRegion;
 import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.network.util.LimitedInputStream;
+import org.apache.spark.network.util.TransportConf;
 
 /**
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
 public final class FileSegmentManagedBuffer extends ManagedBuffer {
-
-  /**
-   * Memory mapping is expensive and can destabilize the JVM (SPARK-1145, SPARK-3889).
-   * Avoid unless there's a good reason not to.
-   */
-  // TODO: Make this configurable
-  private static final long MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024;
-
+  private final TransportConf conf;
   private final File file;
   private final long offset;
   private final long length;
 
-  public FileSegmentManagedBuffer(File file, long offset, long length) {
+  public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
+    this.conf = conf;
     this.file = file;
     this.offset = offset;
     this.length = length;
@@ -65,7 +60,7 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
     try {
       channel = new RandomAccessFile(file, "r").getChannel();
       // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
-      if (length < MIN_MEMORY_MAP_BYTES) {
+      if (length < conf.memoryMapBytes()) {
         ByteBuffer buf = ByteBuffer.allocate((int) length);
         channel.position(offset);
         while (buf.remaining() != 0) {
@@ -134,8 +129,12 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
   @Override
   public Object convertToNetty() throws IOException {
-    FileChannel fileChannel = new FileInputStream(file).getChannel();
-    return new DefaultFileRegion(fileChannel, offset, length);
+    if (conf.lazyFileDescriptor()) {
+      return new LazyFileRegion(file, offset, length);
+    } else {
+      FileChannel fileChannel = new FileInputStream(file).getChannel();
+      return new DefaultFileRegion(fileChannel, offset, length);
+    }
   }
 
   public File getFile() { return file; }
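For context on the new conf.memoryMapBytes() threshold: the hunk above shows only the copy branch of nioByteBuffer(). A sketch of the full decision follows; the memory-mapping branch is reconstructed around the diff and simplified (error handling omitted), so treat it as illustrative:

    // Sketch of the read-vs-mmap decision in nioByteBuffer() after this change.
    FileChannel channel = new RandomAccessFile(file, "r").getChannel();
    if (length < conf.memoryMapBytes()) {
      // Small segment: a plain read into a heap buffer avoids mmap setup cost.
      ByteBuffer buf = ByteBuffer.allocate((int) length);
      channel.position(offset);
      while (buf.remaining() != 0) {
        channel.read(buf);
      }
      buf.flip();
      return buf;
    } else {
      // Large segment: memory mapping amortizes its fixed overhead.
      return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
    }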
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java b/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java
new file mode 100644
index 0000000000..81bc8ec40f
--- /dev/null
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.buffer;
+
+import java.io.FileInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+
+import com.google.common.base.Objects;
+import io.netty.channel.FileRegion;
+import io.netty.util.AbstractReferenceCounted;
+
+import org.apache.spark.network.util.JavaUtils;
+
+/**
+ * A FileRegion implementation that creates the underlying file descriptor only when the region
+ * is being transferred. This cannot be used with epoll because there is no native support for
+ * lazily created file descriptors.
+ *
+ * This is mostly copied from the DefaultFileRegion implementation in Netty. In the future, we
+ * should push this into Netty so that the native epoll transport can support this feature.
+ */
+public final class LazyFileRegion extends AbstractReferenceCounted implements FileRegion {
+
+  private final File file;
+  private final long position;
+  private final long count;
+
+  private FileChannel channel;
+
+  private long numBytesTransferred = 0L;
+
+  /**
+   * @param file file to transfer.
+   * @param position start position for the transfer.
+   * @param count number of bytes to transfer starting from position.
+   */
+  public LazyFileRegion(File file, long position, long count) {
+    this.file = file;
+    this.position = position;
+    this.count = count;
+  }
+
+  @Override
+  protected void deallocate() {
+    JavaUtils.closeQuietly(channel);
+  }
+
+  @Override
+  public long position() {
+    return position;
+  }
+
+  @Override
+  public long transfered() {
+    return numBytesTransferred;
+  }
+
+  @Override
+  public long count() {
+    return count;
+  }
+
+  @Override
+  public long transferTo(WritableByteChannel target, long position) throws IOException {
+    if (channel == null) {
+      channel = new FileInputStream(file).getChannel();
+    }
+
+    long count = this.count - position;
+    if (count < 0 || position < 0) {
+      throw new IllegalArgumentException(
+        "position out of range: " + position + " (expected: 0 - " + (this.count - 1) + ')');
+    }
+
+    if (count == 0) {
+      return 0L;
+    }
+
+    long written = channel.transferTo(this.position + position, count, target);
+    if (written > 0) {
+      numBytesTransferred += written;
+    }
+    return written;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("file", file)
+      .add("position", position)
+      .add("count", count)
+      .toString();
+  }
+}
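A LazyFileRegion is consumed like any other Netty FileRegion: it is written to a channel, and Netty calls transferTo() when it drains its write queue. A minimal, illustrative write path (the handler context ctx and the file path are hypothetical, not part of this patch):

    // Illustrative only: no descriptor exists while the region sits in the
    // outbound queue; the FileInputStream is opened on the first transferTo().
    File file = new File("/tmp/shuffle_0_0_0.data");  // hypothetical path
    ctx.writeAndFlush(new LazyFileRegion(file, 0, file.length()));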
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 787a8f0031..621427d8cb 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -75,4 +75,21 @@ public class TransportConf {
    * Only relevant if maxIORetries > 0.
    */
   public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.retryWaitMs", 5000); }
+
+  /**
+   * Minimum block size at which we use memory mapping rather than reading a block in through
+   * normal IO operations. This prevents Spark from memory mapping very small blocks. In general,
+   * memory mapping has high overhead for blocks close to or below the page size of the OS.
+   */
+  public int memoryMapBytes() {
+    return conf.getInt("spark.storage.memoryMapThreshold", 2 * 1024 * 1024);
+  }
+
+  /**
+   * Whether to initialize shuffle file descriptors lazily. If true, file descriptors are
+   * created only when data is about to be transferred, which can reduce the number of
+   * simultaneously open files.
+   */
+  public boolean lazyFileDescriptor() {
+    return conf.getBoolean("spark.shuffle.io.lazyFD", true);
+  }
 }
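Because TransportConf resolves keys through its ConfigProvider, both new knobs can be exercised via system properties when SystemPropertyConfigProvider (used by the test below) is in effect. A small example; the values are illustrative, not recommendations:

    // Example only: overriding the new settings through system properties.
    System.setProperty("spark.shuffle.io.lazyFD", "false");  // fall back to DefaultFileRegion
    System.setProperty("spark.storage.memoryMapThreshold", String.valueOf(4 * 1024 * 1024));
    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
    assert !conf.lazyFileDescriptor();
    assert conf.memoryMapBytes() == 4 * 1024 * 1024;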
diff --git a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
index c415883397..dfb7740344 100644
--- a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
@@ -63,6 +63,8 @@ public class ChunkFetchIntegrationSuite {
   static ManagedBuffer bufferChunk;
   static ManagedBuffer fileChunk;
 
+  private TransportConf transportConf;
+
   @BeforeClass
   public static void setUp() throws Exception {
     int bufSize = 100000;
@@ -80,9 +82,10 @@ public class ChunkFetchIntegrationSuite {
     new Random().nextBytes(fileContent);
     fp.write(fileContent);
     fp.close();
-    fileChunk = new FileSegmentManagedBuffer(testFile, 10, testFile.length() - 25);
 
-    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+    final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+    fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
+
     streamManager = new StreamManager() {
       @Override
       public ManagedBuffer getChunk(long streamId, int chunkIndex) {
@@ -90,7 +93,7 @@ public class ChunkFetchIntegrationSuite {
         if (chunkIndex == BUFFER_CHUNK_INDEX) {
           return new NioManagedBuffer(buf);
         } else if (chunkIndex == FILE_CHUNK_INDEX) {
-          return new FileSegmentManagedBuffer(testFile, 10, testFile.length() - 25);
+          return new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
         } else {
           throw new IllegalArgumentException("Invalid chunk index: " + chunkIndex);
         }
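To see the failure mode the patch targets, consider a toy queueing loop (hypothetical code, not from the patch): with eager DefaultFileRegion construction each iteration would open a descriptor up front, while LazyFileRegion keeps the queue descriptor-free until Netty sends the data.

    // Toy illustration (hypothetical): queueing many file-backed chunks.
    for (int i = 0; i < 100000; i++) {
      File f = new File("/tmp/shuffle/part-" + i);          // hypothetical paths
      channel.write(new LazyFileRegion(f, 0, f.length()));  // no FD opened yet
    }
    channel.flush();  // descriptors are opened only as each region is transferred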