aboutsummaryrefslogtreecommitdiff
path: root/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java
diff options
context:
space:
mode:
Diffstat (limited to 'common/network-common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java')
-rw-r--r--common/network-common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java87
1 files changed, 87 insertions, 0 deletions
diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java
new file mode 100644
index 0000000000..bae866e14a
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/RpcResponse.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+
+/** Response to {@link RpcRequest} for a successful RPC. */
+public final class RpcResponse extends AbstractResponseMessage {
+ public final long requestId;
+
+ public RpcResponse(long requestId, ManagedBuffer message) {
+ super(message, true);
+ this.requestId = requestId;
+ }
+
+ @Override
+ public Type type() { return Type.RpcResponse; }
+
+ @Override
+ public int encodedLength() {
+ // The integer (a.k.a. the body size) is not really used, since that information is already
+ // encoded in the frame length. But this maintains backwards compatibility with versions of
+ // RpcRequest that use Encoders.ByteArrays.
+ return 8 + 4;
+ }
+
+ @Override
+ public void encode(ByteBuf buf) {
+ buf.writeLong(requestId);
+ // See comment in encodedLength().
+ buf.writeInt((int) body().size());
+ }
+
+ @Override
+ public ResponseMessage createFailureResponse(String error) {
+ return new RpcFailure(requestId, error);
+ }
+
+ public static RpcResponse decode(ByteBuf buf) {
+ long requestId = buf.readLong();
+ // See comment in encodedLength().
+ buf.readInt();
+ return new RpcResponse(requestId, new NettyManagedBuffer(buf.retain()));
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(requestId, body());
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof RpcResponse) {
+ RpcResponse o = (RpcResponse) other;
+ return requestId == o.requestId && super.equals(o);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("requestId", requestId)
+ .add("body", body())
+ .toString();
+ }
+}