aboutsummaryrefslogtreecommitdiff
path: root/repl
diff options
context:
space:
mode:
Diffstat (limited to 'repl')
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala21
1 files changed, 19 insertions, 2 deletions
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index da8f0aa1e3..de7b831adc 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -17,7 +17,7 @@
package org.apache.spark.repl
-import java.io.{IOException, ByteArrayOutputStream, InputStream}
+import java.io.{FilterInputStream, ByteArrayOutputStream, InputStream, IOException}
import java.net.{HttpURLConnection, URI, URL, URLEncoder}
import java.nio.channels.Channels
@@ -96,7 +96,24 @@ class ExecutorClassLoader(
private def getClassFileInputStreamFromSparkRPC(path: String): InputStream = {
val channel = env.rpcEnv.openChannel(s"$classUri/$path")
- Channels.newInputStream(channel)
+ new FilterInputStream(Channels.newInputStream(channel)) {
+
+ override def read(): Int = toClassNotFound(super.read())
+
+ override def read(b: Array[Byte]): Int = toClassNotFound(super.read(b))
+
+ override def read(b: Array[Byte], offset: Int, len: Int) =
+ toClassNotFound(super.read(b, offset, len))
+
+ private def toClassNotFound(fn: => Int): Int = {
+ try {
+ fn
+ } catch {
+ case e: Exception =>
+ throw new ClassNotFoundException(path, e)
+ }
+ }
+ }
}
private def getClassFileInputStreamFromHttpServer(pathInDirectory: String): InputStream = {