aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-03-24 17:33:21 -0700
committerJosh Rosen <joshrosen@databricks.com>2016-03-24 17:33:21 -0700
commitfdd460f5f47e4023d81d5a3d918bd4a16ecbb580 (patch)
tree0b54ced5251783827a533860160b28e1c12fa251 /core/src/test/scala/org/apache
parent2cf46d5a96897d5f97b364db357d30566183c6e7 (diff)
downloadspark-fdd460f5f47e4023d81d5a3d918bd4a16ecbb580.tar.gz
spark-fdd460f5f47e4023d81d5a3d918bd4a16ecbb580.tar.bz2
spark-fdd460f5f47e4023d81d5a3d918bd4a16ecbb580.zip
[SPARK-13980] Incrementally serialize blocks while unrolling them in MemoryStore
When a block is persisted in the MemoryStore at a serialized storage level, the current MemoryStore.putIterator() code will unroll the entire iterator as Java objects in memory, then will turn around and serialize an iterator obtained from the unrolled array. This is inefficient and doubles our peak memory requirements. Instead, I think that we should incrementally serialize blocks while unrolling them. A downside to incremental serialization is the fact that we will need to deserialize the partially-unrolled data in case there is not enough space to unroll the block and the block cannot be dropped to disk. However, I'm hoping that the memory efficiency improvements will outweigh any performance losses as a result of extra serialization in that hopefully-rare case. Author: Josh Rosen <joshrosen@databricks.com> Closes #11791 from JoshRosen/serialize-incrementally.
Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala9
-rw-r--r--core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala169
2 files changed, 144 insertions, 34 deletions
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 94f6f87740..7a4cb39b14 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1035,7 +1035,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("safely unroll blocks through putIterator (disk)") {
store = makeBlockManager(12000)
- val memAndDisk = StorageLevel.MEMORY_AND_DISK
val memoryStore = store.memoryStore
val diskStore = store.diskStore
val smallList = List.fill(40)(new Array[Byte](100))
@@ -1044,12 +1043,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- store.putIterator("b1", smallIterator, memAndDisk)
- store.putIterator("b2", smallIterator, memAndDisk)
+ store.putIterator("b1", smallIterator, StorageLevel.MEMORY_AND_DISK)
+ store.putIterator("b2", smallIterator, StorageLevel.MEMORY_AND_DISK)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
// Memory store should contain b2 and b3, while disk store should contain only b1
- val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, ClassTag.Any)
+ val result3 = memoryStore.putIteratorAsValues("b3", smallIterator, ClassTag.Any)
assert(result3.isRight)
assert(!memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
@@ -1065,7 +1064,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// the block may be stored to disk. During the unrolling process, block "b2" should be kicked
// out, so the memory store should contain only b3, while the disk store should contain
// b1, b2 and b4.
- val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, ClassTag.Any)
+ val result4 = memoryStore.putIteratorAsValues("b4", bigIterator, ClassTag.Any)
assert(result4.isLeft)
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index b4ab67ca15..43e832dc02 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest._
import org.apache.spark._
import org.apache.spark.memory.StaticMemoryManager
import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
-import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallyUnrolledIterator}
+import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallySerializedBlock, PartiallyUnrolledIterator}
import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer
@@ -47,6 +47,8 @@ class MemoryStoreSuite
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
+ val serializerManager = new SerializerManager(serializer, conf)
+
// Implicitly convert strings to BlockIds for test clarity.
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId)
@@ -61,7 +63,6 @@ class MemoryStoreSuite
def makeMemoryStore(maxMem: Long): (MemoryStore, BlockInfoManager) = {
val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
- val serializerManager = new SerializerManager(serializer, conf)
val blockInfoManager = new BlockInfoManager
val blockEvictionHandler = new BlockEvictionHandler {
var memoryStore: MemoryStore = _
@@ -121,20 +122,20 @@ class MemoryStoreSuite
val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- def putIterator[T](
+ def putIteratorAsValues[T](
blockId: BlockId,
iter: Iterator[T],
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
assert(blockInfoManager.lockNewBlockForWriting(
blockId,
new BlockInfo(StorageLevel.MEMORY_ONLY, classTag, tellMaster = false)))
- val res = memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, classTag)
+ val res = memoryStore.putIteratorAsValues(blockId, iter, classTag)
blockInfoManager.unlock(blockId)
res
}
// Unroll with all the space in the world. This should succeed.
- var putResult = putIterator("unroll", smallList.iterator, ClassTag.Any)
+ var putResult = putIteratorAsValues("unroll", smallList.iterator, ClassTag.Any)
assert(putResult.isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) =>
@@ -145,9 +146,9 @@ class MemoryStoreSuite
blockInfoManager.removeBlock("unroll")
// Unroll with not enough space. This should succeed after kicking out someBlock1.
- assert(putIterator("someBlock1", smallList.iterator, ct).isRight)
- assert(putIterator("someBlock2", smallList.iterator, ct).isRight)
- putResult = putIterator("unroll", smallList.iterator, ClassTag.Any)
+ assert(putIteratorAsValues("someBlock1", smallList.iterator, ct).isRight)
+ assert(putIteratorAsValues("someBlock2", smallList.iterator, ct).isRight)
+ putResult = putIteratorAsValues("unroll", smallList.iterator, ClassTag.Any)
assert(putResult.isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
assert(memoryStore.contains("someBlock2"))
@@ -162,8 +163,8 @@ class MemoryStoreSuite
// Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
// 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
// In the meantime, however, we kicked out someBlock2 before giving up.
- assert(putIterator("someBlock3", smallList.iterator, ct).isRight)
- putResult = putIterator("unroll", bigList.iterator, ClassTag.Any)
+ assert(putIteratorAsValues("someBlock3", smallList.iterator, ct).isRight)
+ putResult = putIteratorAsValues("unroll", bigList.iterator, ClassTag.Any)
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
assert(!memoryStore.contains("someBlock2"))
assert(putResult.isLeft)
@@ -174,7 +175,7 @@ class MemoryStoreSuite
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
}
- test("safely unroll blocks through putIterator") {
+ test("safely unroll blocks through putIteratorAsValues") {
val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
val smallList = List.fill(40)(new Array[Byte](100))
val bigList = List.fill(40)(new Array[Byte](1000))
@@ -182,21 +183,21 @@ class MemoryStoreSuite
def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- def putIterator[T](
+ def putIteratorAsValues[T](
blockId: BlockId,
iter: Iterator[T],
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
assert(blockInfoManager.lockNewBlockForWriting(
blockId,
new BlockInfo(StorageLevel.MEMORY_ONLY, classTag, tellMaster = false)))
- val res = memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, classTag)
+ val res = memoryStore.putIteratorAsValues(blockId, iter, classTag)
blockInfoManager.unlock(blockId)
res
}
// Unroll with plenty of space. This should succeed and cache both blocks.
- val result1 = putIterator("b1", smallIterator, ClassTag.Any)
- val result2 = putIterator("b2", smallIterator, ClassTag.Any)
+ val result1 = putIteratorAsValues("b1", smallIterator, ClassTag.Any)
+ val result2 = putIteratorAsValues("b2", smallIterator, ClassTag.Any)
assert(memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
assert(result1.isRight) // unroll was successful
@@ -211,11 +212,11 @@ class MemoryStoreSuite
blockInfoManager.lockForWriting("b2")
memoryStore.remove("b2")
blockInfoManager.removeBlock("b2")
- putIterator("b1", smallIterator, ClassTag.Any)
- putIterator("b2", smallIterator, ClassTag.Any)
+ putIteratorAsValues("b1", smallIterator, ClassTag.Any)
+ putIteratorAsValues("b2", smallIterator, ClassTag.Any)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
- val result3 = putIterator("b3", smallIterator, ClassTag.Any)
+ val result3 = putIteratorAsValues("b3", smallIterator, ClassTag.Any)
assert(result3.isRight)
assert(!memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
@@ -224,10 +225,10 @@ class MemoryStoreSuite
blockInfoManager.lockForWriting("b3")
assert(memoryStore.remove("b3"))
blockInfoManager.removeBlock("b3")
- putIterator("b3", smallIterator, ClassTag.Any)
+ putIteratorAsValues("b3", smallIterator, ClassTag.Any)
// Unroll huge block with not enough space. This should fail and kick out b2 in the process.
- val result4 = putIterator("b4", bigIterator, ClassTag.Any)
+ val result4 = putIteratorAsValues("b4", bigIterator, ClassTag.Any)
assert(result4.isLeft) // unroll was unsuccessful
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
@@ -238,41 +239,151 @@ class MemoryStoreSuite
assert(memoryStore.currentUnrollMemoryForThisTask === 0) // close released the unroll memory
}
+ test("safely unroll blocks through putIteratorAsBytes") {
+ val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+ val smallList = List.fill(40)(new Array[Byte](100))
+ val bigList = List.fill(40)(new Array[Byte](1000))
+ def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
+ def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+
+ def putIteratorAsBytes[T](
+ blockId: BlockId,
+ iter: Iterator[T],
+ classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
+ assert(blockInfoManager.lockNewBlockForWriting(
+ blockId,
+ new BlockInfo(StorageLevel.MEMORY_ONLY_SER, classTag, tellMaster = false)))
+ val res = memoryStore.putIteratorAsBytes(blockId, iter, classTag)
+ blockInfoManager.unlock(blockId)
+ res
+ }
+
+ // Unroll with plenty of space. This should succeed and cache both blocks.
+ val result1 = putIteratorAsBytes("b1", smallIterator, ClassTag.Any)
+ val result2 = putIteratorAsBytes("b2", smallIterator, ClassTag.Any)
+ assert(memoryStore.contains("b1"))
+ assert(memoryStore.contains("b2"))
+ assert(result1.isRight) // unroll was successful
+ assert(result2.isRight)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+
+ // Re-put these two blocks so block manager knows about them too. Otherwise, block manager
+ // would not know how to drop them from memory later.
+ blockInfoManager.lockForWriting("b1")
+ memoryStore.remove("b1")
+ blockInfoManager.removeBlock("b1")
+ blockInfoManager.lockForWriting("b2")
+ memoryStore.remove("b2")
+ blockInfoManager.removeBlock("b2")
+ putIteratorAsBytes("b1", smallIterator, ClassTag.Any)
+ putIteratorAsBytes("b2", smallIterator, ClassTag.Any)
+
+ // Unroll with not enough space. This should succeed but kick out b1 in the process.
+ val result3 = putIteratorAsBytes("b3", smallIterator, ClassTag.Any)
+ assert(result3.isRight)
+ assert(!memoryStore.contains("b1"))
+ assert(memoryStore.contains("b2"))
+ assert(memoryStore.contains("b3"))
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+ blockInfoManager.lockForWriting("b3")
+ assert(memoryStore.remove("b3"))
+ blockInfoManager.removeBlock("b3")
+ putIteratorAsBytes("b3", smallIterator, ClassTag.Any)
+
+ // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
+ val result4 = putIteratorAsBytes("b4", bigIterator, ClassTag.Any)
+ assert(result4.isLeft) // unroll was unsuccessful
+ assert(!memoryStore.contains("b1"))
+ assert(!memoryStore.contains("b2"))
+ assert(memoryStore.contains("b3"))
+ assert(!memoryStore.contains("b4"))
+ assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
+ result4.left.get.discard()
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0) // discard released the unroll memory
+ }
+
+ test("PartiallySerializedBlock.valuesIterator") {
+ val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+ val bigList = List.fill(40)(new Array[Byte](1000))
+ def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
+
+ // Unroll huge block with not enough space. This should fail.
+ assert(blockInfoManager.lockNewBlockForWriting(
+ "b1",
+ new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = false)))
+ val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any)
+ blockInfoManager.unlock("b1")
+ assert(res.isLeft)
+ assert(memoryStore.currentUnrollMemoryForThisTask > 0)
+ val valuesReturnedFromFailedPut = res.left.get.valuesIterator.toSeq // force materialization
+ valuesReturnedFromFailedPut.zip(bigList).foreach { case (e, a) =>
+ assert(e === a, "PartiallySerializedBlock.valuesIterator() did not return original values!")
+ }
+ // The unroll memory was freed once the iterator was fully traversed.
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+ }
+
+ test("PartiallySerializedBlock.finishWritingToStream") {
+ val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+ val bigList = List.fill(40)(new Array[Byte](1000))
+ def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
+
+ // Unroll huge block with not enough space. This should fail.
+ assert(blockInfoManager.lockNewBlockForWriting(
+ "b1",
+ new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = false)))
+ val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any)
+ blockInfoManager.unlock("b1")
+ assert(res.isLeft)
+ assert(memoryStore.currentUnrollMemoryForThisTask > 0)
+ val bos = new ByteBufferOutputStream()
+ res.left.get.finishWritingToStream(bos)
+ // The unroll memory was freed once the block was fully written.
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+ val deserializationStream = serializerManager.dataDeserializeStream[Any](
+ "b1", new ByteBufferInputStream(bos.toByteBuffer))(ClassTag.Any)
+ deserializationStream.zip(bigList.iterator).foreach { case (e, a) =>
+ assert(e === a,
+ "PartiallySerializedBlock.finishWritingtoStream() did not write original values!")
+ }
+ }
+
test("multiple unrolls by the same thread") {
val (memoryStore, _) = makeMemoryStore(12000)
val smallList = List.fill(40)(new Array[Byte](100))
def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- def putIterator(
+ def putIteratorAsValues(
blockId: BlockId,
iter: Iterator[Any]): Either[PartiallyUnrolledIterator[Any], Long] = {
- memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, ClassTag.Any)
+ memoryStore.putIteratorAsValues(blockId, iter, ClassTag.Any)
}
// All unroll memory used is released because putIterator did not return an iterator
- assert(putIterator("b1", smallIterator).isRight)
+ assert(putIteratorAsValues("b1", smallIterator).isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- assert(putIterator("b2", smallIterator).isRight)
+ assert(putIteratorAsValues("b2", smallIterator).isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll memory is not released because putIterator returned an iterator
// that still depends on the underlying vector used in the process
- assert(putIterator("b3", smallIterator).isLeft)
+ assert(putIteratorAsValues("b3", smallIterator).isLeft)
val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB3 > 0)
// The unroll memory owned by this thread builds on top of its value after the previous unrolls
- assert(putIterator("b4", smallIterator).isLeft)
+ assert(putIteratorAsValues("b4", smallIterator).isLeft)
val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
// ... but only to a certain extent (until we run out of free space to grant new unroll memory)
- assert(putIterator("b5", smallIterator).isLeft)
+ assert(putIteratorAsValues("b5", smallIterator).isLeft)
val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask
- assert(putIterator("b6", smallIterator).isLeft)
+ assert(putIteratorAsValues("b6", smallIterator).isLeft)
val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask
- assert(putIterator("b7", smallIterator).isLeft)
+ assert(putIteratorAsValues("b7", smallIterator).isLeft)
val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)