diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-04-08 17:18:19 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-04-08 17:18:19 -0700 |
commit | 4d7c35926371f9e016577987c037abcf984443d9 (patch) | |
tree | 7824fdcaa5d50d2a336e9830d1786945b5ea2529 /core/src/main/scala | |
parent | 906eef4c7a380419f2d089262afdcf39454fe31e (diff) | |
download | spark-4d7c35926371f9e016577987c037abcf984443d9.tar.gz spark-4d7c35926371f9e016577987c037abcf984443d9.tar.bz2 spark-4d7c35926371f9e016577987c037abcf984443d9.zip |
[SPARK-14437][CORE] Use the address that NettyBlockTransferService listens to create BlockManagerId
## What changes were proposed in this pull request?
Here is why SPARK-14437 happens:
BlockManagerId is created using NettyBlockTransferService.hostName which comes from `customHostname`. And `Executor` will set `customHostname` to the hostname which is detected by the driver. However, the driver may not be able to detect the correct address in some complicated network (Netty's Channel.remoteAddress doesn't always return a connectable address). In such a case, `BlockManagerId` will be created using a wrong hostname.
To fix this issue, this PR uses the `hostname` provided by `SparkEnv.create` to create `NettyBlockTransferService` and sets `NettyBlockTransferService.hostname` to this one directly. A bonus of this approach is that `NettyBlockTransferService` won't bind to `0.0.0.0`, which is much safer.
## How was this patch tested?
Manually checked the bound address using local-cluster.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #12240 from zsxwing/SPARK-14437.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkEnv.scala | 3 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala | 12 |
2 files changed, 9 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 700e2cb3f9..ab89f4c4e4 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -314,7 +314,8 @@ object SparkEnv extends Logging { UnifiedMemoryManager(conf, numUsableCores) } - val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores) + val blockTransferService = + new NettyBlockTransferService(conf, securityManager, hostname, numUsableCores) val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint( BlockManagerMaster.DRIVER_ENDPOINT_NAME, diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index 5f3d4532dd..33a3219607 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -39,7 +39,11 @@ import org.apache.spark.util.Utils /** * A BlockTransferService that uses Netty to fetch a set of blocks at at time. */ -class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager, numCores: Int) +private[spark] class NettyBlockTransferService( + conf: SparkConf, + securityManager: SecurityManager, + override val hostName: String, + numCores: Int) extends BlockTransferService { // TODO: Don't use Java serialization, use a more cross-version compatible serialization format. 
@@ -65,13 +69,13 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava) server = createServer(serverBootstrap.toList) appId = conf.getAppId - logInfo("Server created on " + server.getPort) + logInfo(s"Server created on ${hostName}:${server.getPort}") } /** Creates and binds the TransportServer, possibly trying multiple ports. */ private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = { def startService(port: Int): (TransportServer, Int) = { - val server = transportContext.createServer(port, bootstraps.asJava) + val server = transportContext.createServer(hostName, port, bootstraps.asJava) (server, server.getPort) } @@ -109,8 +113,6 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage } } - override def hostName: String = Utils.localHostName() - override def port: Int = server.getPort override def uploadBlock( |