diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala | 17 |
1 files changed, 8 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index de3db6ba62..975ea1a1ab 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -363,15 +363,14 @@ private[netty] class NettyRpcEnv( } override def read(dst: ByteBuffer): Int = { - val result = if (error == null) { - Try(source.read(dst)) - } else { - Failure(error) - } - - result match { + Try(source.read(dst)) match { case Success(bytesRead) => bytesRead - case Failure(error) => throw error + case Failure(readErr) => + if (error != null) { + throw error + } else { + throw readErr + } } } @@ -397,7 +396,7 @@ private[netty] class NettyRpcEnv( } override def onFailure(streamId: String, cause: Throwable): Unit = { - logError(s"Error downloading stream $streamId.", cause) + logDebug(s"Error downloading stream $streamId.", cause) source.setError(cause) sink.close() } |