diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-04-08 17:18:19 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-04-08 17:18:19 -0700 |
commit | 4d7c35926371f9e016577987c037abcf984443d9 (patch) | |
tree | 7824fdcaa5d50d2a336e9830d1786945b5ea2529 /core/src | |
parent | 906eef4c7a380419f2d089262afdcf39454fe31e (diff) | |
download | spark-4d7c35926371f9e016577987c037abcf984443d9.tar.gz spark-4d7c35926371f9e016577987c037abcf984443d9.tar.bz2 spark-4d7c35926371f9e016577987c037abcf984443d9.zip |
[SPARK-14437][CORE] Use the address that NettyBlockTransferService listens to create BlockManagerId
## What changes were proposed in this pull request?
Here is why SPARK-14437 happens:
BlockManagerId is created using NettyBlockTransferService.hostName which comes from `customHostname`. And `Executor` will set `customHostname` to the hostname which is detected by the driver. However, the driver may not be able to detect the correct address in some complicated network (Netty's Channel.remoteAddress doesn't always return a connectable address). In such case, `BlockManagerId` will be created using a wrong hostname.
To fix this issue, this PR uses `hostname` provided by `SparkEnv.create` to create `NettyBlockTransferService` and set `NettyBlockTransferService.hostname` to this one directly. A bonus of this approach is NettyBlockTransferService won't bound to `0.0.0.0` which is much safer.
## How was this patch tested?
Manually checked the bound address using local-cluster.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #12240 from zsxwing/SPARK-14437.
Diffstat (limited to 'core/src')
6 files changed, 16 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 700e2cb3f9..ab89f4c4e4 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -314,7 +314,8 @@ object SparkEnv extends Logging { UnifiedMemoryManager(conf, numUsableCores) } - val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores) + val blockTransferService = + new NettyBlockTransferService(conf, securityManager, hostname, numUsableCores) val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint( BlockManagerMaster.DRIVER_ENDPOINT_NAME, diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index 5f3d4532dd..33a3219607 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -39,7 +39,11 @@ import org.apache.spark.util.Utils /** * A BlockTransferService that uses Netty to fetch a set of blocks at at time. */ -class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager, numCores: Int) +private[spark] class NettyBlockTransferService( + conf: SparkConf, + securityManager: SecurityManager, + override val hostName: String, + numCores: Int) extends BlockTransferService { // TODO: Don't use Java serialization, use a more cross-version compatible serialization format. @@ -65,13 +69,13 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava) server = createServer(serverBootstrap.toList) appId = conf.getAppId - logInfo("Server created on " + server.getPort) + logInfo(s"Server created on ${hostName}:${server.getPort}") } /** Creates and binds the TransportServer, possibly trying multiple ports. */ private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = { def startService(port: Int): (TransportServer, Int) = { - val server = transportContext.createServer(port, bootstraps.asJava) + val server = transportContext.createServer(hostName, port, bootstraps.asJava) (server, server.getPort) } @@ -109,8 +113,6 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage } } - override def hostName: String = Utils.localHostName() - override def port: Int = server.getPort override def uploadBlock( diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala index 6da18cfd49..ed15e77ff1 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala @@ -108,11 +108,11 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer) val securityManager0 = new SecurityManager(conf0) - val exec0 = new NettyBlockTransferService(conf0, securityManager0, numCores = 1) + val exec0 = new NettyBlockTransferService(conf0, securityManager0, "localhost", numCores = 1) exec0.init(blockManager) val securityManager1 = new SecurityManager(conf1) - val exec1 = new NettyBlockTransferService(conf1, securityManager1, numCores = 1) + val exec1 = new NettyBlockTransferService(conf1, securityManager1, "localhost", numCores = 1) exec1.init(blockManager) val result = fetchBlock(exec0, exec1, "1", blockId) match { diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala index cc1a9e0287..f3c156e4f7 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala @@ -80,7 +80,7 @@ class NettyBlockTransferServiceSuite .set("spark.blockManager.port", port.toString) val securityManager = new SecurityManager(conf) val blockDataManager = mock(classOf[BlockDataManager]) - val service = new NettyBlockTransferService(conf, securityManager, numCores = 1) + val service = new NettyBlockTransferService(conf, securityManager, "localhost", numCores = 1) service.init(blockDataManager) service } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 2ec5319d55..d26df7e760 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -62,7 +62,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { conf.set("spark.testing.memory", maxMem.toString) conf.set("spark.memory.offHeap.size", maxMem.toString) - val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) + val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1) val memManager = UnifiedMemoryManager(conf, numCores = 1) val serializerManager = new SerializerManager(serializer, conf) val store = new BlockManager(name, rpcEnv, master, serializerManager, conf, diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 66b28de10f..a1c2933584 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -78,7 +78,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE conf.set("spark.memory.offHeap.size", maxMem.toString) val serializer = new KryoSerializer(conf) val transfer = transferService - .getOrElse(new NettyBlockTransferService(conf, securityMgr, numCores = 1)) + .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)) val memManager = UnifiedMemoryManager(conf, numCores = 1) val serializerManager = new SerializerManager(serializer, conf) val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, conf, @@ -490,7 +490,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val blockManager = makeBlockManager(128, "exec", bmMaster) val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations) val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0)) - assert(locations.map(_.host) === Seq(localHost, localHost, otherHost)) + assert(locations.map(_.host).toSet === Set(localHost, localHost, otherHost)) } test("SPARK-9591: getRemoteBytes from another location when Exception throw") { @@ -852,7 +852,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("block store put failure") { // Use Java serializer so we can create an unserializable error. conf.set("spark.testing.memory", "1200") - val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) + val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1) val memoryManager = UnifiedMemoryManager(conf, numCores = 1) val serializerManager = new SerializerManager(new JavaSerializer(conf), conf) store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master, |