diff options
author | Aaron Davidson <aaron@databricks.com> | 2014-11-12 18:46:37 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2014-11-12 18:46:37 -0800 |
commit | b9e1c2eb9b6f7fb609718ef20048a8da452d881b (patch) | |
tree | ae44b633e68d4fd021846859ad578b7c891fab5d /core | |
parent | 23f5bdf06a388e08ea5a69e848f0ecd5165aa481 (diff) | |
download | spark-b9e1c2eb9b6f7fb609718ef20048a8da452d881b.tar.gz spark-b9e1c2eb9b6f7fb609718ef20048a8da452d881b.tar.bz2 spark-b9e1c2eb9b6f7fb609718ef20048a8da452d881b.zip |
[SPARK-4370] [Core] Limit number of Netty cores based on executor size
Author: Aaron Davidson <aaron@databricks.com>
Closes #3155 from aarondav/conf and squashes the following commits:
7045e77 [Aaron Davidson] Add mesos comment
4770f6e [Aaron Davidson] [SPARK-4370] [Core] Limit number of Netty cores based on executor size
Diffstat (limited to 'core')
13 files changed, 60 insertions, 30 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index e7454beddb..e464b32e61 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -168,9 +168,11 @@ object SparkEnv extends Logging { executorId: String, hostname: String, port: Int, + numCores: Int, isLocal: Boolean, actorSystem: ActorSystem = null): SparkEnv = { - create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem) + create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem, + numUsableCores = numCores) } /** @@ -184,7 +186,8 @@ object SparkEnv extends Logging { isDriver: Boolean, isLocal: Boolean, listenerBus: LiveListenerBus = null, - defaultActorSystem: ActorSystem = null): SparkEnv = { + defaultActorSystem: ActorSystem = null, + numUsableCores: Int = 0): SparkEnv = { // Listener bus is only used on the driver if (isDriver) { @@ -276,7 +279,7 @@ object SparkEnv extends Logging { val blockTransferService = conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match { case "netty" => - new NettyBlockTransferService(conf, securityManager) + new NettyBlockTransferService(conf, securityManager, numUsableCores) case "nio" => new NioBlockTransferService(conf, securityManager) } @@ -287,7 +290,8 @@ object SparkEnv extends Logging { // NB: blockManager is not valid until initialize() is called later. val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, - serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager) + serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager, + numUsableCores) val broadcastManager = new BroadcastManager(isDriver, conf, securityManager) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala index d044e1d01d..b9798963ba 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala @@ -39,7 +39,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu private val port = sparkConf.getInt("spark.shuffle.service.port", 7337) private val useSasl: Boolean = securityManager.isAuthenticationEnabled() - private val transportConf = SparkTransportConf.fromSparkConf(sparkConf) + private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0) private val blockHandler = new ExternalShuffleBlockHandler(transportConf) private val transportContext: TransportContext = { val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 3711824a40..5f46f3b1f0 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -57,9 +57,9 @@ private[spark] class CoarseGrainedExecutorBackend( override def receiveWithLogging = { case RegisteredExecutor => logInfo("Successfully registered with driver") - // Make this host instead of hostPort ? val (hostname, _) = Utils.parseHostPort(hostPort) - executor = new Executor(executorId, hostname, sparkProperties, isLocal = false, actorSystem) + executor = new Executor(executorId, hostname, sparkProperties, cores, isLocal = false, + actorSystem) case RegisterExecutorFailed(message) => logError("Slave registration failed: " + message) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index caf4d76713..4c378a278b 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -43,6 +43,7 @@ private[spark] class Executor( executorId: String, slaveHostname: String, properties: Seq[(String, String)], + numCores: Int, isLocal: Boolean = false, actorSystem: ActorSystem = null) extends Logging @@ -83,7 +84,7 @@ private[spark] class Executor( if (!isLocal) { val port = conf.getInt("spark.executor.port", 0) val _env = SparkEnv.createExecutorEnv( - conf, executorId, slaveHostname, port, isLocal, actorSystem) + conf, executorId, slaveHostname, port, numCores, isLocal, actorSystem) SparkEnv.set(_env) _env.metricsSystem.registerSource(executorSource) _env.blockManager.initialize(conf.getAppId) diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index bca0b15226..f15e6bc33f 100644 --- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -19,6 +19,8 @@ package org.apache.spark.executor import java.nio.ByteBuffer +import scala.collection.JavaConversions._ + import org.apache.mesos.protobuf.ByteString import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary} import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _} @@ -50,14 +52,23 @@ private[spark] class MesosExecutorBackend executorInfo: ExecutorInfo, frameworkInfo: FrameworkInfo, slaveInfo: SlaveInfo) { - logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue) + + // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend. + val cpusPerTask = executorInfo.getResourcesList + .find(_.getName == "cpus") + .map(_.getScalar.getValue.toInt) + .getOrElse(0) + val executorId = executorInfo.getExecutorId.getValue + + logInfo(s"Registered with Mesos as executor ID $executorId with $cpusPerTask cpus") this.driver = driver val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++ Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue)) executor = new Executor( - executorInfo.getExecutorId.getValue, + executorId, slaveInfo.getHostname, - properties) + properties, + cpusPerTask) } override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) { diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index f8a7f64068..0027cbb0ff 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -35,13 +35,13 @@ import org.apache.spark.util.Utils /** * A BlockTransferService that uses Netty to fetch a set of blocks at at time. */ -class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager) +class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager, numCores: Int) extends BlockTransferService { // TODO: Don't use Java serialization, use a more cross-version compatible serialization format. private val serializer = new JavaSerializer(conf) private val authEnabled = securityManager.isAuthenticationEnabled() - private val transportConf = SparkTransportConf.fromSparkConf(conf) + private val transportConf = SparkTransportConf.fromSparkConf(conf, numCores) private[this] var transportContext: TransportContext = _ private[this] var server: TransportServer = _ diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala index 9fa4fa77b8..ce4225cae6 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala @@ -20,11 +20,22 @@ package org.apache.spark.network.netty import org.apache.spark.SparkConf import org.apache.spark.network.util.{TransportConf, ConfigProvider} -/** - * Utility for creating a [[TransportConf]] from a [[SparkConf]]. - */ object SparkTransportConf { - def fromSparkConf(conf: SparkConf): TransportConf = { + /** + * Utility for creating a [[TransportConf]] from a [[SparkConf]]. + * @param numUsableCores if nonzero, this will restrict the server and client threads to only + * use the given number of cores, rather than all of the machine's cores. + * This restriction will only occur if these properties are not already set. + */ + def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = { + val conf = _conf.clone + if (numUsableCores > 0) { + // Only set if serverThreads/clientThreads not already set. + conf.set("spark.shuffle.io.serverThreads", + conf.get("spark.shuffle.io.serverThreads", numUsableCores.toString)) + conf.set("spark.shuffle.io.clientThreads", + conf.get("spark.shuffle.io.clientThreads", numUsableCores.toString)) + } new TransportConf(new ConfigProvider { override def get(name: String): String = conf.get(name) }) diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index c0264836de..a2f1f14264 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -51,7 +51,7 @@ private[spark] class LocalActor( private val localExecutorHostname = "localhost" val executor = new Executor( - localExecutorId, localExecutorHostname, scheduler.conf.getAll, isLocal = true) + localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true) override def receiveWithLogging = { case ReviveOffers => diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 39434f473a..308c59eda5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -73,7 +73,8 @@ private[spark] class BlockManager( mapOutputTracker: MapOutputTracker, shuffleManager: ShuffleManager, blockTransferService: BlockTransferService, - securityManager: SecurityManager) + securityManager: SecurityManager, + numUsableCores: Int) extends BlockDataManager with Logging { val diskBlockManager = new DiskBlockManager(this, conf) @@ -121,8 +122,8 @@ private[spark] class BlockManager( // Client to read other executors' shuffle files. This is either an external service, or just the // standard BlockTranserService to directly connect to other Executors. private[spark] val shuffleClient = if (externalShuffleServiceEnabled) { - new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), securityManager, - securityManager.isAuthenticationEnabled()) + val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores) + new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled()) } else { blockTransferService } @@ -174,9 +175,10 @@ private[spark] class BlockManager( mapOutputTracker: MapOutputTracker, shuffleManager: ShuffleManager, blockTransferService: BlockTransferService, - securityManager: SecurityManager) = { + securityManager: SecurityManager, + numUsableCores: Int) = { this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf), - conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager) + conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores) } /** diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index 9623d66517..55799f5514 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -38,7 +38,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll { var rpcHandler: ExternalShuffleBlockHandler = _ override def beforeAll() { - val transportConf = SparkTransportConf.fromSparkConf(conf) + val transportConf = SparkTransportConf.fromSparkConf(conf, numUsableCores = 2) rpcHandler = new ExternalShuffleBlockHandler(transportConf) val transportContext = new TransportContext(transportConf, rpcHandler) server = transportContext.createServer() diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala index 530f5d6db5..94bfa67451 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala @@ -104,11 +104,11 @@ class NettyBlockTransferSecuritySuite extends FunSuite with MockitoSugar with Sh when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer) val securityManager0 = new SecurityManager(conf0) - val exec0 = new NettyBlockTransferService(conf0, securityManager0) + val exec0 = new NettyBlockTransferService(conf0, securityManager0, numCores = 1) exec0.init(blockManager) val securityManager1 = new SecurityManager(conf1) - val exec1 = new NettyBlockTransferService(conf1, securityManager1) + val exec1 = new NettyBlockTransferService(conf1, securityManager1, numCores = 1) exec1.init(blockManager) val result = fetchBlock(exec0, exec1, "1", blockId) match { diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index f63e772bf1..c2903c8597 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -62,7 +62,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { val transfer = new NioBlockTransferService(conf, securityMgr) val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf, - mapOutputTracker, shuffleManager, transfer, securityMgr) + mapOutputTracker, shuffleManager, transfer, securityMgr, 0) store.initialize("app-id") allStores += store store @@ -263,7 +263,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd when(failableTransfer.hostName).thenReturn("some-hostname") when(failableTransfer.port).thenReturn(1000) val failableStore = new BlockManager("failable-store", actorSystem, master, serializer, - 10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr) + 10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0) failableStore.initialize("app-id") allStores += failableStore // so that this gets stopped after test assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId)) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 9529502bc8..5554efbcba 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -74,7 +74,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { val transfer = new NioBlockTransferService(conf, securityMgr) val manager = new BlockManager(name, actorSystem, master, serializer, maxMem, conf, - mapOutputTracker, shuffleManager, transfer, securityMgr) + mapOutputTracker, shuffleManager, transfer, securityMgr, 0) manager.initialize("app-id") manager } @@ -795,7 +795,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter // Use Java serializer so we can create an unserializable error. val transfer = new NioBlockTransferService(conf, securityMgr) store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, actorSystem, master, - new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr) + new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr, + 0) // The put should fail since a1 is not serializable. class UnserializableClass |