diff options
author | Marcelo Vanzin <vanzin@cloudera.com> | 2015-12-18 09:49:08 -0800 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2015-12-18 09:49:08 -0800 |
commit | 2782818287a71925523c1320291db6cb25221e9f (patch) | |
tree | 86e0ed3adb3957b7fae3afbbf96d6dfaa4b1fb8d /core/src | |
parent | ea59b0f3a6600f8046e5f3f55e89257614fb1f10 (diff) | |
download | spark-2782818287a71925523c1320291db6cb25221e9f.tar.gz spark-2782818287a71925523c1320291db6cb25221e9f.tar.bz2 spark-2782818287a71925523c1320291db6cb25221e9f.zip |
[SPARK-12350][CORE] Don't log errors when requested stream is not found.
If a client requests a non-existent stream, just send a failure message
back, without logging any error on the server side (since it's not a
server error).
On the executor side, avoid error logs by translating any errors during
transfer to a `ClassNotFoundException`, so that loading the class is
retried on a the parent class loader. This can mask IO errors during
transmission, but the most common cause is that the class is not
served by the remote end.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #10337 from vanzin/SPARK-12350.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala | 17 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala | 7 |
2 files changed, 13 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index de3db6ba62..975ea1a1ab 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -363,15 +363,14 @@ private[netty] class NettyRpcEnv( } override def read(dst: ByteBuffer): Int = { - val result = if (error == null) { - Try(source.read(dst)) - } else { - Failure(error) - } - - result match { + Try(source.read(dst)) match { case Success(bytesRead) => bytesRead - case Failure(error) => throw error + case Failure(readErr) => + if (error != null) { + throw error + } else { + throw readErr + } } } @@ -397,7 +396,7 @@ private[netty] class NettyRpcEnv( } override def onFailure(streamId: String, cause: Throwable): Unit = { - logError(s"Error downloading stream $streamId.", cause) + logDebug(s"Error downloading stream $streamId.", cause) source.setError(cause) sink.close() } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala index 394cde4fa0..afcb023a99 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala @@ -58,8 +58,11 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv) new File(dir, fname) } - require(file != null && file.isFile(), s"File not found: $streamId") - new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length()) + if (file != null && file.isFile()) { + new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length()) + } else { + null + } } override def addFile(file: File): String = { |