From fdd460f5f47e4023d81d5a3d918bd4a16ecbb580 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 24 Mar 2016 17:33:21 -0700
Subject: [SPARK-13980] Incrementally serialize blocks while unrolling them in MemoryStore

When a block is persisted in the MemoryStore at a serialized storage level, the current MemoryStore.putIterator() code unrolls the entire iterator as Java objects in memory and then serializes an iterator obtained from the unrolled array. This is inefficient and doubles our peak memory requirements. Instead, I think that we should incrementally serialize blocks while unrolling them.

A downside of incremental serialization is that we will need to deserialize the partially-unrolled data when there is not enough space to unroll the block and the block cannot be dropped to disk. However, I'm hoping that the memory efficiency improvements will outweigh any performance loss caused by the extra serialization work in that hopefully-rare case.

Author: Josh Rosen

Closes #11791 from JoshRosen/serialize-incrementally.
--- .../apache/spark/storage/BlockManagerSuite.scala | 9 +- .../apache/spark/storage/MemoryStoreSuite.scala | 169 +++++++++++++++++---- 2 files changed, 144 insertions(+), 34 deletions(-) (limited to 'core/src/test/scala/org/apache') diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 94f6f87740..7a4cb39b14 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1035,7 +1035,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("safely unroll blocks through putIterator (disk)") { store = makeBlockManager(12000) - val memAndDisk = StorageLevel.MEMORY_AND_DISK val memoryStore = store.memoryStore val diskStore = store.diskStore val smallList = List.fill(40)(new Array[Byte](100)) @@ -1044,12 +1043,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] assert(memoryStore.currentUnrollMemoryForThisTask === 0) - store.putIterator("b1", smallIterator, memAndDisk) - store.putIterator("b2", smallIterator, memAndDisk) + store.putIterator("b1", smallIterator, StorageLevel.MEMORY_AND_DISK) + store.putIterator("b2", smallIterator, StorageLevel.MEMORY_AND_DISK) // Unroll with not enough space. This should succeed but kick out b1 in the process. // Memory store should contain b2 and b3, while disk store should contain only b1 - val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, ClassTag.Any) + val result3 = memoryStore.putIteratorAsValues("b3", smallIterator, ClassTag.Any) assert(result3.isRight) assert(!memoryStore.contains("b1")) assert(memoryStore.contains("b2")) @@ -1065,7 +1064,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // the block may be stored to disk. 
During the unrolling process, block "b2" should be kicked // out, so the memory store should contain only b3, while the disk store should contain // b1, b2 and b4. - val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, ClassTag.Any) + val result4 = memoryStore.putIteratorAsValues("b4", bigIterator, ClassTag.Any) assert(result4.isLeft) assert(!memoryStore.contains("b1")) assert(!memoryStore.contains("b2")) diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala index b4ab67ca15..43e832dc02 100644 --- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala @@ -29,7 +29,7 @@ import org.scalatest._ import org.apache.spark._ import org.apache.spark.memory.StaticMemoryManager import org.apache.spark.serializer.{KryoSerializer, SerializerManager} -import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallyUnrolledIterator} +import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallySerializedBlock, PartiallyUnrolledIterator} import org.apache.spark.util._ import org.apache.spark.util.io.ChunkedByteBuffer @@ -47,6 +47,8 @@ class MemoryStoreSuite // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m")) + val serializerManager = new SerializerManager(serializer, conf) + // Implicitly convert strings to BlockIds for test clarity. 
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId) @@ -61,7 +63,6 @@ class MemoryStoreSuite def makeMemoryStore(maxMem: Long): (MemoryStore, BlockInfoManager) = { val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) - val serializerManager = new SerializerManager(serializer, conf) val blockInfoManager = new BlockInfoManager val blockEvictionHandler = new BlockEvictionHandler { var memoryStore: MemoryStore = _ @@ -121,20 +122,20 @@ class MemoryStoreSuite val (memoryStore, blockInfoManager) = makeMemoryStore(12000) assert(memoryStore.currentUnrollMemoryForThisTask === 0) - def putIterator[T]( + def putIteratorAsValues[T]( blockId: BlockId, iter: Iterator[T], classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { assert(blockInfoManager.lockNewBlockForWriting( blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, classTag, tellMaster = false))) - val res = memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, classTag) + val res = memoryStore.putIteratorAsValues(blockId, iter, classTag) blockInfoManager.unlock(blockId) res } // Unroll with all the space in the world. This should succeed. - var putResult = putIterator("unroll", smallList.iterator, ClassTag.Any) + var putResult = putIteratorAsValues("unroll", smallList.iterator, ClassTag.Any) assert(putResult.isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) => @@ -145,9 +146,9 @@ class MemoryStoreSuite blockInfoManager.removeBlock("unroll") // Unroll with not enough space. This should succeed after kicking out someBlock1. 
- assert(putIterator("someBlock1", smallList.iterator, ct).isRight) - assert(putIterator("someBlock2", smallList.iterator, ct).isRight) - putResult = putIterator("unroll", smallList.iterator, ClassTag.Any) + assert(putIteratorAsValues("someBlock1", smallList.iterator, ct).isRight) + assert(putIteratorAsValues("someBlock2", smallList.iterator, ct).isRight) + putResult = putIteratorAsValues("unroll", smallList.iterator, ClassTag.Any) assert(putResult.isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) assert(memoryStore.contains("someBlock2")) @@ -162,8 +163,8 @@ class MemoryStoreSuite // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 = // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator. // In the meantime, however, we kicked out someBlock2 before giving up. - assert(putIterator("someBlock3", smallList.iterator, ct).isRight) - putResult = putIterator("unroll", bigList.iterator, ClassTag.Any) + assert(putIteratorAsValues("someBlock3", smallList.iterator, ct).isRight) + putResult = putIteratorAsValues("unroll", bigList.iterator, ClassTag.Any) assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator assert(!memoryStore.contains("someBlock2")) assert(putResult.isLeft) @@ -174,7 +175,7 @@ class MemoryStoreSuite assert(memoryStore.currentUnrollMemoryForThisTask === 0) } - test("safely unroll blocks through putIterator") { + test("safely unroll blocks through putIteratorAsValues") { val (memoryStore, blockInfoManager) = makeMemoryStore(12000) val smallList = List.fill(40)(new Array[Byte](100)) val bigList = List.fill(40)(new Array[Byte](1000)) @@ -182,21 +183,21 @@ class MemoryStoreSuite def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] assert(memoryStore.currentUnrollMemoryForThisTask === 0) - def putIterator[T]( + def putIteratorAsValues[T]( blockId: BlockId, iter: Iterator[T], classTag: ClassTag[T]): 
Either[PartiallyUnrolledIterator[T], Long] = { assert(blockInfoManager.lockNewBlockForWriting( blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, classTag, tellMaster = false))) - val res = memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, classTag) + val res = memoryStore.putIteratorAsValues(blockId, iter, classTag) blockInfoManager.unlock(blockId) res } // Unroll with plenty of space. This should succeed and cache both blocks. - val result1 = putIterator("b1", smallIterator, ClassTag.Any) - val result2 = putIterator("b2", smallIterator, ClassTag.Any) + val result1 = putIteratorAsValues("b1", smallIterator, ClassTag.Any) + val result2 = putIteratorAsValues("b2", smallIterator, ClassTag.Any) assert(memoryStore.contains("b1")) assert(memoryStore.contains("b2")) assert(result1.isRight) // unroll was successful @@ -211,11 +212,11 @@ class MemoryStoreSuite blockInfoManager.lockForWriting("b2") memoryStore.remove("b2") blockInfoManager.removeBlock("b2") - putIterator("b1", smallIterator, ClassTag.Any) - putIterator("b2", smallIterator, ClassTag.Any) + putIteratorAsValues("b1", smallIterator, ClassTag.Any) + putIteratorAsValues("b2", smallIterator, ClassTag.Any) // Unroll with not enough space. This should succeed but kick out b1 in the process. - val result3 = putIterator("b3", smallIterator, ClassTag.Any) + val result3 = putIteratorAsValues("b3", smallIterator, ClassTag.Any) assert(result3.isRight) assert(!memoryStore.contains("b1")) assert(memoryStore.contains("b2")) @@ -224,10 +225,10 @@ class MemoryStoreSuite blockInfoManager.lockForWriting("b3") assert(memoryStore.remove("b3")) blockInfoManager.removeBlock("b3") - putIterator("b3", smallIterator, ClassTag.Any) + putIteratorAsValues("b3", smallIterator, ClassTag.Any) // Unroll huge block with not enough space. This should fail and kick out b2 in the process. 
- val result4 = putIterator("b4", bigIterator, ClassTag.Any) + val result4 = putIteratorAsValues("b4", bigIterator, ClassTag.Any) assert(result4.isLeft) // unroll was unsuccessful assert(!memoryStore.contains("b1")) assert(!memoryStore.contains("b2")) @@ -238,41 +239,151 @@ class MemoryStoreSuite assert(memoryStore.currentUnrollMemoryForThisTask === 0) // close released the unroll memory } + test("safely unroll blocks through putIteratorAsBytes") { + val (memoryStore, blockInfoManager) = makeMemoryStore(12000) + val smallList = List.fill(40)(new Array[Byte](100)) + val bigList = List.fill(40)(new Array[Byte](1000)) + def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] + def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + + def putIteratorAsBytes[T]( + blockId: BlockId, + iter: Iterator[T], + classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = { + assert(blockInfoManager.lockNewBlockForWriting( + blockId, + new BlockInfo(StorageLevel.MEMORY_ONLY_SER, classTag, tellMaster = false))) + val res = memoryStore.putIteratorAsBytes(blockId, iter, classTag) + blockInfoManager.unlock(blockId) + res + } + + // Unroll with plenty of space. This should succeed and cache both blocks. + val result1 = putIteratorAsBytes("b1", smallIterator, ClassTag.Any) + val result2 = putIteratorAsBytes("b2", smallIterator, ClassTag.Any) + assert(memoryStore.contains("b1")) + assert(memoryStore.contains("b2")) + assert(result1.isRight) // unroll was successful + assert(result2.isRight) + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + + // Re-put these two blocks so block manager knows about them too. Otherwise, block manager + // would not know how to drop them from memory later. 
+ blockInfoManager.lockForWriting("b1") + memoryStore.remove("b1") + blockInfoManager.removeBlock("b1") + blockInfoManager.lockForWriting("b2") + memoryStore.remove("b2") + blockInfoManager.removeBlock("b2") + putIteratorAsBytes("b1", smallIterator, ClassTag.Any) + putIteratorAsBytes("b2", smallIterator, ClassTag.Any) + + // Unroll with not enough space. This should succeed but kick out b1 in the process. + val result3 = putIteratorAsBytes("b3", smallIterator, ClassTag.Any) + assert(result3.isRight) + assert(!memoryStore.contains("b1")) + assert(memoryStore.contains("b2")) + assert(memoryStore.contains("b3")) + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + blockInfoManager.lockForWriting("b3") + assert(memoryStore.remove("b3")) + blockInfoManager.removeBlock("b3") + putIteratorAsBytes("b3", smallIterator, ClassTag.Any) + + // Unroll huge block with not enough space. This should fail and kick out b2 in the process. + val result4 = putIteratorAsBytes("b4", bigIterator, ClassTag.Any) + assert(result4.isLeft) // unroll was unsuccessful + assert(!memoryStore.contains("b1")) + assert(!memoryStore.contains("b2")) + assert(memoryStore.contains("b3")) + assert(!memoryStore.contains("b4")) + assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator + result4.left.get.discard() + assert(memoryStore.currentUnrollMemoryForThisTask === 0) // discard released the unroll memory + } + + test("PartiallySerializedBlock.valuesIterator") { + val (memoryStore, blockInfoManager) = makeMemoryStore(12000) + val bigList = List.fill(40)(new Array[Byte](1000)) + def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] + + // Unroll huge block with not enough space. This should fail. 
+ assert(blockInfoManager.lockNewBlockForWriting( + "b1", + new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = false))) + val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any) + blockInfoManager.unlock("b1") + assert(res.isLeft) + assert(memoryStore.currentUnrollMemoryForThisTask > 0) + val valuesReturnedFromFailedPut = res.left.get.valuesIterator.toSeq // force materialization + valuesReturnedFromFailedPut.zip(bigList).foreach { case (e, a) => + assert(e === a, "PartiallySerializedBlock.valuesIterator() did not return original values!") + } + // The unroll memory was freed once the iterator was fully traversed. + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + } + + test("PartiallySerializedBlock.finishWritingToStream") { + val (memoryStore, blockInfoManager) = makeMemoryStore(12000) + val bigList = List.fill(40)(new Array[Byte](1000)) + def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] + + // Unroll huge block with not enough space. This should fail. + assert(blockInfoManager.lockNewBlockForWriting( + "b1", + new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = false))) + val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any) + blockInfoManager.unlock("b1") + assert(res.isLeft) + assert(memoryStore.currentUnrollMemoryForThisTask > 0) + val bos = new ByteBufferOutputStream() + res.left.get.finishWritingToStream(bos) + // The unroll memory was freed once the block was fully written. 
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0) + val deserializationStream = serializerManager.dataDeserializeStream[Any]( + "b1", new ByteBufferInputStream(bos.toByteBuffer))(ClassTag.Any) + deserializationStream.zip(bigList.iterator).foreach { case (e, a) => + assert(e === a, + "PartiallySerializedBlock.finishWritingtoStream() did not write original values!") + } + } + test("multiple unrolls by the same thread") { val (memoryStore, _) = makeMemoryStore(12000) val smallList = List.fill(40)(new Array[Byte](100)) def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] assert(memoryStore.currentUnrollMemoryForThisTask === 0) - def putIterator( + def putIteratorAsValues( blockId: BlockId, iter: Iterator[Any]): Either[PartiallyUnrolledIterator[Any], Long] = { - memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, ClassTag.Any) + memoryStore.putIteratorAsValues(blockId, iter, ClassTag.Any) } // All unroll memory used is released because putIterator did not return an iterator - assert(putIterator("b1", smallIterator).isRight) + assert(putIteratorAsValues("b1", smallIterator).isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) - assert(putIterator("b2", smallIterator).isRight) + assert(putIteratorAsValues("b2", smallIterator).isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Unroll memory is not released because putIterator returned an iterator // that still depends on the underlying vector used in the process - assert(putIterator("b3", smallIterator).isLeft) + assert(putIteratorAsValues("b3", smallIterator).isLeft) val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB3 > 0) // The unroll memory owned by this thread builds on top of its value after the previous unrolls - assert(putIterator("b4", smallIterator).isLeft) + assert(putIteratorAsValues("b4", smallIterator).isLeft) val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask 
assert(unrollMemoryAfterB4 > unrollMemoryAfterB3) // ... but only to a certain extent (until we run out of free space to grant new unroll memory) - assert(putIterator("b5", smallIterator).isLeft) + assert(putIteratorAsValues("b5", smallIterator).isLeft) val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask - assert(putIterator("b6", smallIterator).isLeft) + assert(putIteratorAsValues("b6", smallIterator).isLeft) val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask - assert(putIterator("b7", smallIterator).isLeft) + assert(putIteratorAsValues("b7", smallIterator).isLeft) val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB5 === unrollMemoryAfterB4) assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) -- cgit v1.2.3