diff options
author | Aaron Davidson <aaron@databricks.com> | 2014-11-12 18:46:37 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2014-11-12 18:46:37 -0800 |
commit | b9e1c2eb9b6f7fb609718ef20048a8da452d881b (patch) | |
tree | ae44b633e68d4fd021846859ad578b7c891fab5d /network | |
parent | 23f5bdf06a388e08ea5a69e848f0ecd5165aa481 (diff) | |
download | spark-b9e1c2eb9b6f7fb609718ef20048a8da452d881b.tar.gz spark-b9e1c2eb9b6f7fb609718ef20048a8da452d881b.tar.bz2 spark-b9e1c2eb9b6f7fb609718ef20048a8da452d881b.zip |
[SPARK-4370] [Core] Limit number of Netty cores based on executor size
Author: Aaron Davidson <aaron@databricks.com>
Closes #3155 from aarondav/conf and squashes the following commits:
7045e77 [Aaron Davidson] Add mesos comment
4770f6e [Aaron Davidson] [SPARK-4370] [Core] Limit number of Netty cores based on executor size
Diffstat (limited to 'network')
3 files changed, 43 insertions, 33 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index 397d3a8455..76bce85928 100644 --- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -118,7 +118,8 @@ public class TransportClientFactory implements Closeable { .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs()); // Use pooled buffers to reduce temporary buffer allocation - bootstrap.option(ChannelOption.ALLOCATOR, createPooledByteBufAllocator()); + bootstrap.option(ChannelOption.ALLOCATOR, NettyUtils.createPooledByteBufAllocator( + conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads())); final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>(); @@ -190,34 +191,4 @@ public class TransportClientFactory implements Closeable { workerGroup = null; } } - - /** - * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches - * are disabled because the ByteBufs are allocated by the event loop thread, but released by the - * executor thread rather than the event loop thread. Those thread-local caches actually delay - * the recycling of buffers, leading to larger memory usage. - */ - private PooledByteBufAllocator createPooledByteBufAllocator() { - return new PooledByteBufAllocator( - conf.preferDirectBufs() && PlatformDependent.directBufferPreferred(), - getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), - getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), - getPrivateStaticField("DEFAULT_PAGE_SIZE"), - getPrivateStaticField("DEFAULT_MAX_ORDER"), - 0, // tinyCacheSize - 0, // smallCacheSize - 0 // normalCacheSize - ); - } - - /** Used to get defaults from Netty's private static fields. */ - private int getPrivateStaticField(String name) { - try { - Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name); - f.setAccessible(true); - return f.getInt(null); - } catch (Exception e) { - throw new RuntimeException(e); - } - } } diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java index 579676c2c3..625c3257d7 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java @@ -72,8 +72,8 @@ public class TransportServer implements Closeable { NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server"); EventLoopGroup workerGroup = bossGroup; - PooledByteBufAllocator allocator = new PooledByteBufAllocator( - conf.preferDirectBufs() && PlatformDependent.directBufferPreferred()); + PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator( + conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads()); bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) diff --git a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java index 2a7664fe89..5c654a6fd6 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -17,9 +17,11 @@ package org.apache.spark.network.util; +import java.lang.reflect.Field; import java.util.concurrent.ThreadFactory; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; @@ -32,6 +34,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.util.internal.PlatformDependent; /** * Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO. @@ -103,4 +106,40 @@ public class NettyUtils { } return "<unknown remote>"; } + + /** + * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches + * are disabled because the ByteBufs are allocated by the event loop thread, but released by the + * executor thread rather than the event loop thread. Those thread-local caches actually delay + * the recycling of buffers, leading to larger memory usage. + */ + public static PooledByteBufAllocator createPooledByteBufAllocator( + boolean allowDirectBufs, + boolean allowCache, + int numCores) { + if (numCores == 0) { + numCores = Runtime.getRuntime().availableProcessors(); + } + return new PooledByteBufAllocator( + allowDirectBufs && PlatformDependent.directBufferPreferred(), + Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores), + Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0), + getPrivateStaticField("DEFAULT_PAGE_SIZE"), + getPrivateStaticField("DEFAULT_MAX_ORDER"), + allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0, + allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0, + allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0 + ); + } + + /** Used to get defaults from Netty's private static fields. */ + private static int getPrivateStaticField(String name) { + try { + Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name); + f.setAccessible(true); + return f.getInt(null); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } |