about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-07-08 20:18:49 -0700
committerReynold Xin <rxin@databricks.com>2016-07-08 20:18:49 -0700
commitd8b06f18dc3e35938d15099beac98221d6f528b5 (patch)
treeb3d6c967dde440d354af0640f97a8006870682da /core
parent6cef0183c0f0392dad78fec54635afdb9341b7f3 (diff)
downloadspark-d8b06f18dc3e35938d15099beac98221d6f528b5.tar.gz
spark-d8b06f18dc3e35938d15099beac98221d6f528b5.tar.bz2
spark-d8b06f18dc3e35938d15099beac98221d6f528b5.zip
[SPARK-16432] Empty blocks fail to serialize due to assert in ChunkedByteBuffer
## What changes were proposed in this pull request?

It's possible to also change the callers to not pass in empty chunks, but it seems cleaner to just allow `ChunkedByteBuffer` to handle empty arrays. cc JoshRosen

## How was this patch tested?

Unit tests, also checked that the original reproduction case in https://github.com/apache/spark/pull/11748#issuecomment-230760283 is resolved.

Author: Eric Liang <ekl@databricks.com>

Closes #14099 from ericl/spark-16432.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 9
-rw-r--r-- core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala | 12
2 files changed, 8 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index fb4706e78d..89b0874e38 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -31,14 +31,13 @@ import org.apache.spark.storage.StorageUtils
* Read-only byte buffer which is physically stored as multiple chunks rather than a single
* contiguous array.
*
- * @param chunks an array of [[ByteBuffer]]s. Each buffer in this array must be non-empty and have
- * position == 0. Ownership of these buffers is transferred to the ChunkedByteBuffer,
- * so if these buffers may also be used elsewhere then the caller is responsible for
- * copying them as needed.
+ * @param chunks an array of [[ByteBuffer]]s. Each buffer in this array must have position == 0.
+ * Ownership of these buffers is transferred to the ChunkedByteBuffer, so if these
+ * buffers may also be used elsewhere then the caller is responsible for copying
+ * them as needed.
*/
private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
require(chunks != null, "chunks must not be null")
- require(chunks.forall(_.limit() > 0), "chunks must be non-empty")
require(chunks.forall(_.position() == 0), "chunks' positions must be 0")
private[this] var disposed: Boolean = false
diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
index f205d4f0d6..38b48a4c9e 100644
--- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -38,12 +38,6 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
emptyChunkedByteBuffer.toInputStream(dispose = true).close()
}
- test("chunks must be non-empty") {
- intercept[IllegalArgumentException] {
- new ChunkedByteBuffer(Array(ByteBuffer.allocate(0)))
- }
- }
-
test("getChunks() duplicates chunks") {
val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
chunkedByteBuffer.getChunks().head.position(4)
@@ -63,8 +57,9 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
}
test("toArray()") {
+ val empty = ByteBuffer.wrap(Array[Byte]())
val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte))
- val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes, bytes))
+ val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes, bytes, empty))
assert(chunkedByteBuffer.toArray === bytes.array() ++ bytes.array())
}
@@ -79,9 +74,10 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
}
test("toInputStream()") {
+ val empty = ByteBuffer.wrap(Array[Byte]())
val bytes1 = ByteBuffer.wrap(Array.tabulate(256)(_.toByte))
val bytes2 = ByteBuffer.wrap(Array.tabulate(128)(_.toByte))
- val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes1, bytes2))
+ val chunkedByteBuffer = new ChunkedByteBuffer(Array(empty, bytes1, bytes2))
assert(chunkedByteBuffer.size === bytes1.limit() + bytes2.limit())
val inputStream = chunkedByteBuffer.toInputStream(dispose = false)