From 084e4e126211d74a79e8dbd2d0e604dd3c650822 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 23 Sep 2015 18:59:49 -0700 Subject: [SPARK-6028][Core]A new RPC implemetation based on the network module Design doc: https://docs.google.com/document/d/1CF5G6rGVQMKSyV_QKo4D2M-x6rxz5x1Ew7aK3Uq6u8c/edit?usp=sharing Author: zsxwing Closes #6457 from zsxwing/new-rpc. --- .../main/java/org/apache/spark/network/client/TransportClient.java | 4 ++++ .../src/main/java/org/apache/spark/network/server/RpcHandler.java | 2 ++ .../java/org/apache/spark/network/server/TransportRequestHandler.java | 1 + 3 files changed, 7 insertions(+) (limited to 'network') diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java index df841288a0..fbb8bb6b2f 100644 --- a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java +++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java @@ -78,6 +78,10 @@ public class TransportClient implements Closeable { this.handler = Preconditions.checkNotNull(handler); } + public Channel getChannel() { + return channel; + } + public boolean isActive() { return channel.isOpen() || channel.isActive(); } diff --git a/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java b/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java index 2ba92a40f8..dbb7f95f55 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java +++ b/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java @@ -52,4 +52,6 @@ public abstract class RpcHandler { * No further requests will come from this client. */ public void connectionTerminated(TransportClient client) { } + + public void exceptionCaught(Throwable cause, TransportClient client) { } } diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index df60278058..96941d26be 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -71,6 +71,7 @@ public class TransportRequestHandler extends MessageHandler { @Override public void exceptionCaught(Throwable cause) { + rpcHandler.exceptionCaught(cause, reverseClient); } @Override -- cgit v1.2.3