author     Marcelo Vanzin <vanzin@cloudera.com>    2015-12-18 09:49:08 -0800
committer  Marcelo Vanzin <vanzin@cloudera.com>    2015-12-18 09:49:08 -0800
commit     2782818287a71925523c1320291db6cb25221e9f (patch)
tree       86e0ed3adb3957b7fae3afbbf96d6dfaa4b1fb8d /core
parent     ea59b0f3a6600f8046e5f3f55e89257614fb1f10 (diff)
[SPARK-12350][CORE] Don't log errors when requested stream is not found.
If a client requests a non-existent stream, just send a failure message back,
without logging any error on the server side (since it's not a server error).

On the executor side, avoid error logs by translating any errors during
transfer to a `ClassNotFoundException`, so that loading the class is retried
on the parent class loader. This can mask IO errors during transmission, but
the most common cause is that the class is not served by the remote end.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #10337 from vanzin/SPARK-12350.
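For illustration, here is a minimal sketch of the executor-side idea: the class
is fetched from the remote end first, and any transfer error is translated into
a ClassNotFoundException so that loading is retried on the parent class loader.
The actual executor-side change is not part of this 'core'-limited diff; the
class below, its fetchClassBytes helper, and the demo object are hypothetical
stand-ins, not Spark code.

// A minimal sketch, NOT Spark's executor class loader: classes are fetched
// from the remote end first, and any transfer error is translated into a
// ClassNotFoundException so that loading falls back to the parent loader.
class RemoteFirstClassLoader(parent: ClassLoader) extends ClassLoader(parent) {

  // Hypothetical remote fetch; here it always fails, standing in for a
  // "stream not found" reply from the serving end.
  private def fetchClassBytes(name: String): Array[Byte] =
    throw new java.io.IOException(s"stream /classes/$name was not found")

  // Translate any transfer error into ClassNotFoundException: from the JVM's
  // point of view the class simply is not served remotely.
  private def findRemoteClass(name: String): Class[_] =
    try {
      val bytes = fetchClassBytes(name)
      defineClass(name, bytes, 0, bytes.length)
    } catch {
      case e: Exception => throw new ClassNotFoundException(name, e)
    }

  override def loadClass(name: String, resolve: Boolean): Class[_] =
    try {
      findRemoteClass(name)
    } catch {
      // Retry on the parent class loader instead of logging an error.
      case _: ClassNotFoundException => super.loadClass(name, resolve)
    }
}

object RemoteFirstClassLoaderDemo {
  def main(args: Array[String]): Unit = {
    val loader = new RemoteFirstClassLoader(getClass.getClassLoader)
    // The remote fetch fails, so this quietly falls back to the parent loader.
    println(loader.loadClass("java.lang.String"))
  }
}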
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala        | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala |  7
2 files changed, 13 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index de3db6ba62..975ea1a1ab 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -363,15 +363,14 @@ private[netty] class NettyRpcEnv(
     }
 
     override def read(dst: ByteBuffer): Int = {
-      val result = if (error == null) {
-        Try(source.read(dst))
-      } else {
-        Failure(error)
-      }
-
-      result match {
+      Try(source.read(dst)) match {
         case Success(bytesRead) => bytesRead
-        case Failure(error) => throw error
+        case Failure(readErr) =>
+          if (error != null) {
+            throw error
+          } else {
+            throw readErr
+          }
       }
     }
@@ -397,7 +396,7 @@ private[netty] class NettyRpcEnv(
     }
 
     override def onFailure(streamId: String, cause: Throwable): Unit = {
-      logError(s"Error downloading stream $streamId.", cause)
+      logDebug(s"Error downloading stream $streamId.", cause)
       source.setError(cause)
       sink.close()
     }
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index 394cde4fa0..afcb023a99 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -58,8 +58,11 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
         new File(dir, fname)
     }
 
-    require(file != null && file.isFile(), s"File not found: $streamId")
-    new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length())
+    if (file != null && file.isFile()) {
+      new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length())
+    } else {
+      null
+    }
   }
 
   override def addFile(file: File): String = {
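With the NettyStreamManager change above, openStream now returns null instead
of throwing when the requested file does not exist. Presumably the transport
layer (in the Java network module, outside this 'core'-limited diffstat) turns
that null into a stream-failure reply to the client rather than logging a
server-side error. The sketch below illustrates that assumed contract; the
reply types and handler are made up for illustration, not the actual handler
code.

// A minimal sketch of the assumed server-side contract, with made-up reply
// types: a missing stream becomes a failure reply to the client instead of
// an error logged on the server.
object StreamRequestSketch {
  import java.nio.ByteBuffer

  sealed trait StreamReply
  case class StreamResponse(streamId: String, byteCount: Long) extends StreamReply
  case class StreamFailure(streamId: String, error: String) extends StreamReply

  // Contract assumed by the patch: openStream returns null rather than
  // throwing when the requested stream does not exist.
  trait StreamManager {
    def openStream(streamId: String): ByteBuffer
  }

  def handleStreamRequest(manager: StreamManager, streamId: String): StreamReply =
    Option(manager.openStream(streamId)) match {
      case Some(buf) => StreamResponse(streamId, buf.remaining().toLong)
      case None      => StreamFailure(streamId, s"Stream '$streamId' was not found.")
    }
}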