aboutsummaryrefslogtreecommitdiff
path: root/network
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2015-04-01 18:36:06 -0700
committerReynold Xin <rxin@databricks.com>2015-04-01 18:36:06 -0700
commit899ebcb1448126f40be784ce42e69218e9a1ead7 (patch)
tree3a1d38df98de520fb66020f5f67d20bf6afffc63 /network
parent4815bc2128c7f6d4d21da730b8c72da087233b34 (diff)
downloadspark-899ebcb1448126f40be784ce42e69218e9a1ead7.tar.gz
spark-899ebcb1448126f40be784ce42e69218e9a1ead7.tar.bz2
spark-899ebcb1448126f40be784ce42e69218e9a1ead7.zip
[SPARK-6578] Small rewrite to make the logic more clear in MessageWithHeader.transferTo.
Author: Reynold Xin <rxin@databricks.com> Closes #5319 from rxin/SPARK-6578 and squashes the following commits: 7c62a64 [Reynold Xin] Small rewrite to make the logic more clear in transferTo.
Diffstat (limited to 'network')
-rw-r--r--network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java43
1 files changed, 23 insertions, 20 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java b/network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
index 215a8517e8..d686a95146 100644
--- a/network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
+++ b/network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
@@ -21,15 +21,15 @@ import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
import io.netty.buffer.ByteBuf;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCountUtil;
/**
- * A wrapper message that holds two separate pieces (a header and a body) to avoid
- * copying the body's content.
+ * A wrapper message that holds two separate pieces (a header and a body).
+ *
+ * The header must be a ByteBuf, while the body can be a ByteBuf or a FileRegion.
*/
class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
@@ -63,32 +63,36 @@ class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
return totalBytesTransferred;
}
+ /**
+ * This code is more complicated than you would think because we might require multiple
+ * transferTo invocations in order to transfer a single MessageWithHeader to avoid busy waiting.
+ *
+ * The contract is that the caller will ensure position is properly set to the total number
+ * of bytes transferred so far (i.e. value returned by transfered()).
+ */
@Override
- public long transferTo(WritableByteChannel target, long position) throws IOException {
+ public long transferTo(final WritableByteChannel target, final long position) throws IOException {
Preconditions.checkArgument(position == totalBytesTransferred, "Invalid position.");
- long written = 0;
-
- if (position < headerLength) {
- written += copyByteBuf(header, target);
+ // Bytes written for header in this call.
+ long writtenHeader = 0;
+ if (header.readableBytes() > 0) {
+ writtenHeader = copyByteBuf(header, target);
+ totalBytesTransferred += writtenHeader;
if (header.readableBytes() > 0) {
- totalBytesTransferred += written;
- return written;
+ return writtenHeader;
}
}
+ // Bytes written for body in this call.
+ long writtenBody = 0;
if (body instanceof FileRegion) {
- // Adjust the position. If the write is happening as part of the same call where the header
- // (or some part of it) is written, `position` will be less than the header size, so we want
- // to start from position 0 in the FileRegion object. Otherwise, we start from the position
- // requested by the caller.
- long bodyPos = position > headerLength ? position - headerLength : 0;
- written += ((FileRegion)body).transferTo(target, bodyPos);
+ writtenBody = ((FileRegion) body).transferTo(target, totalBytesTransferred - headerLength);
} else if (body instanceof ByteBuf) {
- written += copyByteBuf((ByteBuf) body, target);
+ writtenBody = copyByteBuf((ByteBuf) body, target);
}
+ totalBytesTransferred += writtenBody;
- totalBytesTransferred += written;
- return written;
+ return writtenHeader + writtenBody;
}
@Override
@@ -102,5 +106,4 @@ class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
buf.skipBytes(written);
return written;
}
-
}