author    Marcelo Vanzin <vanzin@cloudera.com>    2016-09-21 14:42:41 -0700
committer Shixiong Zhu <shixiong@databricks.com>  2016-09-21 14:42:41 -0700
commit    2cd1bfa4f0c6625b0ab1dbeba2b9586b9a6a9f42 (patch)
tree      f0734e443661a59da6b14d496aeacd16809ecb86 /core/src
parent    b4a4421b610e776e5280fd5e7453f937f806cbd1 (diff)
[SPARK-4563][CORE] Allow driver to advertise a different network address.
The goal of this feature is to allow the Spark driver to run in an isolated environment, such as a docker container, and use the host's port forwarding mechanism to accept connections from the outside world.

The change is restricted to the driver: there is no support for achieving the same thing on executors (or the YARN AM, for that matter). Those still need full access to the outside world so that, for example, connections can be made to an executor's block manager.

The core of the change is simple: add a new configuration that tells the driver which address to bind to, which can be different from the address it advertises to executors (spark.driver.host). Everything else is plumbing the new configuration to where it's needed.

To use the feature, the host starting the container needs to set up the driver's port range to fall within a range that is being forwarded; this required a block manager port configuration specific to the driver, which falls back to the existing spark.blockManager.port when not set. This way, users can modify the driver settings without affecting the executors. It would theoretically be nice to also have different retry counts for the driver and the executors, but given that docker (at least) allows forwarding port ranges, we can probably live without that for now.

Because of the nature of the feature it's hard to add unit tests; I just added a simple one to make sure the configuration works.

This was tested with a docker image running spark-shell with the following command:

  docker blah blah blah \
    -p 38000-38100:38000-38100 \
    [image] \
    spark-shell \
      --num-executors 3 \
      --conf spark.shuffle.service.enabled=false \
      --conf spark.dynamicAllocation.enabled=false \
      --conf spark.driver.host=[host's address] \
      --conf spark.driver.port=38000 \
      --conf spark.driver.blockManager.port=38020 \
      --conf spark.ui.port=38040

Running on YARN: verified that the driver works, that executors start up and listen on ephemeral ports (instead of using the driver's config), and that caching and shuffling (without the shuffle service) work. Clicked through the UI to make sure all pages (including executor thread dumps) worked. Also tested apps without docker, and ran the unit tests.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #15120 from vanzin/SPARK-4563.
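For reference, a minimal driver-side sketch of the same setup expressed through SparkConf (the addresses and ports below are illustrative placeholders, not values taken from the patch):

  import org.apache.spark.{SparkConf, SparkContext}

  // The driver binds its listen sockets to the container-local address but
  // advertises the host's forwarded address to executors.
  val conf = new SparkConf()
    .setAppName("driver-behind-port-forwarding")
    .set("spark.driver.bindAddress", "0.0.0.0")          // address to bind listen sockets to
    .set("spark.driver.host", "host.example.com")        // address advertised to executors
    .set("spark.driver.port", "38000")                   // forwarded by the container host
    .set("spark.driver.blockManager.port", "38020")      // driver-only block manager port
  val sc = new SparkContext(conf)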
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala                                     |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                                  |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala                                      | 27
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala                |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/package.scala                       | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala       |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala                                    | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala                         |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/WebUI.scala                                      |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                                    |  6
-rw-r--r--  core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala |  6
-rw-r--r--  core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala  |  5
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala                    | 16
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala          |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala                     |  4
15 files changed, 100 insertions, 33 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index e85e5aa237..51a699f41d 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -422,6 +422,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
configsWithAlternatives.get(key).toSeq.flatten.exists { alt => contains(alt.key) }
}
+ private[spark] def contains(entry: ConfigEntry[_]): Boolean = contains(entry.key)
+
/** Copy this object */
override def clone: SparkConf = {
val cloned = new SparkConf(false)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 35b6334832..db84172e16 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -383,8 +383,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
logInfo("Spark configuration:\n" + _conf.toDebugString)
}
- // Set Spark driver host and port system properties
- _conf.setIfMissing("spark.driver.host", Utils.localHostName())
+ // Set Spark driver host and port system properties. This explicitly sets the configuration
+ // instead of relying on the default value of the config constant.
+ _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
_conf.setIfMissing("spark.driver.port", "0")
_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index cc8e3fdc97..1ffeb12988 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -29,6 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.netty.NettyBlockTransferService
@@ -158,14 +159,17 @@ object SparkEnv extends Logging {
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
- assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
+ assert(conf.contains(DRIVER_HOST_ADDRESS),
+ s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
- val hostname = conf.get("spark.driver.host")
+ val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
+ val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
val port = conf.get("spark.driver.port").toInt
create(
conf,
SparkContext.DRIVER_IDENTIFIER,
- hostname,
+ bindAddress,
+ advertiseAddress,
port,
isDriver = true,
isLocal = isLocal,
@@ -190,6 +194,7 @@ object SparkEnv extends Logging {
conf,
executorId,
hostname,
+ hostname,
port,
isDriver = false,
isLocal = isLocal,
@@ -205,7 +210,8 @@ object SparkEnv extends Logging {
private def create(
conf: SparkConf,
executorId: String,
- hostname: String,
+ bindAddress: String,
+ advertiseAddress: String,
port: Int,
isDriver: Boolean,
isLocal: Boolean,
@@ -221,8 +227,8 @@ object SparkEnv extends Logging {
val securityManager = new SecurityManager(conf)
val systemName = if (isDriver) driverSystemName else executorSystemName
- val rpcEnv = RpcEnv.create(systemName, hostname, port, conf, securityManager,
- clientMode = !isDriver)
+ val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
+ securityManager, clientMode = !isDriver)
// Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
// In the non-driver case, the RPC env's address may be null since it may not be listening
@@ -309,8 +315,15 @@ object SparkEnv extends Logging {
UnifiedMemoryManager(conf, numUsableCores)
}
+ val blockManagerPort = if (isDriver) {
+ conf.get(DRIVER_BLOCK_MANAGER_PORT)
+ } else {
+ conf.get(BLOCK_MANAGER_PORT)
+ }
+
val blockTransferService =
- new NettyBlockTransferService(conf, securityManager, hostname, numUsableCores)
+ new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
+ blockManagerPort, numUsableCores)
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
index 4b546c847a..97f56a64d6 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
@@ -66,7 +66,7 @@ private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends Con
findEntry(key) match {
case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
- case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
+ case e: FallbackConfigEntry[_] => get(e.fallback.key)
case _ => None
}
}
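The one-line change above is what makes driver-specific fallback settings behave as expected: resolving a FallbackConfigEntry should consult the resolved value of its parent key (which may have been set by the user), not only the parent's default. A simplified sketch of the intended lookup order, with hypothetical maps standing in for the real ConfigProvider:

  // Hypothetical, simplified resolution: user value, then fallback chain, then default.
  def resolve(
      key: String,
      userSettings: Map[String, String],
      fallbacks: Map[String, String],
      defaults: Map[String, String]): Option[String] =
    userSettings.get(key)
      .orElse(fallbacks.get(key).flatMap(parent => resolve(parent, userSettings, fallbacks, defaults)))
      .orElse(defaults.get(key))

  // spark.driver.blockManager.port should pick up the user-set spark.blockManager.port,
  // not merely that entry's default of 0.
  resolve(
    "spark.driver.blockManager.port",
    userSettings = Map("spark.blockManager.port" -> "39000"),
    fallbacks = Map("spark.driver.blockManager.port" -> "spark.blockManager.port"),
    defaults = Map("spark.blockManager.port" -> "0"))   // Some("39000")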
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 02d7d182a4..d536cc5097 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -19,6 +19,7 @@ package org.apache.spark.internal
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.util.Utils
package object config {
@@ -143,4 +144,23 @@ package object config {
.internal()
.stringConf
.createWithDefaultString("AES/CTR/NoPadding")
+
+ private[spark] val DRIVER_HOST_ADDRESS = ConfigBuilder("spark.driver.host")
+ .doc("Address of driver endpoints.")
+ .stringConf
+ .createWithDefault(Utils.localHostName())
+
+ private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress")
+ .doc("Address where to bind network listen sockets on the driver.")
+ .fallbackConf(DRIVER_HOST_ADDRESS)
+
+ private[spark] val BLOCK_MANAGER_PORT = ConfigBuilder("spark.blockManager.port")
+ .doc("Port to use for the block manager when a more specific setting is not provided.")
+ .intConf
+ .createWithDefault(0)
+
+ private[spark] val DRIVER_BLOCK_MANAGER_PORT = ConfigBuilder("spark.driver.blockManager.port")
+ .doc("Port to use for the block managed on the driver.")
+ .fallbackConf(BLOCK_MANAGER_PORT)
+
}
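Within Spark internals, the fallback entries defined here mean that reading the driver-specific settings transparently falls back to the general ones until they are set explicitly. A rough sketch of the expected behavior, using the private SparkConf.get(ConfigEntry) accessor that the rest of this patch relies on (values are illustrative):

  val conf = new SparkConf()
    .set("spark.driver.host", "driver.example.com")   // advertised address
    .set("spark.blockManager.port", "39000")

  // No driver-specific overrides yet, so the new entries resolve to their fallbacks.
  conf.get(DRIVER_BIND_ADDRESS)          // "driver.example.com"
  conf.get(DRIVER_BLOCK_MANAGER_PORT)    // 39000

  // Setting them explicitly decouples the driver from the executor-wide defaults.
  conf.set("spark.driver.bindAddress", "0.0.0.0")
  conf.set("spark.driver.blockManager.port", "38020")
  conf.get(DRIVER_BIND_ADDRESS)          // "0.0.0.0"
  conf.get(DRIVER_BLOCK_MANAGER_PORT)    // 38020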
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 33a3219607..dc70eb82d2 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -42,7 +42,9 @@ import org.apache.spark.util.Utils
private[spark] class NettyBlockTransferService(
conf: SparkConf,
securityManager: SecurityManager,
+ bindAddress: String,
override val hostName: String,
+ _port: Int,
numCores: Int)
extends BlockTransferService {
@@ -75,12 +77,11 @@ private[spark] class NettyBlockTransferService(
/** Creates and binds the TransportServer, possibly trying multiple ports. */
private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
def startService(port: Int): (TransportServer, Int) = {
- val server = transportContext.createServer(hostName, port, bootstraps.asJava)
+ val server = transportContext.createServer(bindAddress, port, bootstraps.asJava)
(server, server.getPort)
}
- val portToTry = conf.getInt("spark.blockManager.port", 0)
- Utils.startServiceOnPort(portToTry, startService, conf, getClass.getName)._1
+ Utils.startServiceOnPort(_port, startService, conf, getClass.getName)._1
}
override def fetchBlocks(
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 5668377133..579122868a 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -40,7 +40,19 @@ private[spark] object RpcEnv {
conf: SparkConf,
securityManager: SecurityManager,
clientMode: Boolean = false): RpcEnv = {
- val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
+ create(name, host, host, port, conf, securityManager, clientMode)
+ }
+
+ def create(
+ name: String,
+ bindAddress: String,
+ advertiseAddress: String,
+ port: Int,
+ conf: SparkConf,
+ securityManager: SecurityManager,
+ clientMode: Boolean): RpcEnv = {
+ val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
+ clientMode)
new NettyRpcEnvFactory().create(config)
}
}
@@ -186,7 +198,8 @@ private[spark] trait RpcEnvFileServer {
private[spark] case class RpcEnvConfig(
conf: SparkConf,
name: String,
- host: String,
+ bindAddress: String,
+ advertiseAddress: String,
port: Int,
securityManager: SecurityManager,
clientMode: Boolean)
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 89d2fb9b47..e51649a1ec 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -108,14 +108,14 @@ private[netty] class NettyRpcEnv(
}
}
- def startServer(port: Int): Unit = {
+ def startServer(bindAddress: String, port: Int): Unit = {
val bootstraps: java.util.List[TransportServerBootstrap] =
if (securityManager.isAuthenticationEnabled()) {
java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
} else {
java.util.Collections.emptyList()
}
- server = transportContext.createServer(host, port, bootstraps)
+ server = transportContext.createServer(bindAddress, port, bootstraps)
dispatcher.registerRpcEndpoint(
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
@@ -441,10 +441,11 @@ private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
val javaSerializerInstance =
new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
val nettyEnv =
- new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)
+ new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
+ config.securityManager)
if (!config.clientMode) {
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
- nettyEnv.startServer(actualPort)
+ nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
}
try {
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 38363800ec..4118fcf46b 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -28,6 +28,7 @@ import org.json4s.JsonAST.{JNothing, JValue}
import org.apache.spark.{SecurityManager, SparkConf, SSLOptions}
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
@@ -50,8 +51,8 @@ private[spark] abstract class WebUI(
protected val handlers = ArrayBuffer[ServletContextHandler]()
protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
protected var serverInfo: Option[ServerInfo] = None
- protected val localHostName = Utils.localHostNameForURI()
- protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
+ protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
+ conf.get(DRIVER_HOST_ADDRESS))
private val className = Utils.getFormattedClassName(this)
def getBasePath: String = basePath
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 9b4274a27b..09896c4e2f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2079,9 +2079,9 @@ private[spark] object Utils extends Logging {
case e: Exception if isBindCollision(e) =>
if (offset >= maxRetries) {
val exceptionMessage = s"${e.getMessage}: Service$serviceString failed after " +
- s"$maxRetries retries! Consider explicitly setting the appropriate port for the " +
- s"service$serviceString (for example spark.ui.port for SparkUI) to an available " +
- "port or increasing spark.port.maxRetries."
+ s"$maxRetries retries (starting from $startPort)! Consider explicitly setting " +
+ s"the appropriate port for the service$serviceString (for example spark.ui.port " +
+ s"for SparkUI) to an available port or increasing spark.port.maxRetries."
val exception = new BindException(exceptionMessage)
// restore original stack trace
exception.setStackTrace(e.getStackTrace)
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index ed15e77ff1..022fe91eda 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -108,11 +108,13 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer)
val securityManager0 = new SecurityManager(conf0)
- val exec0 = new NettyBlockTransferService(conf0, securityManager0, "localhost", numCores = 1)
+ val exec0 = new NettyBlockTransferService(conf0, securityManager0, "localhost", "localhost", 0,
+ 1)
exec0.init(blockManager)
val securityManager1 = new SecurityManager(conf1)
- val exec1 = new NettyBlockTransferService(conf1, securityManager1, "localhost", numCores = 1)
+ val exec1 = new NettyBlockTransferService(conf1, securityManager1, "localhost", "localhost", 0,
+ 1)
exec1.init(blockManager)
val result = fetchBlock(exec0, exec1, "1", blockId) match {
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
index e7df7cb419..121447a965 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
@@ -23,6 +23,7 @@ import org.mockito.Mockito.mock
import org.scalatest._
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
import org.apache.spark.network.BlockDataManager
class NettyBlockTransferServiceSuite
@@ -86,10 +87,10 @@ class NettyBlockTransferServiceSuite
private def createService(port: Int): NettyBlockTransferService = {
val conf = new SparkConf()
.set("spark.app.id", s"test-${getClass.getName}")
- .set("spark.blockManager.port", port.toString)
val securityManager = new SecurityManager(conf)
val blockDataManager = mock(classOf[BlockDataManager])
- val service = new NettyBlockTransferService(conf, securityManager, "localhost", numCores = 1)
+ val service = new NettyBlockTransferService(conf, securityManager, "localhost", "localhost",
+ port, 1)
service.init(blockDataManager)
service
}
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
index 2d6543d328..0409aa3a5d 100644
--- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
@@ -27,8 +27,8 @@ class NettyRpcEnvSuite extends RpcEnvSuite {
name: String,
port: Int,
clientMode: Boolean = false): RpcEnv = {
- val config = RpcEnvConfig(conf, "test", "localhost", port, new SecurityManager(conf),
- clientMode)
+ val config = RpcEnvConfig(conf, "test", "localhost", "localhost", port,
+ new SecurityManager(conf), clientMode)
new NettyRpcEnvFactory().create(config)
}
@@ -41,4 +41,16 @@ class NettyRpcEnvSuite extends RpcEnvSuite {
assert(e.getCause.getMessage.contains(uri))
}
+ test("advertise address different from bind address") {
+ val sparkConf = new SparkConf()
+ val config = RpcEnvConfig(sparkConf, "test", "localhost", "example.com", 0,
+ new SecurityManager(sparkConf), false)
+ val env = new NettyRpcEnvFactory().create(config)
+ try {
+ assert(env.address.hostPort.startsWith("example.com:"))
+ } finally {
+ env.shutdown()
+ }
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index b9e3a364ee..e1c1787cbd 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -67,7 +67,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
conf.set("spark.testing.memory", maxMem.toString)
conf.set("spark.memory.offHeap.size", maxMem.toString)
- val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)
+ val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
val memManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(serializer, conf)
val store = new BlockManager(name, rpcEnv, master, serializerManager, conf,
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 6d53d2e5f0..1652fcdb96 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -80,7 +80,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.memory.offHeap.size", maxMem.toString)
val serializer = new KryoSerializer(conf)
val transfer = transferService
- .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1))
+ .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1))
val memManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(serializer, conf)
val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, conf,
@@ -854,7 +854,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
conf.set("spark.testing.memory", "1200")
- val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)
+ val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,