From ef29a9a9aa85468869eb67ca67b66c65f508d0ee Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 11 Nov 2014 00:25:31 -0800 Subject: [SPARK-4307] Initialize FileDescriptor lazily in FileRegion. Netty's DefaultFileRegion requires a FileDescriptor in its constructor, which means we need to have a opened file handle. In super large workloads, this could lead to too many open files due to the way these file descriptors are cleaned. This pull request creates a new LazyFileRegion that initializes the FileDescriptor when we are sending data for the first time. Author: Reynold Xin Author: Reynold Xin Closes #3172 from rxin/lazyFD and squashes the following commits: 0bdcdc6 [Reynold Xin] Added reference to Netty's DefaultFileRegion d4564ae [Reynold Xin] Added SparkConf to the ctor argument of IndexShuffleBlockManager. 6ed369e [Reynold Xin] Code review feedback. 04cddc8 [Reynold Xin] [SPARK-4307] Initialize FileDescriptor lazily in FileRegion. --- .../main/java/org/apache/spark/network/yarn/YarnShuffleService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'network/yarn') diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index bb0b8f7e6c..a34aabe9e7 100644 --- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -95,10 +95,11 @@ public class YarnShuffleService extends AuxiliaryService { */ @Override protected void serviceInit(Configuration conf) { + TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf)); // If authentication is enabled, set up the shuffle server to use a // special RPC handler that filters out unauthenticated fetch requests boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE); - RpcHandler rpcHandler = new ExternalShuffleBlockHandler(); + RpcHandler rpcHandler = new ExternalShuffleBlockHandler(transportConf); if (authEnabled) { secretManager = new ShuffleSecretManager(); rpcHandler = new SaslRpcHandler(rpcHandler, secretManager); @@ -106,7 +107,6 @@ public class YarnShuffleService extends AuxiliaryService { int port = conf.getInt( SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT); - TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf)); TransportContext transportContext = new TransportContext(transportConf, rpcHandler); shuffleServer = transportContext.createServer(port); String authEnabledString = authEnabled ? "enabled" : "not enabled"; -- cgit v1.2.3