author     Reynold Xin <rxin@databricks.com>    2014-11-11 00:25:31 -0800
committer  Aaron Davidson <aaron@databricks.com>    2014-11-11 00:25:31 -0800
commit     ef29a9a9aa85468869eb67ca67b66c65f508d0ee (patch)
tree       e669d33eeba5033c22acd29a0c8d7690db61abfe /network
parent     65083e93ddd552b7d3e4eb09f87c091ef2ae83a2 (diff)
[SPARK-4307] Initialize FileDescriptor lazily in FileRegion.
Netty's DefaultFileRegion requires a FileDescriptor in its constructor, which means we need an open file handle. In very large workloads, this can lead to "too many open files" errors because of the way these file descriptors are cleaned up. This pull request creates a new LazyFileRegion that initializes the FileDescriptor only when data is sent for the first time.

Author: Reynold Xin <rxin@databricks.com>
Author: Reynold Xin <rxin@apache.org>

Closes #3172 from rxin/lazyFD and squashes the following commits:

0bdcdc6 [Reynold Xin] Added reference to Netty's DefaultFileRegion
d4564ae [Reynold Xin] Added SparkConf to the ctor argument of IndexShuffleBlockManager.
6ed369e [Reynold Xin] Code review feedback.
04cddc8 [Reynold Xin] [SPARK-4307] Initialize FileDescriptor lazily in FileRegion.
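The core pattern is simple: defer opening the FileChannel until the first transfer, so a region queued in Netty's outbound buffer holds no descriptor until it is actually written. A minimal, self-contained sketch of that idea (the class below is hypothetical and for illustration only; the committed implementation is LazyFileRegion, shown in full in the diff that follows):

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

final class LazyChannelHolder {
    private final File file;
    private FileChannel channel; // stays null until the first transfer

    LazyChannelHolder(File file) {
        this.file = file; // cheap: no file descriptor is opened here
    }

    FileChannel channel() throws IOException {
        if (channel == null) {
            // The descriptor is created only now, at first use.
            channel = new FileInputStream(file).getChannel();
        }
        return channel;
    }
}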
Diffstat (limited to 'network')
-rw-r--r--  network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java          |  23
-rw-r--r--  network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java                    | 111
-rw-r--r--  network/common/src/main/java/org/apache/spark/network/util/TransportConf.java                       |  17
-rw-r--r--  network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java               |   9
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java     |   5
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java     |  13
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java |  10
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java     |  13
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java |   2
-rw-r--r--  network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java    |   2
-rw-r--r--  network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java                    |   4
11 files changed, 176 insertions(+), 33 deletions(-)
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
index 5fa1527ddf..844eff4f4c 100644
--- a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -31,24 +31,19 @@ import io.netty.channel.DefaultFileRegion;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.LimitedInputStream;
+import org.apache.spark.network.util.TransportConf;
/**
* A {@link ManagedBuffer} backed by a segment in a file.
*/
public final class FileSegmentManagedBuffer extends ManagedBuffer {
-
- /**
- * Memory mapping is expensive and can destabilize the JVM (SPARK-1145, SPARK-3889).
- * Avoid unless there's a good reason not to.
- */
- // TODO: Make this configurable
- private static final long MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024;
-
+ private final TransportConf conf;
private final File file;
private final long offset;
private final long length;
- public FileSegmentManagedBuffer(File file, long offset, long length) {
+ public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
+ this.conf = conf;
this.file = file;
this.offset = offset;
this.length = length;
@@ -65,7 +60,7 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
try {
channel = new RandomAccessFile(file, "r").getChannel();
// Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
- if (length < MIN_MEMORY_MAP_BYTES) {
+ if (length < conf.memoryMapBytes()) {
ByteBuffer buf = ByteBuffer.allocate((int) length);
channel.position(offset);
while (buf.remaining() != 0) {
@@ -134,8 +129,12 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
@Override
public Object convertToNetty() throws IOException {
- FileChannel fileChannel = new FileInputStream(file).getChannel();
- return new DefaultFileRegion(fileChannel, offset, length);
+ if (conf.lazyFileDescriptor()) {
+ return new LazyFileRegion(file, offset, length);
+ } else {
+ FileChannel fileChannel = new FileInputStream(file).getChannel();
+ return new DefaultFileRegion(fileChannel, offset, length);
+ }
}
public File getFile() { return file; }
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java b/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java
new file mode 100644
index 0000000000..81bc8ec40f
--- /dev/null
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/LazyFileRegion.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.buffer;
+
+import java.io.FileInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+
+import com.google.common.base.Objects;
+import io.netty.channel.FileRegion;
+import io.netty.util.AbstractReferenceCounted;
+
+import org.apache.spark.network.util.JavaUtils;
+
+/**
+ * A FileRegion implementation that only creates the file descriptor when the region is being
+ * transferred. This cannot be used with Epoll because there is no native support for it.
+ *
+ * This is mostly copied from DefaultFileRegion implementation in Netty. In the future, we
+ * should push this into Netty so the native Epoll transport can support this feature.
+ */
+public final class LazyFileRegion extends AbstractReferenceCounted implements FileRegion {
+
+ private final File file;
+ private final long position;
+ private final long count;
+
+ private FileChannel channel;
+
+ private long numBytesTransferred = 0L;
+
+ /**
+ * @param file file to transfer.
+ * @param position start position for the transfer.
+ * @param count number of bytes to transfer starting from position.
+ */
+ public LazyFileRegion(File file, long position, long count) {
+ this.file = file;
+ this.position = position;
+ this.count = count;
+ }
+
+ @Override
+ protected void deallocate() {
+ JavaUtils.closeQuietly(channel);
+ }
+
+ @Override
+ public long position() {
+ return position;
+ }
+
+ @Override
+ public long transfered() {
+ return numBytesTransferred;
+ }
+
+ @Override
+ public long count() {
+ return count;
+ }
+
+ @Override
+ public long transferTo(WritableByteChannel target, long position) throws IOException {
+ if (channel == null) {
+ channel = new FileInputStream(file).getChannel();
+ }
+
+ long count = this.count - position;
+ if (count < 0 || position < 0) {
+ throw new IllegalArgumentException(
+ "position out of range: " + position + " (expected: 0 - " + (count - 1) + ')');
+ }
+
+ if (count == 0) {
+ return 0L;
+ }
+
+ long written = channel.transferTo(this.position + position, count, target);
+ if (written > 0) {
+ numBytesTransferred += written;
+ }
+ return written;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("file", file)
+ .add("position", position)
+ .add("count", count)
+ .toString();
+ }
+}
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 787a8f0031..621427d8cb 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -75,4 +75,21 @@ public class TransportConf {
* Only relevant if maxIORetries > 0.
*/
public int ioRetryWaitTime() { return conf.getInt("spark.shuffle.io.retryWaitMs", 5000); }
+
+ /**
+ * Minimum size of a block that we should start using memory map rather than reading in through
+ * normal IO operations. This prevents Spark from memory mapping very small blocks. In general,
+ * memory mapping has high overhead for blocks close to or below the page size of the OS.
+ */
+ public int memoryMapBytes() {
+ return conf.getInt("spark.storage.memoryMapThreshold", 2 * 1024 * 1024);
+ }
+
+ /**
+ * Whether to initialize shuffle FileDescriptor lazily or not. If true, file descriptors are
+ * created only when data is going to be transferred. This can reduce the number of open files.
+ */
+ public boolean lazyFileDescriptor() {
+ return conf.getBoolean("spark.shuffle.io.lazyFD", true);
+ }
}
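A hedged usage sketch of the two new configuration keys (the values below are arbitrary examples, not recommendations; SystemPropertyConfigProvider is the same provider the test suites below use, and the defaults are 2 MB and true respectively):

import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class TransportConfExample {
    public static void main(String[] args) {
        // Raise the memory-map threshold to 8 MB and disable lazy FDs.
        System.setProperty("spark.storage.memoryMapThreshold", String.valueOf(8 * 1024 * 1024));
        System.setProperty("spark.shuffle.io.lazyFD", "false");

        TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
        System.out.println(conf.memoryMapBytes());     // 8388608
        System.out.println(conf.lazyFileDescriptor()); // false
    }
}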
diff --git a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
index c415883397..dfb7740344 100644
--- a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
@@ -63,6 +63,8 @@ public class ChunkFetchIntegrationSuite {
static ManagedBuffer bufferChunk;
static ManagedBuffer fileChunk;
+ private TransportConf transportConf;
+
@BeforeClass
public static void setUp() throws Exception {
int bufSize = 100000;
@@ -80,9 +82,10 @@ public class ChunkFetchIntegrationSuite {
new Random().nextBytes(fileContent);
fp.write(fileContent);
fp.close();
- fileChunk = new FileSegmentManagedBuffer(testFile, 10, testFile.length() - 25);
- TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+ fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
+
streamManager = new StreamManager() {
@Override
public ManagedBuffer getChunk(long streamId, int chunkIndex) {
@@ -90,7 +93,7 @@ public class ChunkFetchIntegrationSuite {
if (chunkIndex == BUFFER_CHUNK_INDEX) {
return new NioManagedBuffer(buf);
} else if (chunkIndex == FILE_CHUNK_INDEX) {
- return new FileSegmentManagedBuffer(testFile, 10, testFile.length() - 25);
+ return new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
} else {
throw new IllegalArgumentException("Invalid chunk index: " + chunkIndex);
}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index a6db4b2abd..46ca970862 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -21,6 +21,7 @@ import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,8 +49,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
private final ExternalShuffleBlockManager blockManager;
private final OneForOneStreamManager streamManager;
- public ExternalShuffleBlockHandler() {
- this(new OneForOneStreamManager(), new ExternalShuffleBlockManager());
+ public ExternalShuffleBlockHandler(TransportConf conf) {
+ this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf));
}
/** Enables mocking out the StreamManager and BlockManager. */
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
index ffb7faa3db..dfe0ba0595 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
@@ -37,6 +37,7 @@ import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.TransportConf;
/**
* Manages converting shuffle BlockIds into physical segments of local files, from a process outside
@@ -56,14 +57,17 @@ public class ExternalShuffleBlockManager {
// Single-threaded Java executor used to perform expensive recursive directory deletion.
private final Executor directoryCleaner;
- public ExternalShuffleBlockManager() {
+ private final TransportConf conf;
+
+ public ExternalShuffleBlockManager(TransportConf conf) {
// TODO: Give this thread a name.
- this(Executors.newSingleThreadExecutor());
+ this(conf, Executors.newSingleThreadExecutor());
}
// Allows tests to have more control over when directories are cleaned up.
@VisibleForTesting
- ExternalShuffleBlockManager(Executor directoryCleaner) {
+ ExternalShuffleBlockManager(TransportConf conf, Executor directoryCleaner) {
+ this.conf = conf;
this.executors = Maps.newConcurrentMap();
this.directoryCleaner = directoryCleaner;
}
@@ -167,7 +171,7 @@ public class ExternalShuffleBlockManager {
// TODO: Support consolidated hash shuffle files
private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
- return new FileSegmentManagedBuffer(shuffleFile, 0, shuffleFile.length());
+ return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length());
}
/**
@@ -187,6 +191,7 @@ public class ExternalShuffleBlockManager {
long offset = in.readLong();
long nextOffset = in.readLong();
return new FileSegmentManagedBuffer(
+ conf,
getFile(executor.localDirs, executor.subDirsPerLocalDir,
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
offset,
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
index da54797e89..dad6428a83 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java
@@ -22,6 +22,8 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import com.google.common.io.CharStreams;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -37,6 +39,8 @@ public class ExternalShuffleBlockManagerSuite {
static TestShuffleDataContext dataContext;
+ static TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+
@BeforeClass
public static void beforeAll() throws IOException {
dataContext = new TestShuffleDataContext(2, 5);
@@ -56,7 +60,7 @@ public class ExternalShuffleBlockManagerSuite {
@Test
public void testBadRequests() {
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
// Unregistered executor
try {
manager.getBlockData("app0", "exec1", "shuffle_1_1_0");
@@ -87,7 +91,7 @@ public class ExternalShuffleBlockManagerSuite {
@Test
public void testSortShuffleBlocks() throws IOException {
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
manager.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager"));
@@ -106,7 +110,7 @@ public class ExternalShuffleBlockManagerSuite {
@Test
public void testHashShuffleBlocks() throws IOException {
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager();
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf);
manager.registerExecutor("app0", "exec0",
dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager"));
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index c8ece3bc53..254e3a7a32 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -25,20 +25,23 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Test;
-
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
+
public class ExternalShuffleCleanupSuite {
// Same-thread Executor used to ensure cleanup happens synchronously in test thread.
Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
+ TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
@Test
public void noCleanupAndCleanup() throws IOException {
TestShuffleDataContext dataContext = createSomeData();
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor);
manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
manager.applicationRemoved("app", false /* cleanup */);
@@ -61,7 +64,7 @@ public class ExternalShuffleCleanupSuite {
@Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
};
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(noThreadExecutor);
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, noThreadExecutor);
manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
manager.applicationRemoved("app", true);
@@ -78,7 +81,7 @@ public class ExternalShuffleCleanupSuite {
TestShuffleDataContext dataContext0 = createSomeData();
TestShuffleDataContext dataContext1 = createSomeData();
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor);
manager.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
manager.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
@@ -93,7 +96,7 @@ public class ExternalShuffleCleanupSuite {
TestShuffleDataContext dataContext0 = createSomeData();
TestShuffleDataContext dataContext1 = createSomeData();
- ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+ ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor);
manager.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
manager.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 687bde59fd..02c10bcb7b 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -92,7 +92,7 @@ public class ExternalShuffleIntegrationSuite {
dataContext1.insertHashShuffleData(1, 0, exec1Blocks);
conf = new TransportConf(new SystemPropertyConfigProvider());
- handler = new ExternalShuffleBlockHandler();
+ handler = new ExternalShuffleBlockHandler(conf);
TransportContext transportContext = new TransportContext(conf, handler);
server = transportContext.createServer();
}
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index 8afceab1d5..759a12910c 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -42,7 +42,7 @@ public class ExternalShuffleSecuritySuite {
@Before
public void beforeEach() {
- RpcHandler handler = new SaslRpcHandler(new ExternalShuffleBlockHandler(),
+ RpcHandler handler = new SaslRpcHandler(new ExternalShuffleBlockHandler(conf),
new TestSecretKeyHolder("my-app-id", "secret"));
TransportContext context = new TransportContext(conf, handler);
this.server = context.createServer();
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index bb0b8f7e6c..a34aabe9e7 100644
--- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -95,10 +95,11 @@ public class YarnShuffleService extends AuxiliaryService {
*/
@Override
protected void serviceInit(Configuration conf) {
+ TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
// If authentication is enabled, set up the shuffle server to use a
// special RPC handler that filters out unauthenticated fetch requests
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
- RpcHandler rpcHandler = new ExternalShuffleBlockHandler();
+ RpcHandler rpcHandler = new ExternalShuffleBlockHandler(transportConf);
if (authEnabled) {
secretManager = new ShuffleSecretManager();
rpcHandler = new SaslRpcHandler(rpcHandler, secretManager);
@@ -106,7 +107,6 @@ public class YarnShuffleService extends AuxiliaryService {
int port = conf.getInt(
SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
- TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
shuffleServer = transportContext.createServer(port);
String authEnabledString = authEnabled ? "enabled" : "not enabled";