aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-04-08 17:18:19 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-04-08 17:18:19 -0700
commit4d7c35926371f9e016577987c037abcf984443d9 (patch)
tree7824fdcaa5d50d2a336e9830d1786945b5ea2529 /core/src/main/scala
parent906eef4c7a380419f2d089262afdcf39454fe31e (diff)
downloadspark-4d7c35926371f9e016577987c037abcf984443d9.tar.gz
spark-4d7c35926371f9e016577987c037abcf984443d9.tar.bz2
spark-4d7c35926371f9e016577987c037abcf984443d9.zip
[SPARK-14437][CORE] Use the address that NettyBlockTransferService listens to create BlockManagerId
## What changes were proposed in this pull request? Here is why SPARK-14437 happens: BlockManagerId is created using NettyBlockTransferService.hostName which comes from `customHostname`. And `Executor` will set `customHostname` to the hostname which is detected by the driver. However, the driver may not be able to detect the correct address in some complicated network (Netty's Channel.remoteAddress doesn't always return a connectable address). In such case, `BlockManagerId` will be created using a wrong hostname. To fix this issue, this PR uses `hostname` provided by `SparkEnv.create` to create `NettyBlockTransferService` and set `NettyBlockTransferService.hostname` to this one directly. A bonus of this approach is NettyBlockTransferService won't bound to `0.0.0.0` which is much safer. ## How was this patch tested? Manually checked the bound address using local-cluster. Author: Shixiong Zhu <shixiong@databricks.com> Closes #12240 from zsxwing/SPARK-14437.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala12
2 files changed, 9 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 700e2cb3f9..ab89f4c4e4 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -314,7 +314,8 @@ object SparkEnv extends Logging {
UnifiedMemoryManager(conf, numUsableCores)
}
- val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
+ val blockTransferService =
+ new NettyBlockTransferService(conf, securityManager, hostname, numUsableCores)
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 5f3d4532dd..33a3219607 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -39,7 +39,11 @@ import org.apache.spark.util.Utils
/**
* A BlockTransferService that uses Netty to fetch a set of blocks at at time.
*/
-class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager, numCores: Int)
+private[spark] class NettyBlockTransferService(
+ conf: SparkConf,
+ securityManager: SecurityManager,
+ override val hostName: String,
+ numCores: Int)
extends BlockTransferService {
// TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
@@ -65,13 +69,13 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)
server = createServer(serverBootstrap.toList)
appId = conf.getAppId
- logInfo("Server created on " + server.getPort)
+ logInfo(s"Server created on ${hostName}:${server.getPort}")
}
/** Creates and binds the TransportServer, possibly trying multiple ports. */
private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
def startService(port: Int): (TransportServer, Int) = {
- val server = transportContext.createServer(port, bootstraps.asJava)
+ val server = transportContext.createServer(hostName, port, bootstraps.asJava)
(server, server.getPort)
}
@@ -109,8 +113,6 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
}
}
- override def hostName: String = Utils.localHostName()
-
override def port: Int = server.getPort
override def uploadBlock(