aboutsummaryrefslogtreecommitdiff
path: root/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java')
-rw-r--r--network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java16
1 files changed, 10 insertions, 6 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
index 4c15045363..23a8dba593 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
@@ -136,7 +136,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
}
@Override
- public void handle(ResponseMessage message) {
+ public void handle(ResponseMessage message) throws Exception {
String remoteAddress = NettyUtils.getRemoteAddress(channel);
if (message instanceof ChunkFetchSuccess) {
ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
@@ -144,11 +144,11 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
if (listener == null) {
logger.warn("Ignoring response for block {} from {} since it is not outstanding",
resp.streamChunkId, remoteAddress);
- resp.body.release();
+ resp.body().release();
} else {
outstandingFetches.remove(resp.streamChunkId);
- listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body);
- resp.body.release();
+ listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body());
+ resp.body().release();
}
} else if (message instanceof ChunkFetchFailure) {
ChunkFetchFailure resp = (ChunkFetchFailure) message;
@@ -166,10 +166,14 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
if (listener == null) {
logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
- resp.requestId, remoteAddress, resp.response.length);
+ resp.requestId, remoteAddress, resp.body().size());
} else {
outstandingRpcs.remove(resp.requestId);
- listener.onSuccess(resp.response);
+ try {
+ listener.onSuccess(resp.body().nioByteBuffer());
+ } finally {
+ resp.body().release();
+ }
}
} else if (message instanceof RpcFailure) {
RpcFailure resp = (RpcFailure) message;