diff options
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 9 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala | 169 |
2 files changed, 144 insertions, 34 deletions
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 94f6f87740..7a4cb39b14 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1035,7 +1035,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE test("safely unroll blocks through putIterator (disk)") { store = makeBlockManager(12000) - val memAndDisk = StorageLevel.MEMORY_AND_DISK val memoryStore = store.memoryStore val diskStore = store.diskStore val smallList = List.fill(40)(new Array[Byte](100)) @@ -1044,12 +1043,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] assert(memoryStore.currentUnrollMemoryForThisTask === 0) - store.putIterator("b1", smallIterator, memAndDisk) - store.putIterator("b2", smallIterator, memAndDisk) + store.putIterator("b1", smallIterator, StorageLevel.MEMORY_AND_DISK) + store.putIterator("b2", smallIterator, StorageLevel.MEMORY_AND_DISK) // Unroll with not enough space. This should succeed but kick out b1 in the process. // Memory store should contain b2 and b3, while disk store should contain only b1 - val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, ClassTag.Any) + val result3 = memoryStore.putIteratorAsValues("b3", smallIterator, ClassTag.Any) assert(result3.isRight) assert(!memoryStore.contains("b1")) assert(memoryStore.contains("b2")) @@ -1065,7 +1064,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // the block may be stored to disk. During the unrolling process, block "b2" should be kicked // out, so the memory store should contain only b3, while the disk store should contain // b1, b2 and b4. - val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, ClassTag.Any) + val result4 = memoryStore.putIteratorAsValues("b4", bigIterator, ClassTag.Any) assert(result4.isLeft) assert(!memoryStore.contains("b1")) assert(!memoryStore.contains("b2")) diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala index b4ab67ca15..43e832dc02 100644 --- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala @@ -29,7 +29,7 @@ import org.scalatest._ import org.apache.spark._ import org.apache.spark.memory.StaticMemoryManager import org.apache.spark.serializer.{KryoSerializer, SerializerManager} -import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallyUnrolledIterator} +import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallySerializedBlock, PartiallyUnrolledIterator} import org.apache.spark.util._ import org.apache.spark.util.io.ChunkedByteBuffer @@ -47,6 +47,8 @@ class MemoryStoreSuite // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m")) + val serializerManager = new SerializerManager(serializer, conf) + // Implicitly convert strings to BlockIds for test clarity. implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId) @@ -61,7 +63,6 @@ class MemoryStoreSuite def makeMemoryStore(maxMem: Long): (MemoryStore, BlockInfoManager) = { val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) - val serializerManager = new SerializerManager(serializer, conf) val blockInfoManager = new BlockInfoManager val blockEvictionHandler = new BlockEvictionHandler { var memoryStore: MemoryStore = _ @@ -121,20 +122,20 @@ class MemoryStoreSuite val (memoryStore, blockInfoManager) = makeMemoryStore(12000) assert(memoryStore.currentUnrollMemoryForThisTask === 0) - def putIterator[T]( + def putIteratorAsValues[T]( blockId: BlockId, iter: Iterator[T], classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { assert(blockInfoManager.lockNewBlockForWriting( blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, classTag, tellMaster = false))) - val res = memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, classTag) + val res = memoryStore.putIteratorAsValues(blockId, iter, classTag) blockInfoManager.unlock(blockId) res } // Unroll with all the space in the world. This should succeed. - var putResult = putIterator("unroll", smallList.iterator, ClassTag.Any) + var putResult = putIteratorAsValues("unroll", smallList.iterator, ClassTag.Any) assert(putResult.isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) => @@ -145,9 +146,9 @@ class MemoryStoreSuite blockInfoManager.removeBlock("unroll") // Unroll with not enough space. This should succeed after kicking out someBlock1. - assert(putIterator("someBlock1", smallList.iterator, ct).isRight) - assert(putIterator("someBlock2", smallList.iterator, ct).isRight) - putResult = putIterator("unroll", smallList.iterator, ClassTag.Any) + assert(putIteratorAsValues("someBlock1", smallList.iterator, ct).isRight) + assert(putIteratorAsValues("someBlock2", smallList.iterator, ct).isRight) + putResult = putIteratorAsValues("unroll", smallList.iterator, ClassTag.Any) assert(putResult.isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) assert(memoryStore.contains("someBlock2")) @@ -162,8 +163,8 @@ class MemoryStoreSuite // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 = // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator. // In the meantime, however, we kicked out someBlock2 before giving up. - assert(putIterator("someBlock3", smallList.iterator, ct).isRight) - putResult = putIterator("unroll", bigList.iterator, ClassTag.Any) + assert(putIteratorAsValues("someBlock3", smallList.iterator, ct).isRight) + putResult = putIteratorAsValues("unroll", bigList.iterator, ClassTag.Any) assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator assert(!memoryStore.contains("someBlock2")) assert(putResult.isLeft) @@ -174,7 +175,7 @@ class MemoryStoreSuite assert(memoryStore.currentUnrollMemoryForThisTask === 0) } - test("safely unroll blocks through putIterator") { + test("safely unroll blocks through putIteratorAsValues") { val (memoryStore, blockInfoManager) = makeMemoryStore(12000) val smallList = List.fill(40)(new Array[Byte](100)) val bigList = List.fill(40)(new Array[Byte](1000)) @@ -182,21 +183,21 @@ class MemoryStoreSuite def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] assert(memoryStore.currentUnrollMemoryForThisTask === 0) - def putIterator[T]( + def putIteratorAsValues[T]( blockId: BlockId, iter: Iterator[T], classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { assert(blockInfoManager.lockNewBlockForWriting( blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, classTag, tellMaster = false))) - val res = memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, classTag) + val res = memoryStore.putIteratorAsValues(blockId, iter, classTag) blockInfoManager.unlock(blockId) res } // Unroll with plenty of space. This should succeed and cache both blocks. - val result1 = putIterator("b1", smallIterator, ClassTag.Any) - val result2 = putIterator("b2", smallIterator, ClassTag.Any) + val result1 = putIteratorAsValues("b1", smallIterator, ClassTag.Any) + val result2 = putIteratorAsValues("b2", smallIterator, ClassTag.Any) assert(memoryStore.contains("b1")) assert(memoryStore.contains("b2")) assert(result1.isRight) // unroll was successful @@ -211,11 +212,11 @@ class MemoryStoreSuite blockInfoManager.lockForWriting("b2") memoryStore.remove("b2") blockInfoManager.removeBlock("b2") - putIterator("b1", smallIterator, ClassTag.Any) - putIterator("b2", smallIterator, ClassTag.Any) + putIteratorAsValues("b1", smallIterator, ClassTag.Any) + putIteratorAsValues("b2", smallIterator, ClassTag.Any) // Unroll with not enough space. This should succeed but kick out b1 in the process. - val result3 = putIterator("b3", smallIterator, ClassTag.Any) + val result3 = putIteratorAsValues("b3", smallIterator, ClassTag.Any) assert(result3.isRight) assert(!memoryStore.contains("b1")) assert(memoryStore.contains("b2")) @@ -224,10 +225,10 @@ class MemoryStoreSuite blockInfoManager.lockForWriting("b3") assert(memoryStore.remove("b3")) blockInfoManager.removeBlock("b3") - putIterator("b3", smallIterator, ClassTag.Any) + putIteratorAsValues("b3", smallIterator, ClassTag.Any) // Unroll huge block with not enough space. This should fail and kick out b2 in the process. - val result4 = putIterator("b4", bigIterator, ClassTag.Any) + val result4 = putIteratorAsValues("b4", bigIterator, ClassTag.Any) assert(result4.isLeft) // unroll was unsuccessful assert(!memoryStore.contains("b1")) assert(!memoryStore.contains("b2")) @@ -238,41 +239,151 @@ class MemoryStoreSuite assert(memoryStore.currentUnrollMemoryForThisTask === 0) // close released the unroll memory } + test("safely unroll blocks through putIteratorAsBytes") { + val (memoryStore, blockInfoManager) = makeMemoryStore(12000) + val smallList = List.fill(40)(new Array[Byte](100)) + val bigList = List.fill(40)(new Array[Byte](1000)) + def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] + def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + + def putIteratorAsBytes[T]( + blockId: BlockId, + iter: Iterator[T], + classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = { + assert(blockInfoManager.lockNewBlockForWriting( + blockId, + new BlockInfo(StorageLevel.MEMORY_ONLY_SER, classTag, tellMaster = false))) + val res = memoryStore.putIteratorAsBytes(blockId, iter, classTag) + blockInfoManager.unlock(blockId) + res + } + + // Unroll with plenty of space. This should succeed and cache both blocks. + val result1 = putIteratorAsBytes("b1", smallIterator, ClassTag.Any) + val result2 = putIteratorAsBytes("b2", smallIterator, ClassTag.Any) + assert(memoryStore.contains("b1")) + assert(memoryStore.contains("b2")) + assert(result1.isRight) // unroll was successful + assert(result2.isRight) + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + + // Re-put these two blocks so block manager knows about them too. Otherwise, block manager + // would not know how to drop them from memory later. + blockInfoManager.lockForWriting("b1") + memoryStore.remove("b1") + blockInfoManager.removeBlock("b1") + blockInfoManager.lockForWriting("b2") + memoryStore.remove("b2") + blockInfoManager.removeBlock("b2") + putIteratorAsBytes("b1", smallIterator, ClassTag.Any) + putIteratorAsBytes("b2", smallIterator, ClassTag.Any) + + // Unroll with not enough space. This should succeed but kick out b1 in the process. + val result3 = putIteratorAsBytes("b3", smallIterator, ClassTag.Any) + assert(result3.isRight) + assert(!memoryStore.contains("b1")) + assert(memoryStore.contains("b2")) + assert(memoryStore.contains("b3")) + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + blockInfoManager.lockForWriting("b3") + assert(memoryStore.remove("b3")) + blockInfoManager.removeBlock("b3") + putIteratorAsBytes("b3", smallIterator, ClassTag.Any) + + // Unroll huge block with not enough space. This should fail and kick out b2 in the process. + val result4 = putIteratorAsBytes("b4", bigIterator, ClassTag.Any) + assert(result4.isLeft) // unroll was unsuccessful + assert(!memoryStore.contains("b1")) + assert(!memoryStore.contains("b2")) + assert(memoryStore.contains("b3")) + assert(!memoryStore.contains("b4")) + assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator + result4.left.get.discard() + assert(memoryStore.currentUnrollMemoryForThisTask === 0) // discard released the unroll memory + } + + test("PartiallySerializedBlock.valuesIterator") { + val (memoryStore, blockInfoManager) = makeMemoryStore(12000) + val bigList = List.fill(40)(new Array[Byte](1000)) + def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] + + // Unroll huge block with not enough space. This should fail. + assert(blockInfoManager.lockNewBlockForWriting( + "b1", + new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = false))) + val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any) + blockInfoManager.unlock("b1") + assert(res.isLeft) + assert(memoryStore.currentUnrollMemoryForThisTask > 0) + val valuesReturnedFromFailedPut = res.left.get.valuesIterator.toSeq // force materialization + valuesReturnedFromFailedPut.zip(bigList).foreach { case (e, a) => + assert(e === a, "PartiallySerializedBlock.valuesIterator() did not return original values!") + } + // The unroll memory was freed once the iterator was fully traversed. + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + } + + test("PartiallySerializedBlock.finishWritingToStream") { + val (memoryStore, blockInfoManager) = makeMemoryStore(12000) + val bigList = List.fill(40)(new Array[Byte](1000)) + def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] + + // Unroll huge block with not enough space. This should fail. + assert(blockInfoManager.lockNewBlockForWriting( + "b1", + new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = false))) + val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any) + blockInfoManager.unlock("b1") + assert(res.isLeft) + assert(memoryStore.currentUnrollMemoryForThisTask > 0) + val bos = new ByteBufferOutputStream() + res.left.get.finishWritingToStream(bos) + // The unroll memory was freed once the block was fully written. + assert(memoryStore.currentUnrollMemoryForThisTask === 0) + val deserializationStream = serializerManager.dataDeserializeStream[Any]( + "b1", new ByteBufferInputStream(bos.toByteBuffer))(ClassTag.Any) + deserializationStream.zip(bigList.iterator).foreach { case (e, a) => + assert(e === a, + "PartiallySerializedBlock.finishWritingtoStream() did not write original values!") + } + } + test("multiple unrolls by the same thread") { val (memoryStore, _) = makeMemoryStore(12000) val smallList = List.fill(40)(new Array[Byte](100)) def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] assert(memoryStore.currentUnrollMemoryForThisTask === 0) - def putIterator( + def putIteratorAsValues( blockId: BlockId, iter: Iterator[Any]): Either[PartiallyUnrolledIterator[Any], Long] = { - memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, ClassTag.Any) + memoryStore.putIteratorAsValues(blockId, iter, ClassTag.Any) } // All unroll memory used is released because putIterator did not return an iterator - assert(putIterator("b1", smallIterator).isRight) + assert(putIteratorAsValues("b1", smallIterator).isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) - assert(putIterator("b2", smallIterator).isRight) + assert(putIteratorAsValues("b2", smallIterator).isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Unroll memory is not released because putIterator returned an iterator // that still depends on the underlying vector used in the process - assert(putIterator("b3", smallIterator).isLeft) + assert(putIteratorAsValues("b3", smallIterator).isLeft) val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB3 > 0) // The unroll memory owned by this thread builds on top of its value after the previous unrolls - assert(putIterator("b4", smallIterator).isLeft) + assert(putIteratorAsValues("b4", smallIterator).isLeft) val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB4 > unrollMemoryAfterB3) // ... but only to a certain extent (until we run out of free space to grant new unroll memory) - assert(putIterator("b5", smallIterator).isLeft) + assert(putIteratorAsValues("b5", smallIterator).isLeft) val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask - assert(putIterator("b6", smallIterator).isLeft) + assert(putIteratorAsValues("b6", smallIterator).isLeft) val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask - assert(putIterator("b7", smallIterator).isLeft) + assert(putIteratorAsValues("b7", smallIterator).isLeft) val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB5 === unrollMemoryAfterB4) assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) |