aboutsummaryrefslogtreecommitdiff
path: root/network
diff options
context:
space:
mode:
Diffstat (limited to 'network')
-rw-r--r--network/common/src/main/java/org/apache/spark/network/server/StreamManager.java1
-rw-r--r--network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java7
2 files changed, 7 insertions, 1 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
index 3f0155957a..07f161a29c 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
@@ -54,6 +54,7 @@ public abstract class StreamManager {
* {@link #getChunk(long, int)} method.
*
* @param streamId id of a stream that has been previously registered with the StreamManager.
+ * @return A managed buffer for the stream, or null if the stream was not found.
*/
public ManagedBuffer openStream(String streamId) {
throw new UnsupportedOperationException();
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index c864d7ce16..105f538831 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -141,7 +141,12 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
return;
}
- respond(new StreamResponse(req.streamId, buf.size(), buf));
+ if (buf != null) {
+ respond(new StreamResponse(req.streamId, buf.size(), buf));
+ } else {
+ respond(new StreamFailure(req.streamId, String.format(
+ "Stream '%s' was not found.", req.streamId)));
+ }
}
private void processRpcRequest(final RpcRequest req) {