diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-05-18 20:15:00 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-05-18 20:15:00 -0700 |
commit | 5c9117a3ed373461529f9f9306668ed4149c63fb (patch) | |
tree | 470f3c9a1c676327188673f3597cf2790b4f179e /core | |
parent | b1bc5ebdd52ed12aea3fdc7b8f2fa2d00ea09c6b (diff) | |
download | spark-5c9117a3ed373461529f9f9306668ed4149c63fb.tar.gz spark-5c9117a3ed373461529f9f9306668ed4149c63fb.tar.bz2 spark-5c9117a3ed373461529f9f9306668ed4149c63fb.zip |
[SPARK-15395][CORE] Use getHostString to create RpcAddress
## What changes were proposed in this pull request?
Right now the netty RPC uses `InetSocketAddress.getHostName` to create `RpcAddress` for network events. If we use an IP address to connect, then the RpcAddress's host will be a host name (if the reverse lookup successes) instead of the IP address. However, some places need to compare the original IP address and the RpcAddress in `onDisconnect` (e.g., CoarseGrainedExecutorBackend), and this behavior will make the check incorrect.
This PR uses `getHostString` to resolve the issue.
## How was this patch tested?
Jenkins unit tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #13185 from zsxwing/host-string.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 7d7b4c82fa..89d2fb9b47 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -574,7 +574,7 @@ private[netty] class NettyRpcHandler( private def internalReceive(client: TransportClient, message: ByteBuffer): RequestMessage = { val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress] assert(addr != null) - val clientAddr = RpcAddress(addr.getHostName, addr.getPort) + val clientAddr = RpcAddress(addr.getHostString, addr.getPort) val requestMessage = nettyEnv.deserialize[RequestMessage](client, message) if (requestMessage.senderAddress == null) { // Create a new message with the socket address of the client as the sender. @@ -595,7 +595,7 @@ private[netty] class NettyRpcHandler( override def exceptionCaught(cause: Throwable, client: TransportClient): Unit = { val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress] if (addr != null) { - val clientAddr = RpcAddress(addr.getHostName, addr.getPort) + val clientAddr = RpcAddress(addr.getHostString, addr.getPort) dispatcher.postToAll(RemoteProcessConnectionError(cause, clientAddr)) // If the remove RpcEnv listens to some address, we should also fire a // RemoteProcessConnectionError for the remote RpcEnv listening address @@ -614,14 +614,14 @@ private[netty] class NettyRpcHandler( override def channelActive(client: TransportClient): Unit = { val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress] assert(addr != null) - val clientAddr = RpcAddress(addr.getHostName, addr.getPort) + val clientAddr = RpcAddress(addr.getHostString, addr.getPort) dispatcher.postToAll(RemoteProcessConnected(clientAddr)) } override def channelInactive(client: TransportClient): Unit = { val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress] if (addr != null) { - val clientAddr = RpcAddress(addr.getHostName, addr.getPort) + val clientAddr = RpcAddress(addr.getHostString, addr.getPort) nettyEnv.removeOutbox(clientAddr) dispatcher.postToAll(RemoteProcessDisconnected(clientAddr)) val remoteEnvAddress = remoteAddresses.remove(clientAddr) |