author    Shixiong Zhu <shixiong@databricks.com>  2016-04-08 17:18:19 -0700
committer Shixiong Zhu <shixiong@databricks.com>  2016-04-08 17:18:19 -0700
commit    4d7c35926371f9e016577987c037abcf984443d9 (patch)
tree      7824fdcaa5d50d2a336e9830d1786945b5ea2529
parent    906eef4c7a380419f2d089262afdcf39454fe31e (diff)
[SPARK-14437][CORE] Use the address that NettyBlockTransferService listens to create BlockManagerId
## What changes were proposed in this pull request?

Here is why SPARK-14437 happens: `BlockManagerId` is created using `NettyBlockTransferService.hostName`, which comes from `customHostname`, and `Executor` sets `customHostname` to the hostname detected by the driver. However, the driver may not be able to detect the correct address in some complicated networks (Netty's `Channel.remoteAddress` doesn't always return a connectable address). In such cases, `BlockManagerId` is created with a wrong hostname.

To fix this issue, this PR uses the `hostname` provided to `SparkEnv.create` to construct `NettyBlockTransferService`, and sets `NettyBlockTransferService.hostName` to it directly. A bonus of this approach is that `NettyBlockTransferService` is no longer bound to `0.0.0.0`, which is much safer.

## How was this patch tested?

Manually checked the bound address using local-cluster mode.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12240 from zsxwing/SPARK-14437.
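For illustration, a minimal sketch of the flow this change establishes (simplified: the surrounding `SparkEnv.create` and `BlockManager` initialization details are elided, and `executorId` is a placeholder):

```scala
// Sketch only: the hostname handed to SparkEnv.create is now threaded
// straight into the transfer service, so the BlockManagerId built from
// blockTransferService.hostName matches the address the TransportServer
// actually binds to (instead of a driver-detected, possibly wrong one).
val blockTransferService =
  new NettyBlockTransferService(conf, securityManager, hostname, numUsableCores)
blockTransferService.init(blockManager)  // binds to `hostname`, not 0.0.0.0

val blockManagerId = BlockManagerId(
  executorId, blockTransferService.hostName, blockTransferService.port)
```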
core/src/main/scala/org/apache/spark/SparkEnv.scala | 3
core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala | 12
core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala | 4
core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala | 2
core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala | 2
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 6
project/MimaExcludes.scala | 3
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 2
8 files changed, 20 insertions(+), 14 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 700e2cb3f9..ab89f4c4e4 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -314,7 +314,8 @@ object SparkEnv extends Logging {
UnifiedMemoryManager(conf, numUsableCores)
}
- val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
+ val blockTransferService =
+ new NettyBlockTransferService(conf, securityManager, hostname, numUsableCores)
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 5f3d4532dd..33a3219607 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -39,7 +39,11 @@ import org.apache.spark.util.Utils
/**
* A BlockTransferService that uses Netty to fetch a set of blocks at a time.
*/
-class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager, numCores: Int)
+private[spark] class NettyBlockTransferService(
+ conf: SparkConf,
+ securityManager: SecurityManager,
+ override val hostName: String,
+ numCores: Int)
extends BlockTransferService {
// TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
@@ -65,13 +69,13 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)
server = createServer(serverBootstrap.toList)
appId = conf.getAppId
- logInfo("Server created on " + server.getPort)
+ logInfo(s"Server created on ${hostName}:${server.getPort}")
}
/** Creates and binds the TransportServer, possibly trying multiple ports. */
private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
def startService(port: Int): (TransportServer, Int) = {
- val server = transportContext.createServer(port, bootstraps.asJava)
+ val server = transportContext.createServer(hostName, port, bootstraps.asJava)
(server, server.getPort)
}
@@ -109,8 +113,6 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
}
}
- override def hostName: String = Utils.localHostName()
-
override def port: Int = server.getPort
override def uploadBlock(
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index 6da18cfd49..ed15e77ff1 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -108,11 +108,11 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer)
val securityManager0 = new SecurityManager(conf0)
- val exec0 = new NettyBlockTransferService(conf0, securityManager0, numCores = 1)
+ val exec0 = new NettyBlockTransferService(conf0, securityManager0, "localhost", numCores = 1)
exec0.init(blockManager)
val securityManager1 = new SecurityManager(conf1)
- val exec1 = new NettyBlockTransferService(conf1, securityManager1, numCores = 1)
+ val exec1 = new NettyBlockTransferService(conf1, securityManager1, "localhost", numCores = 1)
exec1.init(blockManager)
val result = fetchBlock(exec0, exec1, "1", blockId) match {
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
index cc1a9e0287..f3c156e4f7 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
@@ -80,7 +80,7 @@ class NettyBlockTransferServiceSuite
.set("spark.blockManager.port", port.toString)
val securityManager = new SecurityManager(conf)
val blockDataManager = mock(classOf[BlockDataManager])
- val service = new NettyBlockTransferService(conf, securityManager, numCores = 1)
+ val service = new NettyBlockTransferService(conf, securityManager, "localhost", numCores = 1)
service.init(blockDataManager)
service
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 2ec5319d55..d26df7e760 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -62,7 +62,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
conf.set("spark.testing.memory", maxMem.toString)
conf.set("spark.memory.offHeap.size", maxMem.toString)
- val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
+ val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)
val memManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(serializer, conf)
val store = new BlockManager(name, rpcEnv, master, serializerManager, conf,
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 66b28de10f..a1c2933584 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -78,7 +78,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.memory.offHeap.size", maxMem.toString)
val serializer = new KryoSerializer(conf)
val transfer = transferService
- .getOrElse(new NettyBlockTransferService(conf, securityMgr, numCores = 1))
+ .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1))
val memManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(serializer, conf)
val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, conf,
@@ -490,7 +490,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val blockManager = makeBlockManager(128, "exec", bmMaster)
val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
- assert(locations.map(_.host) === Seq(localHost, localHost, otherHost))
+ assert(locations.map(_.host).toSet === Set(localHost, localHost, otherHost))
}
test("SPARK-9591: getRemoteBytes from another location when Exception throw") {
@@ -852,7 +852,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
conf.set("spark.testing.memory", "1200")
- val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
+ val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)
val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a53161dc9a..f240c30427 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -615,6 +615,9 @@ object MimaExcludes {
// [SPARK-13430][ML] moved featureCol from LinearRegressionModelSummary to LinearRegressionSummary
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.ml.regression.LinearRegressionSummary.this")
) ++ Seq(
+ // [SPARK-14437][Core] Use the address that NettyBlockTransferService listens to create BlockManagerId
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockTransferService.this")
+ ) ++ Seq(
// [SPARK-13048][ML][MLLIB] keepLastCheckpoint option for LDA EM optimizer
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.DistributedLDAModel.this")
)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 5fc53bcb91..39d0de5179 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -266,7 +266,7 @@ class ReceivedBlockHandlerSuite
conf: SparkConf,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
- val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
+ val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)
val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf,
memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
memManager.setMemoryStore(blockManager.memoryStore)