From 2782818287a71925523c1320291db6cb25221e9f Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Fri, 18 Dec 2015 09:49:08 -0800
Subject: [SPARK-12350][CORE] Don't log errors when requested stream is not
 found.

If a client requests a non-existent stream, just send a failure message
back, without logging any error on the server side (since it's not a
server error).

On the executor side, avoid error logs by translating any errors during
transfer to a `ClassNotFoundException`, so that loading the class is
retried on the parent class loader. This can mask IO errors during
transmission, but the most common cause is that the class is not served
by the remote end.

Author: Marcelo Vanzin

Closes #10337 from vanzin/SPARK-12350.
---
 .../org/apache/spark/rpc/netty/NettyRpcEnv.scala      | 17 ++++++++---------
 .../apache/spark/rpc/netty/NettyStreamManager.scala   |  7 +++++--
 .../apache/spark/network/server/StreamManager.java    |  1 +
 .../network/server/TransportRequestHandler.java       |  7 ++++++-
 .../org/apache/spark/repl/ExecutorClassLoader.scala   | 21 +++++++++++++++++++--
 5 files changed, 39 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index de3db6ba62..975ea1a1ab 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -363,15 +363,14 @@ private[netty] class NettyRpcEnv(
     }
 
     override def read(dst: ByteBuffer): Int = {
-      val result = if (error == null) {
-        Try(source.read(dst))
-      } else {
-        Failure(error)
-      }
-
-      result match {
+      Try(source.read(dst)) match {
         case Success(bytesRead) => bytesRead
-        case Failure(error) => throw error
+        case Failure(readErr) =>
+          if (error != null) {
+            throw error
+          } else {
+            throw readErr
+          }
       }
     }
 
@@ -397,7 +396,7 @@ private[netty] class NettyRpcEnv(
     }
 
     override def onFailure(streamId: String, cause: Throwable): Unit = {
-      logError(s"Error downloading stream $streamId.", cause)
+      logDebug(s"Error downloading stream $streamId.", cause)
       source.setError(cause)
       sink.close()
     }
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index 394cde4fa0..afcb023a99 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -58,8 +58,11 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
       new File(dir, fname)
     }
 
-    require(file != null && file.isFile(), s"File not found: $streamId")
-    new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length())
+    if (file != null && file.isFile()) {
+      new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length())
+    } else {
+      null
+    }
   }
 
   override def addFile(file: File): String = {
diff --git a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
index 3f0155957a..07f161a29c 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
@@ -54,6 +54,7 @@ public abstract class StreamManager {
    * {@link #getChunk(long, int)} method.
    *
    * @param streamId id of a stream that has been previously registered with the StreamManager.
+   * @return A managed buffer for the stream, or null if the stream was not found.
    */
   public ManagedBuffer openStream(String streamId) {
     throw new UnsupportedOperationException();
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index c864d7ce16..105f538831 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -141,7 +141,12 @@ public class TransportRequestHandler extends MessageHandler {
       return;
     }
 
-    respond(new StreamResponse(req.streamId, buf.size(), buf));
+    if (buf != null) {
+      respond(new StreamResponse(req.streamId, buf.size(), buf));
+    } else {
+      respond(new StreamFailure(req.streamId, String.format(
+        "Stream '%s' was not found.", req.streamId)));
+    }
   }
 
   private void processRpcRequest(final RpcRequest req) {
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index da8f0aa1e3..de7b831adc 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.repl
 
-import java.io.{IOException, ByteArrayOutputStream, InputStream}
+import java.io.{FilterInputStream, ByteArrayOutputStream, InputStream, IOException}
 import java.net.{HttpURLConnection, URI, URL, URLEncoder}
 import java.nio.channels.Channels
 
@@ -96,7 +96,24 @@ class ExecutorClassLoader(
 
   private def getClassFileInputStreamFromSparkRPC(path: String): InputStream = {
     val channel = env.rpcEnv.openChannel(s"$classUri/$path")
-    Channels.newInputStream(channel)
+    new FilterInputStream(Channels.newInputStream(channel)) {
+
+      override def read(): Int = toClassNotFound(super.read())
+
+      override def read(b: Array[Byte]): Int = toClassNotFound(super.read(b))
+
+      override def read(b: Array[Byte], offset: Int, len: Int) =
+        toClassNotFound(super.read(b, offset, len))
+
+      private def toClassNotFound(fn: => Int): Int = {
+        try {
+          fn
+        } catch {
+          case e: Exception =>
+            throw new ClassNotFoundException(path, e)
+        }
+      }
+    }
   }
 
   private def getClassFileInputStreamFromHttpServer(pathInDirectory: String): InputStream = {
-- 
cgit v1.2.3