aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala54
1 files changed, 29 insertions, 25 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index ef876b1d8c..9ae74d9d7b 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -211,33 +211,37 @@ private[netty] class NettyRpcEnv(
}
}
- if (remoteAddr == address) {
- val p = Promise[Any]()
- p.future.onComplete {
- case Success(response) => onSuccess(response)
- case Failure(e) => onFailure(e)
- }(ThreadUtils.sameThread)
- dispatcher.postLocalMessage(message, p)
- } else {
- val rpcMessage = RpcOutboxMessage(serialize(message),
- onFailure,
- (client, response) => onSuccess(deserialize[Any](client, response)))
- postToOutbox(message.receiver, rpcMessage)
- promise.future.onFailure {
- case _: TimeoutException => rpcMessage.onTimeout()
- case _ =>
+ try {
+ if (remoteAddr == address) {
+ val p = Promise[Any]()
+ p.future.onComplete {
+ case Success(response) => onSuccess(response)
+ case Failure(e) => onFailure(e)
+ }(ThreadUtils.sameThread)
+ dispatcher.postLocalMessage(message, p)
+ } else {
+ val rpcMessage = RpcOutboxMessage(serialize(message),
+ onFailure,
+ (client, response) => onSuccess(deserialize[Any](client, response)))
+ postToOutbox(message.receiver, rpcMessage)
+ promise.future.onFailure {
+ case _: TimeoutException => rpcMessage.onTimeout()
+ case _ =>
+ }(ThreadUtils.sameThread)
+ }
+
+ val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
+ override def run(): Unit = {
+ onFailure(new TimeoutException(s"Cannot receive any reply in ${timeout.duration}"))
+ }
+ }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
+ promise.future.onComplete { v =>
+ timeoutCancelable.cancel(true)
}(ThreadUtils.sameThread)
+ } catch {
+ case NonFatal(e) =>
+ onFailure(e)
}
-
- val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
- override def run(): Unit = {
- promise.tryFailure(
- new TimeoutException(s"Cannot receive any reply in ${timeout.duration}"))
- }
- }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
- promise.future.onComplete { v =>
- timeoutCancelable.cancel(true)
- }(ThreadUtils.sameThread)
promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
}