aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-03-17 20:00:56 -0700
committerJosh Rosen <joshrosen@databricks.com>2016-03-17 20:00:56 -0700
commit6c2d894a2f8f7a29ec6fc8163e41c24bb70c3109 (patch)
tree715e352a7e88818d315c92e8ae5c23e9a26b90ab /core/src/test
parent6037ed0a1d7ecbb77140ddf4d0192a1dc60316bb (diff)
downloadspark-6c2d894a2f8f7a29ec6fc8163e41c24bb70c3109.tar.gz
spark-6c2d894a2f8f7a29ec6fc8163e41c24bb70c3109.tar.bz2
spark-6c2d894a2f8f7a29ec6fc8163e41c24bb70c3109.zip
[SPARK-13921] Store serialized blocks as multiple chunks in MemoryStore
This patch modifies the BlockManager, MemoryStore, and several other storage components so that serialized cached blocks are stored as multiple small chunks rather than as a single contiguous ByteBuffer. This change will help to improve the efficiency of memory allocation and the accuracy of memory accounting when serializing blocks. Our current serialization code uses a ByteBufferOutputStream, which doubles and re-allocates its backing byte array; this increases the peak memory requirements during serialization (since we need to hold extra memory while expanding the array). In addition, we currently don't account for the extra wasted space at the end of the ByteBuffer's backing array, so a 129 megabyte serialized block may actually consume 256 megabytes of memory. After switching to storing blocks in multiple chunks, we'll be able to efficiently trim the backing buffers so that no space is wasted. This change is also a prerequisite to being able to cache blocks which are larger than 2GB (although full support for that depends on several other changes which have not bee implemented yet). Author: Josh Rosen <joshrosen@databricks.com> Closes #11748 from JoshRosen/chunked-block-serialization.
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala93
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala19
-rw-r--r--core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala14
3 files changed, 111 insertions, 15 deletions
diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
new file mode 100644
index 0000000000..aab70e7431
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.network.util.ByteArrayWritableChannel
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferSuite extends SparkFunSuite {
+
+ test("no chunks") {
+ val emptyChunkedByteBuffer = new ChunkedByteBuffer(Array.empty[ByteBuffer])
+ assert(emptyChunkedByteBuffer.size === 0)
+ assert(emptyChunkedByteBuffer.getChunks().isEmpty)
+ assert(emptyChunkedByteBuffer.toArray === Array.empty)
+ assert(emptyChunkedByteBuffer.toByteBuffer.capacity() === 0)
+ assert(emptyChunkedByteBuffer.toNetty.capacity() === 0)
+ emptyChunkedByteBuffer.toInputStream(dispose = false).close()
+ emptyChunkedByteBuffer.toInputStream(dispose = true).close()
+ }
+
+ test("chunks must be non-empty") {
+ intercept[IllegalArgumentException] {
+ new ChunkedByteBuffer(Array(ByteBuffer.allocate(0)))
+ }
+ }
+
+ test("getChunks() duplicates chunks") {
+ val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
+ chunkedByteBuffer.getChunks().head.position(4)
+ assert(chunkedByteBuffer.getChunks().head.position() === 0)
+ }
+
+ test("copy() does not affect original buffer's position") {
+ val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
+ chunkedByteBuffer.copy()
+ assert(chunkedByteBuffer.getChunks().head.position() === 0)
+ }
+
+ test("writeFully() does not affect original buffer's position") {
+ val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
+ chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
+ assert(chunkedByteBuffer.getChunks().head.position() === 0)
+ }
+
+ test("toArray()") {
+ val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte))
+ val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes, bytes))
+ assert(chunkedByteBuffer.toArray === bytes.array() ++ bytes.array())
+ }
+
+ test("toArray() throws UnsupportedOperationException if size exceeds 2GB") {
+ val fourMegabyteBuffer = ByteBuffer.allocate(1024 * 1024 * 4)
+ fourMegabyteBuffer.limit(fourMegabyteBuffer.capacity())
+ val chunkedByteBuffer = new ChunkedByteBuffer(Array.fill(1024)(fourMegabyteBuffer))
+ assert(chunkedByteBuffer.size === (1024L * 1024L * 1024L * 4L))
+ intercept[UnsupportedOperationException] {
+ chunkedByteBuffer.toArray
+ }
+ }
+
+ test("toInputStream()") {
+ val bytes1 = ByteBuffer.wrap(Array.tabulate(256)(_.toByte))
+ val bytes2 = ByteBuffer.wrap(Array.tabulate(128)(_.toByte))
+ val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes1, bytes2))
+ assert(chunkedByteBuffer.size === bytes1.limit() + bytes2.limit())
+
+ val inputStream = chunkedByteBuffer.toInputStream(dispose = false)
+ val bytesFromStream = new Array[Byte](chunkedByteBuffer.size.toInt)
+ ByteStreams.readFully(inputStream, bytesFromStream)
+ assert(bytesFromStream === bytes1.array() ++ bytes2.array())
+ assert(chunkedByteBuffer.getChunks().head.position() === 0)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 2e0c0596a7..edf5cd35e4 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -44,6 +44,7 @@ import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
import org.apache.spark.util._
+import org.apache.spark.util.io.ChunkedByteBuffer
class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
with PrivateMethodTester with ResetSystemProperties {
@@ -192,8 +193,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(master.getLocations("a3").size === 0, "master was told about a3")
// Drop a1 and a2 from memory; this should be reported back to the master
- store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer])
- store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer])
+ store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ChunkedByteBuffer])
+ store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ChunkedByteBuffer])
assert(store.getSingleAndReleaseLock("a1") === None, "a1 not removed from store")
assert(store.getSingleAndReleaseLock("a2") === None, "a2 not removed from store")
assert(master.getLocations("a1").size === 0, "master did not remove a1")
@@ -434,8 +435,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
t2.join()
t3.join()
- store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer])
- store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer])
+ store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ChunkedByteBuffer])
+ store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ChunkedByteBuffer])
store.waitForAsyncReregister()
}
}
@@ -1253,9 +1254,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store = makeBlockManager(12000)
val memoryStore = store.memoryStore
val blockId = BlockId("rdd_3_10")
- var bytes: ByteBuffer = null
+ var bytes: ChunkedByteBuffer = null
memoryStore.putBytes(blockId, 10000, () => {
- bytes = ByteBuffer.allocate(10000)
+ bytes = new ChunkedByteBuffer(ByteBuffer.allocate(10000))
bytes
})
assert(memoryStore.getSize(blockId) === 10000)
@@ -1364,7 +1365,7 @@ private object BlockManagerSuite {
def dropFromMemoryIfExists(
blockId: BlockId,
- data: () => Either[Array[Any], ByteBuffer]): Unit = {
+ data: () => Either[Array[Any], ChunkedByteBuffer]): Unit = {
store.blockInfoManager.lockForWriting(blockId).foreach { info =>
val newEffectiveStorageLevel = store.dropFromMemory(blockId, data)
if (newEffectiveStorageLevel.isValid) {
@@ -1394,7 +1395,9 @@ private object BlockManagerSuite {
val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocalValues)
val getAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.get)
val getSingleAndReleaseLock: (BlockId) => Option[Any] = wrapGet(store.getSingle)
- val getLocalBytesAndReleaseLock: (BlockId) => Option[ByteBuffer] = wrapGet(store.getLocalBytes)
+ val getLocalBytesAndReleaseLock: (BlockId) => Option[ChunkedByteBuffer] = {
+ wrapGet(store.getLocalBytes)
+ }
}
}
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
index 97e74fe706..9ed5016510 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -21,6 +21,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
import java.util.Arrays
import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.util.io.ChunkedByteBuffer
class DiskStoreSuite extends SparkFunSuite {
@@ -29,7 +30,7 @@ class DiskStoreSuite extends SparkFunSuite {
// Create a non-trivial (not all zeros) byte array
val bytes = Array.tabulate[Byte](1000)(_.toByte)
- val byteBuffer = ByteBuffer.wrap(bytes)
+ val byteBuffer = new ChunkedByteBuffer(ByteBuffer.wrap(bytes))
val blockId = BlockId("rdd_1_2")
val diskBlockManager = new DiskBlockManager(new SparkConf(), deleteFilesOnStop = true)
@@ -44,9 +45,10 @@ class DiskStoreSuite extends SparkFunSuite {
val notMapped = diskStoreNotMapped.getBytes(blockId)
// Not possible to do isInstanceOf due to visibility of HeapByteBuffer
- assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"),
+ assert(notMapped.getChunks().forall(_.getClass.getName.endsWith("HeapByteBuffer")),
"Expected HeapByteBuffer for un-mapped read")
- assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer for mapped read")
+ assert(mapped.getChunks().forall(_.isInstanceOf[MappedByteBuffer]),
+ "Expected MappedByteBuffer for mapped read")
def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = {
val array = new Array[Byte](in.remaining())
@@ -54,9 +56,7 @@ class DiskStoreSuite extends SparkFunSuite {
array
}
- val mappedAsArray = arrayFromByteBuffer(mapped)
- val notMappedAsArray = arrayFromByteBuffer(notMapped)
- assert(Arrays.equals(mappedAsArray, bytes))
- assert(Arrays.equals(notMappedAsArray, bytes))
+ assert(Arrays.equals(mapped.toArray, bytes))
+ assert(Arrays.equals(notMapped.toArray, bytes))
}
}