aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2014-11-12 18:46:37 -0800
committerReynold Xin <rxin@databricks.com>2014-11-12 18:46:37 -0800
commitb9e1c2eb9b6f7fb609718ef20048a8da452d881b (patch)
treeae44b633e68d4fd021846859ad578b7c891fab5d
parent23f5bdf06a388e08ea5a69e848f0ecd5165aa481 (diff)
downloadspark-b9e1c2eb9b6f7fb609718ef20048a8da452d881b.tar.gz
spark-b9e1c2eb9b6f7fb609718ef20048a8da452d881b.tar.bz2
spark-b9e1c2eb9b6f7fb609718ef20048a8da452d881b.zip
[SPARK-4370] [Core] Limit number of Netty cores based on executor size
Author: Aaron Davidson <aaron@databricks.com> Closes #3155 from aarondav/conf and squashes the following commits: 7045e77 [Aaron Davidson] Add mesos comment 4770f6e [Aaron Davidson] [SPARK-4370] [Core] Limit number of Netty cores based on executor size
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala19
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala12
-rw-r--r--core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala5
-rw-r--r--network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java33
-rw-r--r--network/common/src/main/java/org/apache/spark/network/server/TransportServer.java4
-rw-r--r--network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java39
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala2
17 files changed, 104 insertions, 64 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index e7454beddb..e464b32e61 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -168,9 +168,11 @@ object SparkEnv extends Logging {
executorId: String,
hostname: String,
port: Int,
+ numCores: Int,
isLocal: Boolean,
actorSystem: ActorSystem = null): SparkEnv = {
- create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem)
+ create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem,
+ numUsableCores = numCores)
}
/**
@@ -184,7 +186,8 @@ object SparkEnv extends Logging {
isDriver: Boolean,
isLocal: Boolean,
listenerBus: LiveListenerBus = null,
- defaultActorSystem: ActorSystem = null): SparkEnv = {
+ defaultActorSystem: ActorSystem = null,
+ numUsableCores: Int = 0): SparkEnv = {
// Listener bus is only used on the driver
if (isDriver) {
@@ -276,7 +279,7 @@ object SparkEnv extends Logging {
val blockTransferService =
conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
case "netty" =>
- new NettyBlockTransferService(conf, securityManager)
+ new NettyBlockTransferService(conf, securityManager, numUsableCores)
case "nio" =>
new NioBlockTransferService(conf, securityManager)
}
@@ -287,7 +290,8 @@ object SparkEnv extends Logging {
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
- serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
+ serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
+ numUsableCores)
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
index d044e1d01d..b9798963ba 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
@@ -39,7 +39,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
- private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
+ private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
private val transportContext: TransportContext = {
val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 3711824a40..5f46f3b1f0 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -57,9 +57,9 @@ private[spark] class CoarseGrainedExecutorBackend(
override def receiveWithLogging = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
- // Make this host instead of hostPort ?
val (hostname, _) = Utils.parseHostPort(hostPort)
- executor = new Executor(executorId, hostname, sparkProperties, isLocal = false, actorSystem)
+ executor = new Executor(executorId, hostname, sparkProperties, cores, isLocal = false,
+ actorSystem)
case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index caf4d76713..4c378a278b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -43,6 +43,7 @@ private[spark] class Executor(
executorId: String,
slaveHostname: String,
properties: Seq[(String, String)],
+ numCores: Int,
isLocal: Boolean = false,
actorSystem: ActorSystem = null)
extends Logging
@@ -83,7 +84,7 @@ private[spark] class Executor(
if (!isLocal) {
val port = conf.getInt("spark.executor.port", 0)
val _env = SparkEnv.createExecutorEnv(
- conf, executorId, slaveHostname, port, isLocal, actorSystem)
+ conf, executorId, slaveHostname, port, numCores, isLocal, actorSystem)
SparkEnv.set(_env)
_env.metricsSystem.registerSource(executorSource)
_env.blockManager.initialize(conf.getAppId)
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index bca0b15226..f15e6bc33f 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -19,6 +19,8 @@ package org.apache.spark.executor
import java.nio.ByteBuffer
+import scala.collection.JavaConversions._
+
import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
@@ -50,14 +52,23 @@ private[spark] class MesosExecutorBackend
executorInfo: ExecutorInfo,
frameworkInfo: FrameworkInfo,
slaveInfo: SlaveInfo) {
- logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
+
+ // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend.
+ val cpusPerTask = executorInfo.getResourcesList
+ .find(_.getName == "cpus")
+ .map(_.getScalar.getValue.toInt)
+ .getOrElse(0)
+ val executorId = executorInfo.getExecutorId.getValue
+
+ logInfo(s"Registered with Mesos as executor ID $executorId with $cpusPerTask cpus")
this.driver = driver
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
executor = new Executor(
- executorInfo.getExecutorId.getValue,
+ executorId,
slaveInfo.getHostname,
- properties)
+ properties,
+ cpusPerTask)
}
override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index f8a7f64068..0027cbb0ff 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -35,13 +35,13 @@ import org.apache.spark.util.Utils
/**
* A BlockTransferService that uses Netty to fetch a set of blocks at at time.
*/
-class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager)
+class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager, numCores: Int)
extends BlockTransferService {
// TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
private val serializer = new JavaSerializer(conf)
private val authEnabled = securityManager.isAuthenticationEnabled()
- private val transportConf = SparkTransportConf.fromSparkConf(conf)
+ private val transportConf = SparkTransportConf.fromSparkConf(conf, numCores)
private[this] var transportContext: TransportContext = _
private[this] var server: TransportServer = _
diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
index 9fa4fa77b8..ce4225cae6 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
@@ -20,11 +20,22 @@ package org.apache.spark.network.netty
import org.apache.spark.SparkConf
import org.apache.spark.network.util.{TransportConf, ConfigProvider}
-/**
- * Utility for creating a [[TransportConf]] from a [[SparkConf]].
- */
object SparkTransportConf {
- def fromSparkConf(conf: SparkConf): TransportConf = {
+ /**
+ * Utility for creating a [[TransportConf]] from a [[SparkConf]].
+ * @param numUsableCores if nonzero, this will restrict the server and client threads to only
+ * use the given number of cores, rather than all of the machine's cores.
+ * This restriction will only occur if these properties are not already set.
+ */
+ def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
+ val conf = _conf.clone
+ if (numUsableCores > 0) {
+ // Only set if serverThreads/clientThreads not already set.
+ conf.set("spark.shuffle.io.serverThreads",
+ conf.get("spark.shuffle.io.serverThreads", numUsableCores.toString))
+ conf.set("spark.shuffle.io.clientThreads",
+ conf.get("spark.shuffle.io.clientThreads", numUsableCores.toString))
+ }
new TransportConf(new ConfigProvider {
override def get(name: String): String = conf.get(name)
})
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index c0264836de..a2f1f14264 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -51,7 +51,7 @@ private[spark] class LocalActor(
private val localExecutorHostname = "localhost"
val executor = new Executor(
- localExecutorId, localExecutorHostname, scheduler.conf.getAll, isLocal = true)
+ localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true)
override def receiveWithLogging = {
case ReviveOffers =>
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 39434f473a..308c59eda5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -73,7 +73,8 @@ private[spark] class BlockManager(
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
blockTransferService: BlockTransferService,
- securityManager: SecurityManager)
+ securityManager: SecurityManager,
+ numUsableCores: Int)
extends BlockDataManager with Logging {
val diskBlockManager = new DiskBlockManager(this, conf)
@@ -121,8 +122,8 @@ private[spark] class BlockManager(
// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTranserService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
- new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), securityManager,
- securityManager.isAuthenticationEnabled())
+ val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores)
+ new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled())
} else {
blockTransferService
}
@@ -174,9 +175,10 @@ private[spark] class BlockManager(
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
blockTransferService: BlockTransferService,
- securityManager: SecurityManager) = {
+ securityManager: SecurityManager,
+ numUsableCores: Int) = {
this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
- conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
+ conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores)
}
/**
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 9623d66517..55799f5514 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -38,7 +38,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
var rpcHandler: ExternalShuffleBlockHandler = _
override def beforeAll() {
- val transportConf = SparkTransportConf.fromSparkConf(conf)
+ val transportConf = SparkTransportConf.fromSparkConf(conf, numUsableCores = 2)
rpcHandler = new ExternalShuffleBlockHandler(transportConf)
val transportContext = new TransportContext(transportConf, rpcHandler)
server = transportContext.createServer()
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index 530f5d6db5..94bfa67451 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -104,11 +104,11 @@ class NettyBlockTransferSecuritySuite extends FunSuite with MockitoSugar with Sh
when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer)
val securityManager0 = new SecurityManager(conf0)
- val exec0 = new NettyBlockTransferService(conf0, securityManager0)
+ val exec0 = new NettyBlockTransferService(conf0, securityManager0, numCores = 1)
exec0.init(blockManager)
val securityManager1 = new SecurityManager(conf1)
- val exec1 = new NettyBlockTransferService(conf1, securityManager1)
+ val exec1 = new NettyBlockTransferService(conf1, securityManager1, numCores = 1)
exec1.init(blockManager)
val result = fetchBlock(exec0, exec1, "1", blockId) match {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index f63e772bf1..c2903c8597 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -62,7 +62,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NioBlockTransferService(conf, securityMgr)
val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
- mapOutputTracker, shuffleManager, transfer, securityMgr)
+ mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
store.initialize("app-id")
allStores += store
store
@@ -263,7 +263,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
when(failableTransfer.hostName).thenReturn("some-hostname")
when(failableTransfer.port).thenReturn(1000)
val failableStore = new BlockManager("failable-store", actorSystem, master, serializer,
- 10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr)
+ 10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0)
failableStore.initialize("app-id")
allStores += failableStore // so that this gets stopped after test
assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId))
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 9529502bc8..5554efbcba 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -74,7 +74,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NioBlockTransferService(conf, securityMgr)
val manager = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
- mapOutputTracker, shuffleManager, transfer, securityMgr)
+ mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
manager.initialize("app-id")
manager
}
@@ -795,7 +795,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
// Use Java serializer so we can create an unserializable error.
val transfer = new NioBlockTransferService(conf, securityMgr)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, actorSystem, master,
- new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr)
+ new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr,
+ 0)
// The put should fail since a1 is not serializable.
class UnserializableClass
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 397d3a8455..76bce85928 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -118,7 +118,8 @@ public class TransportClientFactory implements Closeable {
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs());
// Use pooled buffers to reduce temporary buffer allocation
- bootstrap.option(ChannelOption.ALLOCATOR, createPooledByteBufAllocator());
+ bootstrap.option(ChannelOption.ALLOCATOR, NettyUtils.createPooledByteBufAllocator(
+ conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()));
final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();
@@ -190,34 +191,4 @@ public class TransportClientFactory implements Closeable {
workerGroup = null;
}
}
-
- /**
- * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
- * are disabled because the ByteBufs are allocated by the event loop thread, but released by the
- * executor thread rather than the event loop thread. Those thread-local caches actually delay
- * the recycling of buffers, leading to larger memory usage.
- */
- private PooledByteBufAllocator createPooledByteBufAllocator() {
- return new PooledByteBufAllocator(
- conf.preferDirectBufs() && PlatformDependent.directBufferPreferred(),
- getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"),
- getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"),
- getPrivateStaticField("DEFAULT_PAGE_SIZE"),
- getPrivateStaticField("DEFAULT_MAX_ORDER"),
- 0, // tinyCacheSize
- 0, // smallCacheSize
- 0 // normalCacheSize
- );
- }
-
- /** Used to get defaults from Netty's private static fields. */
- private int getPrivateStaticField(String name) {
- try {
- Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
- f.setAccessible(true);
- return f.getInt(null);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
}
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index 579676c2c3..625c3257d7 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -72,8 +72,8 @@ public class TransportServer implements Closeable {
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
EventLoopGroup workerGroup = bossGroup;
- PooledByteBufAllocator allocator = new PooledByteBufAllocator(
- conf.preferDirectBufs() && PlatformDependent.directBufferPreferred());
+ PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
+ conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
diff --git a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
index 2a7664fe89..5c654a6fd6 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
@@ -17,9 +17,11 @@
package org.apache.spark.network.util;
+import java.lang.reflect.Field;
import java.util.concurrent.ThreadFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
@@ -32,6 +34,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.internal.PlatformDependent;
/**
* Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO.
@@ -103,4 +106,40 @@ public class NettyUtils {
}
return "<unknown remote>";
}
+
+ /**
+ * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
+ * are disabled because the ByteBufs are allocated by the event loop thread, but released by the
+ * executor thread rather than the event loop thread. Those thread-local caches actually delay
+ * the recycling of buffers, leading to larger memory usage.
+ */
+ public static PooledByteBufAllocator createPooledByteBufAllocator(
+ boolean allowDirectBufs,
+ boolean allowCache,
+ int numCores) {
+ if (numCores == 0) {
+ numCores = Runtime.getRuntime().availableProcessors();
+ }
+ return new PooledByteBufAllocator(
+ allowDirectBufs && PlatformDependent.directBufferPreferred(),
+ Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores),
+ Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0),
+ getPrivateStaticField("DEFAULT_PAGE_SIZE"),
+ getPrivateStaticField("DEFAULT_MAX_ORDER"),
+ allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0,
+ allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0,
+ allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0
+ );
+ }
+
+ /** Used to get defaults from Netty's private static fields. */
+ private static int getPrivateStaticField(String name) {
+ try {
+ Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
+ f.setAccessible(true);
+ return f.getInt(null);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 9efe15d01e..3661e16a9e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -73,7 +73,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
blockManager = new BlockManager("bm", actorSystem, blockManagerMaster, serializer,
blockManagerSize, conf, mapOutputTracker, shuffleManager,
- new NioBlockTransferService(conf, securityMgr), securityMgr)
+ new NioBlockTransferService(conf, securityMgr), securityMgr, 0)
blockManager.initialize("app-id")
tempDirectory = Files.createTempDir()