about summary refs log tree commit diff
path: root/core/src/main/scala/org/apache/spark/SparkEnv.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/SparkEnv.scala')
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkEnv.scala | 27
1 file changed, 20 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index cc8e3fdc97..1ffeb12988 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -29,6 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.netty.NettyBlockTransferService
@@ -158,14 +159,17 @@ object SparkEnv extends Logging {
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
- assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
+ assert(conf.contains(DRIVER_HOST_ADDRESS),
+ s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
- val hostname = conf.get("spark.driver.host")
+ val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
+ val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
val port = conf.get("spark.driver.port").toInt
create(
conf,
SparkContext.DRIVER_IDENTIFIER,
- hostname,
+ bindAddress,
+ advertiseAddress,
port,
isDriver = true,
isLocal = isLocal,
@@ -190,6 +194,7 @@ object SparkEnv extends Logging {
conf,
executorId,
hostname,
+ hostname,
port,
isDriver = false,
isLocal = isLocal,
@@ -205,7 +210,8 @@ object SparkEnv extends Logging {
private def create(
conf: SparkConf,
executorId: String,
- hostname: String,
+ bindAddress: String,
+ advertiseAddress: String,
port: Int,
isDriver: Boolean,
isLocal: Boolean,
@@ -221,8 +227,8 @@ object SparkEnv extends Logging {
val securityManager = new SecurityManager(conf)
val systemName = if (isDriver) driverSystemName else executorSystemName
- val rpcEnv = RpcEnv.create(systemName, hostname, port, conf, securityManager,
- clientMode = !isDriver)
+ val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
+ securityManager, clientMode = !isDriver)
// Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
// In the non-driver case, the RPC env's address may be null since it may not be listening
@@ -309,8 +315,15 @@ object SparkEnv extends Logging {
UnifiedMemoryManager(conf, numUsableCores)
}
+ val blockManagerPort = if (isDriver) {
+ conf.get(DRIVER_BLOCK_MANAGER_PORT)
+ } else {
+ conf.get(BLOCK_MANAGER_PORT)
+ }
+
val blockTransferService =
- new NettyBlockTransferService(conf, securityManager, hostname, numUsableCores)
+ new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
+ blockManagerPort, numUsableCores)
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,