-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala | 27
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/package.scala | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/WebUI.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala | 5
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala | 16
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 4
-rw-r--r--  docs/configuration.md | 23
-rw-r--r--  mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala | 3
-rw-r--r--  mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala | 5
-rw-r--r--  mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala | 3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 2
21 files changed, 133 insertions(+), 40 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index e85e5aa237..51a699f41d 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -422,6 +422,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
configsWithAlternatives.get(key).toSeq.flatten.exists { alt => contains(alt.key) }
}
+ private[spark] def contains(entry: ConfigEntry[_]): Boolean = contains(entry.key)
+
/** Copy this object */
override def clone: SparkConf = {
val cloned = new SparkConf(false)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 35b6334832..db84172e16 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -383,8 +383,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
logInfo("Spark configuration:\n" + _conf.toDebugString)
}
- // Set Spark driver host and port system properties
- _conf.setIfMissing("spark.driver.host", Utils.localHostName())
+ // Set Spark driver host and port system properties. This explicitly sets the configuration
+ // instead of relying on the default value of the config constant.
+ _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
_conf.setIfMissing("spark.driver.port", "0")
_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index cc8e3fdc97..1ffeb12988 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -29,6 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.netty.NettyBlockTransferService
@@ -158,14 +159,17 @@ object SparkEnv extends Logging {
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
- assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
+ assert(conf.contains(DRIVER_HOST_ADDRESS),
+ s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
- val hostname = conf.get("spark.driver.host")
+ val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
+ val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
val port = conf.get("spark.driver.port").toInt
create(
conf,
SparkContext.DRIVER_IDENTIFIER,
- hostname,
+ bindAddress,
+ advertiseAddress,
port,
isDriver = true,
isLocal = isLocal,
@@ -190,6 +194,7 @@ object SparkEnv extends Logging {
conf,
executorId,
hostname,
+ hostname,
port,
isDriver = false,
isLocal = isLocal,
@@ -205,7 +210,8 @@ object SparkEnv extends Logging {
private def create(
conf: SparkConf,
executorId: String,
- hostname: String,
+ bindAddress: String,
+ advertiseAddress: String,
port: Int,
isDriver: Boolean,
isLocal: Boolean,
@@ -221,8 +227,8 @@ object SparkEnv extends Logging {
val securityManager = new SecurityManager(conf)
val systemName = if (isDriver) driverSystemName else executorSystemName
- val rpcEnv = RpcEnv.create(systemName, hostname, port, conf, securityManager,
- clientMode = !isDriver)
+ val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
+ securityManager, clientMode = !isDriver)
// Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
// In the non-driver case, the RPC env's address may be null since it may not be listening
@@ -309,8 +315,15 @@ object SparkEnv extends Logging {
UnifiedMemoryManager(conf, numUsableCores)
}
+ val blockManagerPort = if (isDriver) {
+ conf.get(DRIVER_BLOCK_MANAGER_PORT)
+ } else {
+ conf.get(BLOCK_MANAGER_PORT)
+ }
+
val blockTransferService =
- new NettyBlockTransferService(conf, securityManager, hostname, numUsableCores)
+ new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
+ blockManagerPort, numUsableCores)
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
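Note on the port selection above, sketched with hypothetical values (the fallback wiring comes from the constants added to internal/config below):

    import org.apache.spark.SparkConf

    // One shared setting covers both sides...
    val conf = new SparkConf().set("spark.blockManager.port", "31000")
    // Driver: spark.driver.blockManager.port is unset, so it falls back to
    // spark.blockManager.port, i.e. 31000. Executors read 31000 directly.

    // ...and a driver-only override splits them when needed:
    conf.set("spark.driver.blockManager.port", "32000")
    // Now the driver binds 32000 while executors keep 31000, which is useful
    // when the driver sits behind different forwarding rules than executors.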
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
index 4b546c847a..97f56a64d6 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
@@ -66,7 +66,7 @@ private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends Con
findEntry(key) match {
case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
- case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
+ case e: FallbackConfigEntry[_] => get(e.fallback.key)
case _ => None
}
}
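The one-line fix above matters more than it looks: defaultValueString only consulted an entry's declared default, so a fallback entry ignored a value the user had set for its parent key. Routing through get() restores the full lookup. A self-contained model of the intended resolution order (an assumed simplification, not the verbatim class):

    sealed trait Entry
    case class WithDefault(default: String) extends Entry
    case class Fallback(parentKey: String) extends Entry

    class Provider(conf: Map[String, String], entries: Map[String, Entry]) {
      def get(key: String): Option[String] =
        conf.get(key).orElse(defaultValueString(key))

      private def defaultValueString(key: String): Option[String] =
        entries.get(key) match {
          case Some(WithDefault(d))      => Some(d)
          case Some(Fallback(parentKey)) => get(parentKey) // the fix: full
          case _                         => None           // lookup, not default
        }
    }

    // The old code would miss a user-set parent key; with get() it wins:
    val p = new Provider(
      Map("spark.driver.host" -> "10.0.0.5"),
      Map("spark.driver.bindAddress" -> Fallback("spark.driver.host")))
    assert(p.get("spark.driver.bindAddress") == Some("10.0.0.5"))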
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 02d7d182a4..d536cc5097 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -19,6 +19,7 @@ package org.apache.spark.internal
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.util.Utils
package object config {
@@ -143,4 +144,23 @@ package object config {
.internal()
.stringConf
.createWithDefaultString("AES/CTR/NoPadding")
+
+ private[spark] val DRIVER_HOST_ADDRESS = ConfigBuilder("spark.driver.host")
+ .doc("Address of driver endpoints.")
+ .stringConf
+ .createWithDefault(Utils.localHostName())
+
+ private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress")
+ .doc("Address where to bind network listen sockets on the driver.")
+ .fallbackConf(DRIVER_HOST_ADDRESS)
+
+ private[spark] val BLOCK_MANAGER_PORT = ConfigBuilder("spark.blockManager.port")
+ .doc("Port to use for the block manager when a more specific setting is not provided.")
+ .intConf
+ .createWithDefault(0)
+
+ private[spark] val DRIVER_BLOCK_MANAGER_PORT = ConfigBuilder("spark.driver.blockManager.port")
+ .doc("Port to use for the block managed on the driver.")
+ .fallbackConf(BLOCK_MANAGER_PORT)
+
}
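A short usage sketch for the new entries (assumes Spark-internal code, since the constants are private[spark]):

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config._

    val conf = new SparkConf().set(DRIVER_HOST_ADDRESS.key, "10.0.0.5")
    conf.get(DRIVER_BIND_ADDRESS)  // "10.0.0.5": falls back to spark.driver.host
    conf.set(DRIVER_BIND_ADDRESS.key, "0.0.0.0")
    conf.get(DRIVER_BIND_ADDRESS)  // "0.0.0.0": an explicit value wins
    conf.get(DRIVER_HOST_ADDRESS)  // still "10.0.0.5": the fallback is one-way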
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 33a3219607..dc70eb82d2 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -42,7 +42,9 @@ import org.apache.spark.util.Utils
private[spark] class NettyBlockTransferService(
conf: SparkConf,
securityManager: SecurityManager,
+ bindAddress: String,
override val hostName: String,
+ _port: Int,
numCores: Int)
extends BlockTransferService {
@@ -75,12 +77,11 @@ private[spark] class NettyBlockTransferService(
/** Creates and binds the TransportServer, possibly trying multiple ports. */
private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
def startService(port: Int): (TransportServer, Int) = {
- val server = transportContext.createServer(hostName, port, bootstraps.asJava)
+ val server = transportContext.createServer(bindAddress, port, bootstraps.asJava)
(server, server.getPort)
}
- val portToTry = conf.getInt("spark.blockManager.port", 0)
- Utils.startServiceOnPort(portToTry, startService, conf, getClass.getName)._1
+ Utils.startServiceOnPort(_port, startService, conf, getClass.getName)._1
}
override def fetchBlocks(
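The constructor's new call shape, sketched with hypothetical addresses (the port is now resolved by the caller, as SparkEnv does above, instead of being read from spark.blockManager.port here; the class is private[spark]):

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.network.netty.NettyBlockTransferService

    val conf = new SparkConf()
    val securityManager = new SecurityManager(conf)
    val transfer = new NettyBlockTransferService(
      conf,
      securityManager,
      bindAddress = "0.0.0.0",          // interface the TransportServer binds
      hostName = "driver.example.com",  // advertised to peers (hypothetical name)
      _port = 31000,                    // 0 would mean "pick any free port"
      numCores = 4)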
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 5668377133..579122868a 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -40,7 +40,19 @@ private[spark] object RpcEnv {
conf: SparkConf,
securityManager: SecurityManager,
clientMode: Boolean = false): RpcEnv = {
- val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
+ create(name, host, host, port, conf, securityManager, clientMode)
+ }
+
+ def create(
+ name: String,
+ bindAddress: String,
+ advertiseAddress: String,
+ port: Int,
+ conf: SparkConf,
+ securityManager: SecurityManager,
+ clientMode: Boolean): RpcEnv = {
+ val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
+ clientMode)
new NettyRpcEnvFactory().create(config)
}
}
@@ -186,7 +198,8 @@ private[spark] trait RpcEnvFileServer {
private[spark] case class RpcEnvConfig(
conf: SparkConf,
name: String,
- host: String,
+ bindAddress: String,
+ advertiseAddress: String,
port: Int,
securityManager: SecurityManager,
clientMode: Boolean)
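Both overloads side by side, as a sketch (existing callers keep the short form, which now simply advertises the address it binds; the driver path in SparkEnv uses the long form; RpcEnv is private[spark], so this assumes Spark-internal code):

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.rpc.RpcEnv

    val conf = new SparkConf()
    val sm = new SecurityManager(conf)

    // Old shape, unchanged behavior: bind and advertise the same address.
    val env1 = RpcEnv.create("test", "localhost", 0, conf, sm)

    // New shape: bind on all interfaces, advertise a (hypothetical) external name.
    val env2 = RpcEnv.create("test", "0.0.0.0", "driver.example.com", 0, conf, sm,
      clientMode = false)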
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 89d2fb9b47..e51649a1ec 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -108,14 +108,14 @@ private[netty] class NettyRpcEnv(
}
}
- def startServer(port: Int): Unit = {
+ def startServer(bindAddress: String, port: Int): Unit = {
val bootstraps: java.util.List[TransportServerBootstrap] =
if (securityManager.isAuthenticationEnabled()) {
java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
} else {
java.util.Collections.emptyList()
}
- server = transportContext.createServer(host, port, bootstraps)
+ server = transportContext.createServer(bindAddress, port, bootstraps)
dispatcher.registerRpcEndpoint(
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
@@ -441,10 +441,11 @@ private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
val javaSerializerInstance =
new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
val nettyEnv =
- new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)
+ new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
+ config.securityManager)
if (!config.clientMode) {
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
- nettyEnv.startServer(actualPort)
+ nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
}
try {
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 38363800ec..4118fcf46b 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -28,6 +28,7 @@ import org.json4s.JsonAST.{JNothing, JValue}
import org.apache.spark.{SecurityManager, SparkConf, SSLOptions}
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
@@ -50,8 +51,8 @@ private[spark] abstract class WebUI(
protected val handlers = ArrayBuffer[ServletContextHandler]()
protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
protected var serverInfo: Option[ServerInfo] = None
- protected val localHostName = Utils.localHostNameForURI()
- protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
+ protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
+ conf.get(DRIVER_HOST_ADDRESS))
private val className = Utils.getFormattedClassName(this)
def getBasePath: String = basePath
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 9b4274a27b..09896c4e2f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2079,9 +2079,9 @@ private[spark] object Utils extends Logging {
case e: Exception if isBindCollision(e) =>
if (offset >= maxRetries) {
val exceptionMessage = s"${e.getMessage}: Service$serviceString failed after " +
- s"$maxRetries retries! Consider explicitly setting the appropriate port for the " +
- s"service$serviceString (for example spark.ui.port for SparkUI) to an available " +
- "port or increasing spark.port.maxRetries."
+ s"$maxRetries retries (starting from $startPort)! Consider explicitly setting " +
+ s"the appropriate port for the service$serviceString (for example spark.ui.port " +
+ s"for SparkUI) to an available port or increasing spark.port.maxRetries."
val exception = new BindException(exceptionMessage)
// restore original stack trace
exception.setStackTrace(e.getStackTrace)
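For context on the reworded message, a sketch against the existing Utils.startServiceOnPort contract: the helper retries successive ports starting from startPort, so naming the start port tells the user which range was exhausted.

    import java.net.ServerSocket
    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    val conf = new SparkConf()  // spark.port.maxRetries bounds the attempts
    def startService(port: Int): (ServerSocket, Int) = {
      val socket = new ServerSocket(port)  // throws BindException if taken
      (socket, socket.getLocalPort)
    }
    // Tries 31000, 31001, ... until one binds or maxRetries is exhausted, at
    // which point the BindException above reports "starting from 31000".
    val (socket, boundPort) =
      Utils.startServiceOnPort(31000, startService, conf, "block manager")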
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index ed15e77ff1..022fe91eda 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -108,11 +108,13 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer)
val securityManager0 = new SecurityManager(conf0)
- val exec0 = new NettyBlockTransferService(conf0, securityManager0, "localhost", numCores = 1)
+ val exec0 = new NettyBlockTransferService(conf0, securityManager0, "localhost", "localhost", 0,
+ 1)
exec0.init(blockManager)
val securityManager1 = new SecurityManager(conf1)
- val exec1 = new NettyBlockTransferService(conf1, securityManager1, "localhost", numCores = 1)
+ val exec1 = new NettyBlockTransferService(conf1, securityManager1, "localhost", "localhost", 0,
+ 1)
exec1.init(blockManager)
val result = fetchBlock(exec0, exec1, "1", blockId) match {
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
index e7df7cb419..121447a965 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
@@ -23,6 +23,7 @@ import org.mockito.Mockito.mock
import org.scalatest._
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
import org.apache.spark.network.BlockDataManager
class NettyBlockTransferServiceSuite
@@ -86,10 +87,10 @@ class NettyBlockTransferServiceSuite
private def createService(port: Int): NettyBlockTransferService = {
val conf = new SparkConf()
.set("spark.app.id", s"test-${getClass.getName}")
- .set("spark.blockManager.port", port.toString)
val securityManager = new SecurityManager(conf)
val blockDataManager = mock(classOf[BlockDataManager])
- val service = new NettyBlockTransferService(conf, securityManager, "localhost", numCores = 1)
+ val service = new NettyBlockTransferService(conf, securityManager, "localhost", "localhost",
+ port, 1)
service.init(blockDataManager)
service
}
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
index 2d6543d328..0409aa3a5d 100644
--- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
@@ -27,8 +27,8 @@ class NettyRpcEnvSuite extends RpcEnvSuite {
name: String,
port: Int,
clientMode: Boolean = false): RpcEnv = {
- val config = RpcEnvConfig(conf, "test", "localhost", port, new SecurityManager(conf),
- clientMode)
+ val config = RpcEnvConfig(conf, "test", "localhost", "localhost", port,
+ new SecurityManager(conf), clientMode)
new NettyRpcEnvFactory().create(config)
}
@@ -41,4 +41,16 @@ class NettyRpcEnvSuite extends RpcEnvSuite {
assert(e.getCause.getMessage.contains(uri))
}
+ test("advertise address different from bind address") {
+ val sparkConf = new SparkConf()
+ val config = RpcEnvConfig(sparkConf, "test", "localhost", "example.com", 0,
+ new SecurityManager(sparkConf), false)
+ val env = new NettyRpcEnvFactory().create(config)
+ try {
+ assert(env.address.hostPort.startsWith("example.com:"))
+ } finally {
+ env.shutdown()
+ }
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index b9e3a364ee..e1c1787cbd 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -67,7 +67,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
conf.set("spark.testing.memory", maxMem.toString)
conf.set("spark.memory.offHeap.size", maxMem.toString)
- val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)
+ val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
val memManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(serializer, conf)
val store = new BlockManager(name, rpcEnv, master, serializerManager, conf,
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 6d53d2e5f0..1652fcdb96 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -80,7 +80,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.memory.offHeap.size", maxMem.toString)
val serializer = new KryoSerializer(conf)
val transfer = transferService
- .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1))
+ .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1))
val memManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(serializer, conf)
val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, conf,
@@ -854,7 +854,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
conf.set("spark.testing.memory", "1200")
- val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)
+ val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
diff --git a/docs/configuration.md b/docs/configuration.md
index b50565367a..82ce232b33 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1069,10 +1069,31 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td><code>spark.driver.blockManager.port</code></td>
+ <td>(value of spark.blockManager.port)</td>
+ <td>
+ Driver-specific port for the block manager to listen on, for cases where it cannot use the same
+ configuration as executors.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.driver.bindAddress</code></td>
+ <td>(value of spark.driver.host)</td>
+ <td>
+ <p>Hostname or IP address where to bind listening sockets. This config overrides the SPARK_LOCAL_IP
+ environment variable (see below).</p>
+
+ <p>It also allows a different address from the local one to be advertised to executors or external systems.
+ This is useful, for example, when running containers with bridged networking. For this to properly work,
+ the different ports used by the driver (RPC, block manager and UI) need to be forwarded from the
+ container's host.</p>
+ </td>
+</tr>
+<tr>
<td><code>spark.driver.host</code></td>
<td>(local hostname)</td>
<td>
- Hostname or IP address for the driver to listen on.
+ Hostname or IP address for the driver.
This is used for communicating with the executors and the standalone Master.
</td>
</tr>
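To make the bridged-networking note concrete, a hypothetical driver-in-a-container configuration (host name and ports are illustrative, not part of this patch):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.driver.bindAddress", "0.0.0.0")     // listen inside the container
      .set("spark.driver.host", "host.example.com")   // what executors connect to
      .set("spark.driver.port", "7078")               // driver RPC
      .set("spark.driver.blockManager.port", "7079")  // driver block manager
      .set("spark.ui.port", "4040")                   // web UI

Because the advertised address and ports must match what peers actually reach, each port needs an identical host-side mapping from the container runtime (for example, docker run -p 7078:7078 -p 7079:7079 -p 4040:4040).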
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index e19d445137..2963d161d6 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -32,6 +32,7 @@ import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.TaskState
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.util.Utils
@@ -424,7 +425,7 @@ trait MesosSchedulerUtils extends Logging {
}
}
- val managedPortNames = List("spark.executor.port", "spark.blockManager.port")
+ val managedPortNames = List("spark.executor.port", BLOCK_MANAGER_PORT.key)
/**
* The values of the non-zero ports to be used by the executor process.
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index bbc79dd1ed..c3ab488e2a 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -35,6 +35,7 @@ import org.scalatest.mock.MockitoSugar
import org.scalatest.BeforeAndAfter
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
@@ -221,7 +222,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
}
test("Port offer decline when there is no appropriate range") {
- setBackend(Map("spark.blockManager.port" -> "30100"))
+ setBackend(Map(BLOCK_MANAGER_PORT.key -> "30100"))
val offeredPorts = (31100L, 31200L)
val (mem, cpu) = (backend.executorMemory(sc), 4)
@@ -242,7 +243,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
test("Port offer accepted with user defined port numbers") {
val port = 30100
- setBackend(Map("spark.blockManager.port" -> s"$port"))
+ setBackend(Map(BLOCK_MANAGER_PORT.key -> s"$port"))
val offeredPorts = (30000L, 31000L)
val (mem, cpu) = (backend.executorMemory(sc), 4)
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
index e3d794931a..ec47ab1531 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest._
import org.scalatest.mock.MockitoSugar
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar {
@@ -179,7 +180,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
test("Port reservation is done correctly with user specified ports only") {
val conf = new SparkConf()
conf.set("spark.executor.port", "3000" )
- conf.set("spark.blockManager.port", "4000")
+ conf.set(BLOCK_MANAGER_PORT, 4000)
val portResource = createTestPortResource((3000, 5000), Some("my_role"))
val (resourcesLeft, resourcesToBeUsed) = utils
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index bd8f9950bf..b79cc65d8b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -35,6 +35,7 @@ import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.scheduler._
@@ -406,7 +407,8 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
// explicitly.
ssc = new StreamingContext(null, newCp, null)
val restoredConf1 = ssc.conf
- assert(restoredConf1.get("spark.driver.host") === "localhost")
+ val defaultConf = new SparkConf()
+ assert(restoredConf1.get("spark.driver.host") === defaultConf.get(DRIVER_HOST_ADDRESS))
assert(restoredConf1.get("spark.driver.port") !== "9999")
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 7e665454a5..f224193600 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -272,7 +272,7 @@ class ReceivedBlockHandlerSuite
conf: SparkConf,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
- val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)
+ val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf,
memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
memManager.setMemoryStore(blockManager.memoryStore)