aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala17
1 files changed, 8 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index de3db6ba62..975ea1a1ab 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -363,15 +363,14 @@ private[netty] class NettyRpcEnv(
}
override def read(dst: ByteBuffer): Int = {
- val result = if (error == null) {
- Try(source.read(dst))
- } else {
- Failure(error)
- }
-
- result match {
+ Try(source.read(dst)) match {
case Success(bytesRead) => bytesRead
- case Failure(error) => throw error
+ case Failure(readErr) =>
+ if (error != null) {
+ throw error
+ } else {
+ throw readErr
+ }
}
}
@@ -397,7 +396,7 @@ private[netty] class NettyRpcEnv(
}
override def onFailure(streamId: String, cause: Throwable): Unit = {
- logError(s"Error downloading stream $streamId.", cause)
+ logDebug(s"Error downloading stream $streamId.", cause)
source.setError(cause)
sink.close()
}