diff options
author | Reynold Xin <rxin@databricks.com> | 2015-10-13 09:51:20 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-10-13 09:51:20 -0700 |
commit | 1797055dbf1d2fd7714d7c65c8d2efde2f15efc1 (patch) | |
tree | a2fd05a8ba259c25dd01bf0b4af48c2466a39b83 /core/src/test/scala/org/apache | |
parent | 6987c067937a50867b4d5788f5bf496ecdfdb62c (diff) | |
download | spark-1797055dbf1d2fd7714d7c65c8d2efde2f15efc1.tar.gz spark-1797055dbf1d2fd7714d7c65c8d2efde2f15efc1.tar.bz2 spark-1797055dbf1d2fd7714d7c65c8d2efde2f15efc1.zip |
[SPARK-11079] Post-hoc review Netty-based RPC - round 1
I'm going through the implementation right now for post-doc review. Adding more comments and renaming things as I go through them.
I also want to write higher level documentation about how the whole thing works -- but those will come in other pull requests.
Author: Reynold Xin <rxin@databricks.com>
Closes #9091 from rxin/rpc-review.
Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r-- | core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala | 6 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala | 7 |
2 files changed, 7 insertions, 6 deletions
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala index 120cf1b6fa..276c077b3d 100644 --- a/core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala @@ -113,7 +113,7 @@ class InboxSuite extends SparkFunSuite { val remoteAddress = RpcAddress("localhost", 11111) val inbox = new Inbox(endpointRef, endpoint) - inbox.post(Associated(remoteAddress)) + inbox.post(RemoteProcessConnected(remoteAddress)) inbox.process(dispatcher) endpoint.verifySingleOnConnectedMessage(remoteAddress) @@ -127,7 +127,7 @@ class InboxSuite extends SparkFunSuite { val remoteAddress = RpcAddress("localhost", 11111) val inbox = new Inbox(endpointRef, endpoint) - inbox.post(Disassociated(remoteAddress)) + inbox.post(RemoteProcessDisconnected(remoteAddress)) inbox.process(dispatcher) endpoint.verifySingleOnDisconnectedMessage(remoteAddress) @@ -142,7 +142,7 @@ class InboxSuite extends SparkFunSuite { val cause = new RuntimeException("Oops") val inbox = new Inbox(endpointRef, endpoint) - inbox.post(AssociationError(cause, remoteAddress)) + inbox.post(RemoteProcessConnectionError(cause, remoteAddress)) inbox.process(dispatcher) endpoint.verifySingleOnNetworkErrorMessage(cause, remoteAddress) diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala index 06ca035d19..f24f78b8c4 100644 --- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala @@ -45,7 +45,7 @@ class NettyRpcHandlerSuite extends SparkFunSuite { when(channel.remoteAddress()).thenReturn(new InetSocketAddress("localhost", 40001)) nettyRpcHandler.receive(client, null, null) - verify(dispatcher, times(1)).broadcastMessage(Associated(RpcAddress("localhost", 12345))) + verify(dispatcher, times(1)).postToAll(RemoteProcessConnected(RpcAddress("localhost", 12345))) } test("connectionTerminated") { @@ -60,8 +60,9 @@ class NettyRpcHandlerSuite extends SparkFunSuite { when(channel.remoteAddress()).thenReturn(new InetSocketAddress("localhost", 40000)) nettyRpcHandler.connectionTerminated(client) - verify(dispatcher, times(1)).broadcastMessage(Associated(RpcAddress("localhost", 12345))) - verify(dispatcher, times(1)).broadcastMessage(Disassociated(RpcAddress("localhost", 12345))) + verify(dispatcher, times(1)).postToAll(RemoteProcessConnected(RpcAddress("localhost", 12345))) + verify(dispatcher, times(1)).postToAll( + RemoteProcessDisconnected(RpcAddress("localhost", 12345))) } } |