aboutsummaryrefslogtreecommitdiff
path: root/network/yarn
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2014-11-11 00:25:31 -0800
committerAaron Davidson <aaron@databricks.com>2014-11-11 00:25:31 -0800
commitef29a9a9aa85468869eb67ca67b66c65f508d0ee (patch)
treee669d33eeba5033c22acd29a0c8d7690db61abfe /network/yarn
parent65083e93ddd552b7d3e4eb09f87c091ef2ae83a2 (diff)
downloadspark-ef29a9a9aa85468869eb67ca67b66c65f508d0ee.tar.gz
spark-ef29a9a9aa85468869eb67ca67b66c65f508d0ee.tar.bz2
spark-ef29a9a9aa85468869eb67ca67b66c65f508d0ee.zip
[SPARK-4307] Initialize FileDescriptor lazily in FileRegion.
Netty's DefaultFileRegion requires a FileDescriptor in its constructor, which means we need to have an open file handle. In very large workloads, this could lead to too many open files because of the way these file descriptors are cleaned up. This pull request creates a new LazyFileRegion that initializes the FileDescriptor when we are sending data for the first time. Author: Reynold Xin <rxin@databricks.com> Author: Reynold Xin <rxin@apache.org> Closes #3172 from rxin/lazyFD and squashes the following commits: 0bdcdc6 [Reynold Xin] Added reference to Netty's DefaultFileRegion d4564ae [Reynold Xin] Added SparkConf to the ctor argument of IndexShuffleBlockManager. 6ed369e [Reynold Xin] Code review feedback. 04cddc8 [Reynold Xin] [SPARK-4307] Initialize FileDescriptor lazily in FileRegion.
Diffstat (limited to 'network/yarn')
-rw-r--r-- network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java | 4
1 files changed, 2 insertions, 2 deletions
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index bb0b8f7e6c..a34aabe9e7 100644
--- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -95,10 +95,11 @@ public class YarnShuffleService extends AuxiliaryService {
*/
@Override
protected void serviceInit(Configuration conf) {
+ TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
// If authentication is enabled, set up the shuffle server to use a
// special RPC handler that filters out unauthenticated fetch requests
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
- RpcHandler rpcHandler = new ExternalShuffleBlockHandler();
+ RpcHandler rpcHandler = new ExternalShuffleBlockHandler(transportConf);
if (authEnabled) {
secretManager = new ShuffleSecretManager();
rpcHandler = new SaslRpcHandler(rpcHandler, secretManager);
@@ -106,7 +107,6 @@ public class YarnShuffleService extends AuxiliaryService {
int port = conf.getInt(
SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
- TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
shuffleServer = transportContext.createServer(port);
String authEnabledString = authEnabled ? "enabled" : "not enabled";