aboutsummaryrefslogtreecommitdiff
path: root/network
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2014-11-12 18:46:37 -0800
committerReynold Xin <rxin@databricks.com>2014-11-12 18:46:37 -0800
commitb9e1c2eb9b6f7fb609718ef20048a8da452d881b (patch)
treeae44b633e68d4fd021846859ad578b7c891fab5d /network
parent23f5bdf06a388e08ea5a69e848f0ecd5165aa481 (diff)
downloadspark-b9e1c2eb9b6f7fb609718ef20048a8da452d881b.tar.gz
spark-b9e1c2eb9b6f7fb609718ef20048a8da452d881b.tar.bz2
spark-b9e1c2eb9b6f7fb609718ef20048a8da452d881b.zip
[SPARK-4370] [Core] Limit number of Netty cores based on executor size
Author: Aaron Davidson <aaron@databricks.com> Closes #3155 from aarondav/conf and squashes the following commits: 7045e77 [Aaron Davidson] Add mesos comment 4770f6e [Aaron Davidson] [SPARK-4370] [Core] Limit number of Netty cores based on executor size
Diffstat (limited to 'network')
-rw-r--r--network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java33
-rw-r--r--network/common/src/main/java/org/apache/spark/network/server/TransportServer.java4
-rw-r--r--network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java39
3 files changed, 43 insertions, 33 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 397d3a8455..76bce85928 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -118,7 +118,8 @@ public class TransportClientFactory implements Closeable {
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs());
// Use pooled buffers to reduce temporary buffer allocation
- bootstrap.option(ChannelOption.ALLOCATOR, createPooledByteBufAllocator());
+ bootstrap.option(ChannelOption.ALLOCATOR, NettyUtils.createPooledByteBufAllocator(
+ conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()));
final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();
@@ -190,34 +191,4 @@ public class TransportClientFactory implements Closeable {
workerGroup = null;
}
}
-
- /**
- * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
- * are disabled because the ByteBufs are allocated by the event loop thread, but released by the
- * executor thread rather than the event loop thread. Those thread-local caches actually delay
- * the recycling of buffers, leading to larger memory usage.
- */
- private PooledByteBufAllocator createPooledByteBufAllocator() {
- return new PooledByteBufAllocator(
- conf.preferDirectBufs() && PlatformDependent.directBufferPreferred(),
- getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"),
- getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"),
- getPrivateStaticField("DEFAULT_PAGE_SIZE"),
- getPrivateStaticField("DEFAULT_MAX_ORDER"),
- 0, // tinyCacheSize
- 0, // smallCacheSize
- 0 // normalCacheSize
- );
- }
-
- /** Used to get defaults from Netty's private static fields. */
- private int getPrivateStaticField(String name) {
- try {
- Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
- f.setAccessible(true);
- return f.getInt(null);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
}
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index 579676c2c3..625c3257d7 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -72,8 +72,8 @@ public class TransportServer implements Closeable {
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
EventLoopGroup workerGroup = bossGroup;
- PooledByteBufAllocator allocator = new PooledByteBufAllocator(
- conf.preferDirectBufs() && PlatformDependent.directBufferPreferred());
+ PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
+ conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
diff --git a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
index 2a7664fe89..5c654a6fd6 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
@@ -17,9 +17,11 @@
package org.apache.spark.network.util;
+import java.lang.reflect.Field;
import java.util.concurrent.ThreadFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
@@ -32,6 +34,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.internal.PlatformDependent;
/**
* Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO.
@@ -103,4 +106,40 @@ public class NettyUtils {
}
return "<unknown remote>";
}
+
+ /**
+ * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
+ * are disabled because the ByteBufs are allocated by the event loop thread, but released by the
+ * executor thread rather than the event loop thread. Those thread-local caches actually delay
+ * the recycling of buffers, leading to larger memory usage.
+ */
+ public static PooledByteBufAllocator createPooledByteBufAllocator(
+ boolean allowDirectBufs,
+ boolean allowCache,
+ int numCores) {
+ if (numCores == 0) {
+ numCores = Runtime.getRuntime().availableProcessors();
+ }
+ return new PooledByteBufAllocator(
+ allowDirectBufs && PlatformDependent.directBufferPreferred(),
+ Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores),
+ Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0),
+ getPrivateStaticField("DEFAULT_PAGE_SIZE"),
+ getPrivateStaticField("DEFAULT_MAX_ORDER"),
+ allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0,
+ allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0,
+ allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0
+ );
+ }
+
+ /** Used to get defaults from Netty's private static fields. */
+ private static int getPrivateStaticField(String name) {
+ try {
+ Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
+ f.setAccessible(true);
+ return f.getInt(null);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}