aboutsummaryrefslogtreecommitdiff
path: root/network/shuffle/src
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2014-11-11 00:25:31 -0800
committerAaron Davidson <aaron@databricks.com>2014-11-11 00:25:31 -0800
commitef29a9a9aa85468869eb67ca67b66c65f508d0ee (patch)
treee669d33eeba5033c22acd29a0c8d7690db61abfe /network/shuffle/src
parent65083e93ddd552b7d3e4eb09f87c091ef2ae83a2 (diff)
downloadspark-ef29a9a9aa85468869eb67ca67b66c65f508d0ee.tar.gz
spark-ef29a9a9aa85468869eb67ca67b66c65f508d0ee.tar.bz2
spark-ef29a9a9aa85468869eb67ca67b66c65f508d0ee.zip
[SPARK-4307] Initialize FileDescriptor lazily in FileRegion.
Netty's DefaultFileRegion requires a FileDescriptor in its constructor, which means we need to have a opened file handle. In super large workloads, this could lead to too many open files due to the way these file descriptors are cleaned. This pull request creates a new LazyFileRegion that initializes the FileDescriptor when we are sending data for the first time. Author: Reynold Xin <rxin@databricks.com> Author: Reynold Xin <rxin@apache.org> Closes #3172 from rxin/lazyFD and squashes the following commits: 0bdcdc6 [Reynold Xin] Added reference to Netty's DefaultFileRegion d4564ae [Reynold Xin] Added SparkConf to the ctor argument of IndexShuffleBlockManager. 6ed369e [Reynold Xin] Code review feedback. 04cddc8 [Reynold Xin] [SPARK-4307] Initialize FileDescriptor lazily in FileRegion.
Diffstat (limited to 'network/shuffle/src')
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java5
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java13
-rw-r--r--network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java10
-rw-r--r--network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java13
-rw-r--r--network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java2
-rw-r--r--network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java2
6 files changed, 29 insertions, 16 deletions
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index a6db4b2abd..46ca970862 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -21,6 +21,7 @@ import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,8 +49,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
private final ExternalShuffleBlockManager blockManager;
private final OneForOneStreamManager streamManager;
- public ExternalShuffleBlockHandler() {
- this(new OneForOneStreamManager(), new ExternalShuffleBlockManager());
+ public ExternalShuffleBlockHandler(TransportConf conf) {
+ this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf));
}
/** Enables mocking out the StreamManager and BlockManager. */
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
index ffb7faa3db..dfe0ba0595 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
@@ -37,6 +37,7 @@ import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.TransportConf;
/**
* Manages converting shuffle BlockIds into physical segments of local files, from a process outside
@@ -56,14 +57,17 @@ public class ExternalShuffleBlockManager {
// Single-threaded Java executor used to perform expensive recursive directory deletion.
private final Executor directoryCleaner;
- public ExternalShuffleBlockManager() {
+ private final TransportConf conf;
+
+ public ExternalShuffleBlockManager(TransportConf conf) {
// TODO: Give this thread a name.
- this(Executors.newSingleThreadExecutor());
+ this(conf, Executors.newSingleThreadExecutor());
}
// Allows tests to have more control over when directories are cleaned up.
@VisibleForTesting
- ExternalShuffleBlockManager(Executor directoryCleaner) {
+ ExternalShuffleBlockManager(TransportConf conf, Executor directoryCleaner) {
+ this.conf = conf;
this.executors = Maps.newConcurrentMap();
this.directoryCleaner = directoryCleaner;
}
@@ -167,7 +171,7 @@ public class ExternalShuffleBlockManager {
// TODO: Support consolidated hash shuffle files
private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
- return new FileSegmentManagedBuffer(shuffleFile, 0, shuffleFile.length());
+ return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length());
}
/**
@@ -187,6 +191,7 @@ public class ExternalShuffleBlockManager {
long offset = in.readLong();
long nextOffset = in.readLong();
return new FileSegmentManagedBuffer(
+ conf,
getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
offset,
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
index da54797e89..dad6428a83 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
@@ -22,6 +22,8 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import com.google.common.io.CharStreams;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -37,6 +39,8 @@ public class ExternalShuffleBlockManagerSuite {
static TestShuffleDataContext dataContext;
+ static TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+
@BeforeClass
public static void beforeAll() throws IOException {
dataContext = new TestShuffleDataContext(2, 5);
@@ -56,7 +60,7 @@ public class ExternalShuffleBlockManagerSuite {
@Test
public void testBadRequests() {
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
// Unregistered executor
try {
manager.getBlockData("app0", "exec1", "shuffle_1_1_0");
@@ -87,7 +91,7 @@ public class ExternalShuffleBlockManagerSuite {
@Test
public void testSortShuffleBlocks() throws IOException {
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
manager.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
@@ -106,7 +110,7 @@ public class ExternalShuffleBlockManagerSuite {
@Test
public void testHashShuffleBlocks() throws IOException {
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
manager.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index c8ece3bc53..254e3a7a32 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -25,20 +25,23 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Test;
-
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
+
public class ExternalShuffleCleanupSuite {
// Same-thread Executor used to ensure cleanup happens synchronously in test thread.
Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
+ TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
@Test
public void noCleanupAndCleanup() throws IOException {
TestShuffleDataContext dataContext = createSomeData();
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor);
manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
manager.applicationRemoved("app", false /* cleanup */);
@@ -61,7 +64,7 @@ public class ExternalShuffleCleanupSuite {
@Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
};
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(noThreadExecutor);
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, noThreadExecutor);
manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
manager.applicationRemoved("app", true);
@@ -78,7 +81,7 @@ public class ExternalShuffleCleanupSuite {
TestShuffleDataContext dataContext0 = createSomeData();
TestShuffleDataContext dataContext1 = createSomeData();
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor);
manager.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
manager.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
@@ -93,7 +96,7 @@ public class ExternalShuffleCleanupSuite {
TestShuffleDataContext dataContext0 = createSomeData();
TestShuffleDataContext dataContext1 = createSomeData();
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor);
manager.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
manager.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 687bde59fd..02c10bcb7b 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -92,7 +92,7 @@ public class ExternalShuffleIntegrationSuite {
dataContext1.insertHashShuffleData(1, 0, exec1Blocks);
conf = new TransportConf(new SystemPropertyConfigProvider());
- handler = new ExternalShuffleBlockHandler();
+ handler = new ExternalShuffleBlockHandler(conf);
TransportContext transportContext = new TransportContext(conf, handler);
server = transportContext.createServer();
}
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index 8afceab1d5..759a12910c 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -42,7 +42,7 @@ public class ExternalShuffleSecuritySuite {
@Before
public void beforeEach() {
- RpcHandler handler = new SaslRpcHandler(new ExternalShuffleBlockHandler(),
+ RpcHandler handler = new SaslRpcHandler(new ExternalShuffleBlockHandler(conf),
new TestSecretKeyHolder("my-app-id", "secret"));
TransportContext context = new TransportContext(conf, handler);
this.server = context.createServer();