diff options
author | Marcelo Vanzin <vanzin@cloudera.com> | 2015-12-18 09:49:08 -0800 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2015-12-18 09:49:08 -0800 |
commit | 2782818287a71925523c1320291db6cb25221e9f (patch) | |
tree | 86e0ed3adb3957b7fae3afbbf96d6dfaa4b1fb8d /repl/src | |
parent | ea59b0f3a6600f8046e5f3f55e89257614fb1f10 (diff) | |
download | spark-2782818287a71925523c1320291db6cb25221e9f.tar.gz spark-2782818287a71925523c1320291db6cb25221e9f.tar.bz2 spark-2782818287a71925523c1320291db6cb25221e9f.zip |
[SPARK-12350][CORE] Don't log errors when requested stream is not found.
If a client requests a non-existent stream, just send a failure message
back, without logging any error on the server side (since it's not a
server error).
On the executor side, avoid error logs by translating any errors during
transfer to a `ClassNotFoundException`, so that loading the class is
retried on a the parent class loader. This can mask IO errors during
transmission, but the most common cause is that the class is not
served by the remote end.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #10337 from vanzin/SPARK-12350.
Diffstat (limited to 'repl/src')
-rw-r--r-- | repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala | 21 |
1 files changed, 19 insertions, 2 deletions
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala index da8f0aa1e3..de7b831adc 100644 --- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala +++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala @@ -17,7 +17,7 @@ package org.apache.spark.repl -import java.io.{IOException, ByteArrayOutputStream, InputStream} +import java.io.{FilterInputStream, ByteArrayOutputStream, InputStream, IOException} import java.net.{HttpURLConnection, URI, URL, URLEncoder} import java.nio.channels.Channels @@ -96,7 +96,24 @@ class ExecutorClassLoader( private def getClassFileInputStreamFromSparkRPC(path: String): InputStream = { val channel = env.rpcEnv.openChannel(s"$classUri/$path") - Channels.newInputStream(channel) + new FilterInputStream(Channels.newInputStream(channel)) { + + override def read(): Int = toClassNotFound(super.read()) + + override def read(b: Array[Byte]): Int = toClassNotFound(super.read(b)) + + override def read(b: Array[Byte], offset: Int, len: Int) = + toClassNotFound(super.read(b, offset, len)) + + private def toClassNotFound(fn: => Int): Int = { + try { + fn + } catch { + case e: Exception => + throw new ClassNotFoundException(path, e) + } + } + } } private def getClassFileInputStreamFromHttpServer(pathInDirectory: String): InputStream = { |