aboutsummaryrefslogtreecommitdiff
path: root/repl
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-12-18 09:49:08 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-12-18 09:49:08 -0800
commit2782818287a71925523c1320291db6cb25221e9f (patch)
tree86e0ed3adb3957b7fae3afbbf96d6dfaa4b1fb8d /repl
parentea59b0f3a6600f8046e5f3f55e89257614fb1f10 (diff)
downloadspark-2782818287a71925523c1320291db6cb25221e9f.tar.gz
spark-2782818287a71925523c1320291db6cb25221e9f.tar.bz2
spark-2782818287a71925523c1320291db6cb25221e9f.zip
[SPARK-12350][CORE] Don't log errors when requested stream is not found.
If a client requests a non-existent stream, just send a failure message back, without logging any error on the server side (since it's not a server error). On the executor side, avoid error logs by translating any errors during transfer to a `ClassNotFoundException`, so that loading the class is retried on a the parent class loader. This can mask IO errors during transmission, but the most common cause is that the class is not served by the remote end. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #10337 from vanzin/SPARK-12350.
Diffstat (limited to 'repl')
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala21
1 files changed, 19 insertions, 2 deletions
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index da8f0aa1e3..de7b831adc 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -17,7 +17,7 @@
package org.apache.spark.repl
-import java.io.{IOException, ByteArrayOutputStream, InputStream}
+import java.io.{FilterInputStream, ByteArrayOutputStream, InputStream, IOException}
import java.net.{HttpURLConnection, URI, URL, URLEncoder}
import java.nio.channels.Channels
@@ -96,7 +96,24 @@ class ExecutorClassLoader(
private def getClassFileInputStreamFromSparkRPC(path: String): InputStream = {
val channel = env.rpcEnv.openChannel(s"$classUri/$path")
- Channels.newInputStream(channel)
+ new FilterInputStream(Channels.newInputStream(channel)) {
+
+ override def read(): Int = toClassNotFound(super.read())
+
+ override def read(b: Array[Byte]): Int = toClassNotFound(super.read(b))
+
+ override def read(b: Array[Byte], offset: Int, len: Int) =
+ toClassNotFound(super.read(b, offset, len))
+
+ private def toClassNotFound(fn: => Int): Int = {
+ try {
+ fn
+ } catch {
+ case e: Exception =>
+ throw new ClassNotFoundException(path, e)
+ }
+ }
+ }
}
private def getClassFileInputStreamFromHttpServer(pathInDirectory: String): InputStream = {