aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-05-18 20:15:00 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-05-18 20:15:00 -0700
commit5c9117a3ed373461529f9f9306668ed4149c63fb (patch)
tree470f3c9a1c676327188673f3597cf2790b4f179e /core/src
parentb1bc5ebdd52ed12aea3fdc7b8f2fa2d00ea09c6b (diff)
downloadspark-5c9117a3ed373461529f9f9306668ed4149c63fb.tar.gz
spark-5c9117a3ed373461529f9f9306668ed4149c63fb.tar.bz2
spark-5c9117a3ed373461529f9f9306668ed4149c63fb.zip
[SPARK-15395][CORE] Use getHostString to create RpcAddress
## What changes were proposed in this pull request? Right now the netty RPC uses `InetSocketAddress.getHostName` to create `RpcAddress` for network events. If we use an IP address to connect, then the RpcAddress's host will be a host name (if the reverse lookup successes) instead of the IP address. However, some places need to compare the original IP address and the RpcAddress in `onDisconnect` (e.g., CoarseGrainedExecutorBackend), and this behavior will make the check incorrect. This PR uses `getHostString` to resolve the issue. ## How was this patch tested? Jenkins unit tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #13185 from zsxwing/host-string.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala8
1 files changed, 4 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 7d7b4c82fa..89d2fb9b47 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -574,7 +574,7 @@ private[netty] class NettyRpcHandler(
private def internalReceive(client: TransportClient, message: ByteBuffer): RequestMessage = {
val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
assert(addr != null)
- val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
+ val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
val requestMessage = nettyEnv.deserialize[RequestMessage](client, message)
if (requestMessage.senderAddress == null) {
// Create a new message with the socket address of the client as the sender.
@@ -595,7 +595,7 @@ private[netty] class NettyRpcHandler(
override def exceptionCaught(cause: Throwable, client: TransportClient): Unit = {
val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
if (addr != null) {
- val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
+ val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
dispatcher.postToAll(RemoteProcessConnectionError(cause, clientAddr))
// If the remove RpcEnv listens to some address, we should also fire a
// RemoteProcessConnectionError for the remote RpcEnv listening address
@@ -614,14 +614,14 @@ private[netty] class NettyRpcHandler(
override def channelActive(client: TransportClient): Unit = {
val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
assert(addr != null)
- val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
+ val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
dispatcher.postToAll(RemoteProcessConnected(clientAddr))
}
override def channelInactive(client: TransportClient): Unit = {
val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
if (addr != null) {
- val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
+ val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
nettyEnv.removeOutbox(clientAddr)
dispatcher.postToAll(RemoteProcessDisconnected(clientAddr))
val remoteEnvAddress = remoteAddresses.remove(clientAddr)