aboutsummaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-03-03 22:53:07 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-03-03 22:53:07 -0800
commit465c665db1dc65e3b02c584cf7f8d06b24909b0c (patch)
tree492bb1e42467e944638c4320141154fdfccbd87b /common
parentf6ac7c30d48e666618466e825578fa457e2a0ed4 (diff)
downloadspark-465c665db1dc65e3b02c584cf7f8d06b24909b0c.tar.gz
spark-465c665db1dc65e3b02c584cf7f8d06b24909b0c.tar.bz2
spark-465c665db1dc65e3b02c584cf7f8d06b24909b0c.zip
[SPARK-13652][CORE] Copy ByteBuffer in sendRpcSync as it will be recycled
## What changes were proposed in this pull request? `sendRpcSync` should copy the response content because the underlying buffer will be recycled and reused. ## How was this patch tested? Jenkins unit tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #11499 from zsxwing/SPARK-13652.
Diffstat (limited to 'common')
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java7
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java6
2 files changed, 11 insertions, 2 deletions
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java b/common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
index 47e93f9846..6afc63f71b 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
@@ -24,7 +24,12 @@ import java.nio.ByteBuffer;
* failure.
*/
public interface RpcResponseCallback {
- /** Successful serialized result from server. */
+ /**
+ * Successful serialized result from server.
+ *
+ * After `onSuccess` returns, `response` will be recycled and its content will become invalid.
+ * Please copy the content of `response` if you want to use it after `onSuccess` returns.
+ */
void onSuccess(ByteBuffer response);
/** Exception either propagated from server or raised on client side. */
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index e15f096d36..64a83171e9 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -257,7 +257,11 @@ public class TransportClient implements Closeable {
sendRpc(message, new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
- result.set(response);
+ ByteBuffer copy = ByteBuffer.allocate(response.remaining());
+ copy.put(response);
+ // flip "copy" to make it readable
+ copy.flip();
+ result.set(copy);
}
@Override