aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java12
-rw-r--r--network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java6
2 files changed, 8 insertions, 10 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 76bce85928..9afd5decd5 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -19,7 +19,6 @@ package org.apache.spark.network.client;
import java.io.Closeable;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
@@ -37,7 +36,6 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
-import io.netty.util.internal.PlatformDependent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +65,7 @@ public class TransportClientFactory implements Closeable {
private final Class<? extends Channel> socketChannelClass;
private EventLoopGroup workerGroup;
+ private PooledByteBufAllocator pooledAllocator;
public TransportClientFactory(
TransportContext context,
@@ -80,6 +79,8 @@ public class TransportClientFactory implements Closeable {
this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
// TODO: Make thread pool name configurable.
this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client");
+ this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
+ conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
}
/**
@@ -115,11 +116,8 @@ public class TransportClientFactory implements Closeable {
// Disable Nagle's Algorithm since we don't want packets to wait
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs());
-
- // Use pooled buffers to reduce temporary buffer allocation
- bootstrap.option(ChannelOption.ALLOCATOR, NettyUtils.createPooledByteBufAllocator(
- conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()));
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
+ .option(ChannelOption.ALLOCATOR, pooledAllocator);
final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();
diff --git a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
index 5c654a6fd6..b3991a6577 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
@@ -109,9 +109,9 @@ public class NettyUtils {
/**
* Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
- * are disabled because the ByteBufs are allocated by the event loop thread, but released by the
- * executor thread rather than the event loop thread. Those thread-local caches actually delay
- * the recycling of buffers, leading to larger memory usage.
+ * are disabled for TransportClients because the ByteBufs are allocated by the event loop thread,
+ * but released by the executor thread rather than the event loop thread. Those thread-local
+ * caches actually delay the recycling of buffers, leading to larger memory usage.
*/
public static PooledByteBufAllocator createPooledByteBufAllocator(
boolean allowDirectBufs,