aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2017-01-04 11:21:09 -0800
committerKay Ousterhout <kayousterhout@gmail.com>2017-01-04 11:21:09 -0800
commit00074b57786cfe4a0d67d617d04d3a1ea21c9ae5 (patch)
tree6cdaedbcfda77ed0f9af516439b0ded0fee8a97d
parent4262fb0d55aed1a023e1813e09deefda8a7ce26b (diff)
downloadspark-00074b57786cfe4a0d67d617d04d3a1ea21c9ae5.tar.gz
spark-00074b57786cfe4a0d67d617d04d3a1ea21c9ae5.tar.bz2
spark-00074b57786cfe4a0d67d617d04d3a1ea21c9ae5.zip
[SPARK-19062] Utils.writeByteBuffer bug fix
This commit changes Utils.writeByteBuffer so that it does not change the position of the ByteBuffer that it writes out, and adds a unit test for this functionality. cc mridulm Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #16462 from kayousterhout/SPARK-19062.
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/util/UtilsSuite.scala25
2 files changed, 28 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 078cc3d5b4..0dcf0307e1 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -237,9 +237,11 @@ private[spark] object Utils extends Logging {
if (bb.hasArray) {
out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
} else {
+ val originalPosition = bb.position()
val bbval = new Array[Byte](bb.remaining())
bb.get(bbval)
out.write(bbval)
+ bb.position(originalPosition)
}
}
@@ -250,9 +252,11 @@ private[spark] object Utils extends Logging {
if (bb.hasArray) {
out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
} else {
+ val originalPosition = bb.position()
val bbval = new Array[Byte](bb.remaining())
bb.get(bbval)
out.write(bbval)
+ bb.position(originalPosition)
}
}
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index fb7b91222b..442a603cae 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -17,7 +17,8 @@
package org.apache.spark.util
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, FileOutputStream, PrintStream}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutput, DataOutputStream, File,
+ FileOutputStream, PrintStream}
import java.lang.{Double => JDouble, Float => JFloat}
import java.net.{BindException, ServerSocket, URI}
import java.nio.{ByteBuffer, ByteOrder}
@@ -389,6 +390,28 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(Utils.deserializeLongValue(bbuf.array) === testval)
}
+ test("writeByteBuffer should not change ByteBuffer position") {
+ // Test a buffer with an underlying array, for both writeByteBuffer methods.
+ val testBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
+ assert(testBuffer.hasArray)
+ val bytesOut = new ByteBufferOutputStream(4096)
+ Utils.writeByteBuffer(testBuffer, bytesOut)
+ assert(testBuffer.position() === 0)
+
+ val dataOut = new DataOutputStream(bytesOut)
+ Utils.writeByteBuffer(testBuffer, dataOut: DataOutput)
+ assert(testBuffer.position() === 0)
+
+ // Test a buffer without an underlying array, for both writeByteBuffer methods.
+ val testDirectBuffer = ByteBuffer.allocateDirect(8)
+ assert(!testDirectBuffer.hasArray())
+ Utils.writeByteBuffer(testDirectBuffer, bytesOut)
+ assert(testDirectBuffer.position() === 0)
+
+ Utils.writeByteBuffer(testDirectBuffer, dataOut: DataOutput)
+ assert(testDirectBuffer.position() === 0)
+ }
+
test("get iterator size") {
val empty = Seq[Int]()
assert(Utils.getIteratorSize(empty.toIterator) === 0L)