aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-11-25 12:58:18 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-11-25 12:58:18 -0800
commit4e81783e92f464d479baaf93eccc3adb1496989a (patch)
tree6ba31cd598671110d0e38f0930d36f358cd9b82d /core/src/test/scala/org/apache
parentd29e2ef4cf43c7f7c5aa40d305cf02be44ce19e0 (diff)
downloadspark-4e81783e92f464d479baaf93eccc3adb1496989a.tar.gz
spark-4e81783e92f464d479baaf93eccc3adb1496989a.tar.bz2
spark-4e81783e92f464d479baaf93eccc3adb1496989a.zip
[SPARK-11866][NETWORK][CORE] Make sure timed out RPCs are cleaned up.
This change does a couple of different things to make sure that the RpcEnv-level code and the network library agree about the status of outstanding RPCs. For RPCs that do not expect a reply ("RpcEnv.send"), support for one way messages (hello CORBA!) was added to the network layer. This is a "fire and forget" message that does not require any state to be kept by the TransportClient; as a result, the RpcEnv 'Ack' message is not needed anymore. For RPCs that do expect a reply ("RpcEnv.ask"), the network library now returns the internal RPC id; if the RpcEnv layer decides to time out the RPC before the network layer does, it now asks the TransportClient to forget about the RPC, so that if the network-level timeout occurs, the client is not killed. As part of implementing the above, I cleaned up some of the code in the netty rpc backend, removing types that were not necessary and factoring out some common code. Of interest is a slight change in the exceptions when posting messages to a stopped RpcEnv; that's mostly to avoid nasty error messages from the local-cluster backend when shutting down, which pollutes the terminal output. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #9917 from vanzin/SPARK-11866.
Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala2
2 files changed, 4 insertions, 4 deletions
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala
index 276c077b3d..2136795b18 100644
--- a/core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala
@@ -35,7 +35,7 @@ class InboxSuite extends SparkFunSuite {
val dispatcher = mock(classOf[Dispatcher])
val inbox = new Inbox(endpointRef, endpoint)
- val message = ContentMessage(null, "hi", false, null)
+ val message = OneWayMessage(null, "hi")
inbox.post(message)
inbox.process(dispatcher)
assert(inbox.isEmpty)
@@ -55,7 +55,7 @@ class InboxSuite extends SparkFunSuite {
val dispatcher = mock(classOf[Dispatcher])
val inbox = new Inbox(endpointRef, endpoint)
- val message = ContentMessage(null, "hi", true, null)
+ val message = RpcMessage(null, "hi", null)
inbox.post(message)
inbox.process(dispatcher)
assert(inbox.isEmpty)
@@ -83,7 +83,7 @@ class InboxSuite extends SparkFunSuite {
new Thread {
override def run(): Unit = {
for (_ <- 0 until 100) {
- val message = ContentMessage(null, "hi", false, null)
+ val message = OneWayMessage(null, "hi")
inbox.post(message)
}
exitLatch.countDown()
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala
index ccca795683..323184cdd9 100644
--- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcHandlerSuite.scala
@@ -33,7 +33,7 @@ class NettyRpcHandlerSuite extends SparkFunSuite {
val env = mock(classOf[NettyRpcEnv])
val sm = mock(classOf[StreamManager])
when(env.deserialize(any(classOf[TransportClient]), any(classOf[Array[Byte]]))(any()))
- .thenReturn(RequestMessage(RpcAddress("localhost", 12345), null, null, false))
+ .thenReturn(RequestMessage(RpcAddress("localhost", 12345), null, null))
test("receive") {
val dispatcher = mock(classOf[Dispatcher])