about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala7
-rw-r--r--network/common/src/main/java/org/apache/spark/network/server/StreamManager.java1
-rw-r--r--network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java7
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala21
5 files changed, 39 insertions, 14 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index de3db6ba62..975ea1a1ab 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -363,15 +363,14 @@ private[netty] class NettyRpcEnv(
}
override def read(dst: ByteBuffer): Int = {
- val result = if (error == null) {
- Try(source.read(dst))
- } else {
- Failure(error)
- }
-
- result match {
+ Try(source.read(dst)) match {
case Success(bytesRead) => bytesRead
- case Failure(error) => throw error
+ case Failure(readErr) =>
+ if (error != null) {
+ throw error
+ } else {
+ throw readErr
+ }
}
}
@@ -397,7 +396,7 @@ private[netty] class NettyRpcEnv(
}
override def onFailure(streamId: String, cause: Throwable): Unit = {
- logError(s"Error downloading stream $streamId.", cause)
+ logDebug(s"Error downloading stream $streamId.", cause)
source.setError(cause)
sink.close()
}
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index 394cde4fa0..afcb023a99 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -58,8 +58,11 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
new File(dir, fname)
}
- require(file != null && file.isFile(), s"File not found: $streamId")
- new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length())
+ if (file != null && file.isFile()) {
+ new FileSegmentManagedBuffer(rpcEnv.transportConf, file, 0, file.length())
+ } else {
+ null
+ }
}
override def addFile(file: File): String = {
diff --git a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
index 3f0155957a..07f161a29c 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
@@ -54,6 +54,7 @@ public abstract class StreamManager {
* {@link #getChunk(long, int)} method.
*
* @param streamId id of a stream that has been previously registered with the StreamManager.
+ * @return A managed buffer for the stream, or null if the stream was not found.
*/
public ManagedBuffer openStream(String streamId) {
throw new UnsupportedOperationException();
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index c864d7ce16..105f538831 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -141,7 +141,12 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
return;
}
- respond(new StreamResponse(req.streamId, buf.size(), buf));
+ if (buf != null) {
+ respond(new StreamResponse(req.streamId, buf.size(), buf));
+ } else {
+ respond(new StreamFailure(req.streamId, String.format(
+ "Stream '%s' was not found.", req.streamId)));
+ }
}
private void processRpcRequest(final RpcRequest req) {
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index da8f0aa1e3..de7b831adc 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -17,7 +17,7 @@
package org.apache.spark.repl
-import java.io.{IOException, ByteArrayOutputStream, InputStream}
+import java.io.{FilterInputStream, ByteArrayOutputStream, InputStream, IOException}
import java.net.{HttpURLConnection, URI, URL, URLEncoder}
import java.nio.channels.Channels
@@ -96,7 +96,24 @@ class ExecutorClassLoader(
private def getClassFileInputStreamFromSparkRPC(path: String): InputStream = {
val channel = env.rpcEnv.openChannel(s"$classUri/$path")
- Channels.newInputStream(channel)
+ new FilterInputStream(Channels.newInputStream(channel)) {
+
+ override def read(): Int = toClassNotFound(super.read())
+
+ override def read(b: Array[Byte]): Int = toClassNotFound(super.read(b))
+
+ override def read(b: Array[Byte], offset: Int, len: Int) =
+ toClassNotFound(super.read(b, offset, len))
+
+ private def toClassNotFound(fn: => Int): Int = {
+ try {
+ fn
+ } catch {
+ case e: Exception =>
+ throw new ClassNotFoundException(path, e)
+ }
+ }
+ }
}
private def getClassFileInputStreamFromHttpServer(pathInDirectory: String): InputStream = {