aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test')
-rw-r--r-- core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 9
-rw-r--r-- core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala | 169
2 files changed, 144 insertions, 34 deletions
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 94f6f87740..7a4cb39b14 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1035,7 +1035,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("safely unroll blocks through putIterator (disk)") {
store = makeBlockManager(12000)
- val memAndDisk = StorageLevel.MEMORY_AND_DISK
val memoryStore = store.memoryStore
val diskStore = store.diskStore
val smallList = List.fill(40)(new Array[Byte](100))
@@ -1044,12 +1043,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- store.putIterator("b1", smallIterator, memAndDisk)
- store.putIterator("b2", smallIterator, memAndDisk)
+ store.putIterator("b1", smallIterator, StorageLevel.MEMORY_AND_DISK)
+ store.putIterator("b2", smallIterator, StorageLevel.MEMORY_AND_DISK)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
// Memory store should contain b2 and b3, while disk store should contain only b1
- val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, ClassTag.Any)
+ val result3 = memoryStore.putIteratorAsValues("b3", smallIterator, ClassTag.Any)
assert(result3.isRight)
assert(!memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
@@ -1065,7 +1064,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// the block may be stored to disk. During the unrolling process, block "b2" should be kicked
// out, so the memory store should contain only b3, while the disk store should contain
// b1, b2 and b4.
- val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, ClassTag.Any)
+ val result4 = memoryStore.putIteratorAsValues("b4", bigIterator, ClassTag.Any)
assert(result4.isLeft)
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index b4ab67ca15..43e832dc02 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest._
import org.apache.spark._
import org.apache.spark.memory.StaticMemoryManager
import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
-import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallyUnrolledIterator}
+import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallySerializedBlock, PartiallyUnrolledIterator}
import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer
@@ -47,6 +47,8 @@ class MemoryStoreSuite
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
+ val serializerManager = new SerializerManager(serializer, conf)
+
// Implicitly convert strings to BlockIds for test clarity.
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId)
@@ -61,7 +63,6 @@ class MemoryStoreSuite
def makeMemoryStore(maxMem: Long): (MemoryStore, BlockInfoManager) = {
val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
- val serializerManager = new SerializerManager(serializer, conf)
val blockInfoManager = new BlockInfoManager
val blockEvictionHandler = new BlockEvictionHandler {
var memoryStore: MemoryStore = _
@@ -121,20 +122,20 @@ class MemoryStoreSuite
val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- def putIterator[T](
+ def putIteratorAsValues[T](
blockId: BlockId,
iter: Iterator[T],
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
assert(blockInfoManager.lockNewBlockForWriting(
blockId,
new BlockInfo(StorageLevel.MEMORY_ONLY, classTag, tellMaster = false)))
- val res = memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, classTag)
+ val res = memoryStore.putIteratorAsValues(blockId, iter, classTag)
blockInfoManager.unlock(blockId)
res
}
// Unroll with all the space in the world. This should succeed.
- var putResult = putIterator("unroll", smallList.iterator, ClassTag.Any)
+ var putResult = putIteratorAsValues("unroll", smallList.iterator, ClassTag.Any)
assert(putResult.isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) =>
@@ -145,9 +146,9 @@ class MemoryStoreSuite
blockInfoManager.removeBlock("unroll")
// Unroll with not enough space. This should succeed after kicking out someBlock1.
- assert(putIterator("someBlock1", smallList.iterator, ct).isRight)
- assert(putIterator("someBlock2", smallList.iterator, ct).isRight)
- putResult = putIterator("unroll", smallList.iterator, ClassTag.Any)
+ assert(putIteratorAsValues("someBlock1", smallList.iterator, ct).isRight)
+ assert(putIteratorAsValues("someBlock2", smallList.iterator, ct).isRight)
+ putResult = putIteratorAsValues("unroll", smallList.iterator, ClassTag.Any)
assert(putResult.isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
assert(memoryStore.contains("someBlock2"))
@@ -162,8 +163,8 @@ class MemoryStoreSuite
// Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
// 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
// In the meantime, however, we kicked out someBlock2 before giving up.
- assert(putIterator("someBlock3", smallList.iterator, ct).isRight)
- putResult = putIterator("unroll", bigList.iterator, ClassTag.Any)
+ assert(putIteratorAsValues("someBlock3", smallList.iterator, ct).isRight)
+ putResult = putIteratorAsValues("unroll", bigList.iterator, ClassTag.Any)
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
assert(!memoryStore.contains("someBlock2"))
assert(putResult.isLeft)
@@ -174,7 +175,7 @@ class MemoryStoreSuite
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
}
- test("safely unroll blocks through putIterator") {
+ test("safely unroll blocks through putIteratorAsValues") {
val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
val smallList = List.fill(40)(new Array[Byte](100))
val bigList = List.fill(40)(new Array[Byte](1000))
@@ -182,21 +183,21 @@ class MemoryStoreSuite
def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- def putIterator[T](
+ def putIteratorAsValues[T](
blockId: BlockId,
iter: Iterator[T],
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
assert(blockInfoManager.lockNewBlockForWriting(
blockId,
new BlockInfo(StorageLevel.MEMORY_ONLY, classTag, tellMaster = false)))
- val res = memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, classTag)
+ val res = memoryStore.putIteratorAsValues(blockId, iter, classTag)
blockInfoManager.unlock(blockId)
res
}
// Unroll with plenty of space. This should succeed and cache both blocks.
- val result1 = putIterator("b1", smallIterator, ClassTag.Any)
- val result2 = putIterator("b2", smallIterator, ClassTag.Any)
+ val result1 = putIteratorAsValues("b1", smallIterator, ClassTag.Any)
+ val result2 = putIteratorAsValues("b2", smallIterator, ClassTag.Any)
assert(memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
assert(result1.isRight) // unroll was successful
@@ -211,11 +212,11 @@ class MemoryStoreSuite
blockInfoManager.lockForWriting("b2")
memoryStore.remove("b2")
blockInfoManager.removeBlock("b2")
- putIterator("b1", smallIterator, ClassTag.Any)
- putIterator("b2", smallIterator, ClassTag.Any)
+ putIteratorAsValues("b1", smallIterator, ClassTag.Any)
+ putIteratorAsValues("b2", smallIterator, ClassTag.Any)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
- val result3 = putIterator("b3", smallIterator, ClassTag.Any)
+ val result3 = putIteratorAsValues("b3", smallIterator, ClassTag.Any)
assert(result3.isRight)
assert(!memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
@@ -224,10 +225,10 @@ class MemoryStoreSuite
blockInfoManager.lockForWriting("b3")
assert(memoryStore.remove("b3"))
blockInfoManager.removeBlock("b3")
- putIterator("b3", smallIterator, ClassTag.Any)
+ putIteratorAsValues("b3", smallIterator, ClassTag.Any)
// Unroll huge block with not enough space. This should fail and kick out b2 in the process.
- val result4 = putIterator("b4", bigIterator, ClassTag.Any)
+ val result4 = putIteratorAsValues("b4", bigIterator, ClassTag.Any)
assert(result4.isLeft) // unroll was unsuccessful
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
@@ -238,41 +239,151 @@ class MemoryStoreSuite
assert(memoryStore.currentUnrollMemoryForThisTask === 0) // close released the unroll memory
}
+ // Exercises MemoryStore.putIteratorAsBytes against a store created with makeMemoryStore(12000):
+ // two small blocks are stored, a third small block is stored while b1 is dropped, and a big
+ // block fails to be stored while b2 is dropped.
+ test("safely unroll blocks through putIteratorAsBytes") {
+ val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+ val smallList = List.fill(40)(new Array[Byte](100))
+ val bigList = List.fill(40)(new Array[Byte](1000))
+ def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
+ def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+
+ // Test helper: registers blockId through lockNewBlockForWriting (asserting that this
+ // succeeds), stores the values with memoryStore.putIteratorAsBytes, unlocks the block and
+ // returns the store's result unchanged.
+ def putIteratorAsBytes[T](
+ blockId: BlockId,
+ iter: Iterator[T],
+ classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
+ assert(blockInfoManager.lockNewBlockForWriting(
+ blockId,
+ new BlockInfo(StorageLevel.MEMORY_ONLY_SER, classTag, tellMaster = false)))
+ val res = memoryStore.putIteratorAsBytes(blockId, iter, classTag)
+ blockInfoManager.unlock(blockId)
+ res
+ }
+
+ // Unroll with plenty of space. This should succeed and cache both blocks.
+ val result1 = putIteratorAsBytes("b1", smallIterator, ClassTag.Any)
+ val result2 = putIteratorAsBytes("b2", smallIterator, ClassTag.Any)
+ assert(memoryStore.contains("b1"))
+ assert(memoryStore.contains("b2"))
+ assert(result1.isRight) // unroll was successful
+ assert(result2.isRight)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+
+ // Re-put these two blocks so block manager knows about them too. Otherwise, block manager
+ // would not know how to drop them from memory later.
+ blockInfoManager.lockForWriting("b1")
+ memoryStore.remove("b1")
+ blockInfoManager.removeBlock("b1")
+ blockInfoManager.lockForWriting("b2")
+ memoryStore.remove("b2")
+ blockInfoManager.removeBlock("b2")
+ putIteratorAsBytes("b1", smallIterator, ClassTag.Any)
+ putIteratorAsBytes("b2", smallIterator, ClassTag.Any)
+
+ // Unroll with not enough space. This should succeed but kick out b1 in the process.
+ val result3 = putIteratorAsBytes("b3", smallIterator, ClassTag.Any)
+ assert(result3.isRight)
+ assert(!memoryStore.contains("b1"))
+ assert(memoryStore.contains("b2"))
+ assert(memoryStore.contains("b3"))
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+ // Remove b3 and put it again before attempting the big block below.
+ blockInfoManager.lockForWriting("b3")
+ assert(memoryStore.remove("b3"))
+ blockInfoManager.removeBlock("b3")
+ putIteratorAsBytes("b3", smallIterator, ClassTag.Any)
+
+ // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
+ val result4 = putIteratorAsBytes("b4", bigIterator, ClassTag.Any)
+ assert(result4.isLeft) // unroll was unsuccessful
+ assert(!memoryStore.contains("b1"))
+ assert(!memoryStore.contains("b2"))
+ assert(memoryStore.contains("b3"))
+ assert(!memoryStore.contains("b4"))
+ assert(memoryStore.currentUnrollMemoryForThisTask > 0) // a PartiallySerializedBlock was returned
+ result4.left.get.discard()
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0) // discard released the unroll memory
+ }
+
+  // Verifies that the values recovered from a failed putIteratorAsBytes call match the
+  // original input element for element, and that the unroll memory is released afterwards.
+  test("PartiallySerializedBlock.valuesIterator") {
+    val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+    val bigList = List.fill(40)(new Array[Byte](1000))
+    def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
+
+    // Unroll huge block with not enough space. This should fail.
+    assert(blockInfoManager.lockNewBlockForWriting(
+      "b1",
+      new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = false)))
+    val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any)
+    blockInfoManager.unlock("b1")
+    assert(res.isLeft)
+    assert(memoryStore.currentUnrollMemoryForThisTask > 0)
+    // toList is strict, so the iterator is fully traversed here. Iterator.toSeq may hand back
+    // a lazy Stream, which would not force the traversal at this point.
+    val valuesReturnedFromFailedPut = res.left.get.valuesIterator.toList
+    // zip silently truncates to the shorter side, so check the element count explicitly;
+    // otherwise a result with missing values would still pass the comparison below.
+    assert(valuesReturnedFromFailedPut.size === bigList.size,
+      "PartiallySerializedBlock.valuesIterator() did not return all of the original values!")
+    valuesReturnedFromFailedPut.zip(bigList).foreach { case (actual, expected) =>
+      assert(actual === expected,
+        "PartiallySerializedBlock.valuesIterator() did not return original values!")
+    }
+    // The unroll memory was freed once the iterator was fully traversed.
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+  }
+
+  // Verifies that finishWritingToStream, called on the result of a failed putIteratorAsBytes,
+  // writes out every original value and releases the unroll memory.
+  test("PartiallySerializedBlock.finishWritingToStream") {
+    val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+    val bigList = List.fill(40)(new Array[Byte](1000))
+    def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
+
+    // Unroll huge block with not enough space. This should fail.
+    assert(blockInfoManager.lockNewBlockForWriting(
+      "b1",
+      new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = false)))
+    val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any)
+    blockInfoManager.unlock("b1")
+    assert(res.isLeft)
+    assert(memoryStore.currentUnrollMemoryForThisTask > 0)
+    val bos = new ByteBufferOutputStream()
+    res.left.get.finishWritingToStream(bos)
+    // The unroll memory was freed once the block was fully written.
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+    // Materialize the deserialized values so their count can be checked: zipping two
+    // iterators stops at the shorter one and would hide missing or extra elements.
+    val deserializedValues = serializerManager.dataDeserializeStream[Any](
+      "b1", new ByteBufferInputStream(bos.toByteBuffer))(ClassTag.Any).toList
+    assert(deserializedValues.size === bigList.size,
+      "PartiallySerializedBlock.finishWritingToStream() did not write all original values!")
+    deserializedValues.zip(bigList).foreach { case (actual, expected) =>
+      assert(actual === expected,
+        "PartiallySerializedBlock.finishWritingToStream() did not write original values!")
+    }
+  }
+
test("multiple unrolls by the same thread") {
val (memoryStore, _) = makeMemoryStore(12000)
val smallList = List.fill(40)(new Array[Byte](100))
def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- def putIterator(
+ def putIteratorAsValues(
blockId: BlockId,
iter: Iterator[Any]): Either[PartiallyUnrolledIterator[Any], Long] = {
- memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, ClassTag.Any)
+ memoryStore.putIteratorAsValues(blockId, iter, ClassTag.Any)
}
// All unroll memory used is released because putIteratorAsValues did not return an iterator
- assert(putIterator("b1", smallIterator).isRight)
+ assert(putIteratorAsValues("b1", smallIterator).isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- assert(putIterator("b2", smallIterator).isRight)
+ assert(putIteratorAsValues("b2", smallIterator).isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll memory is not released because putIteratorAsValues returned an iterator
// that still depends on the underlying vector used in the process
- assert(putIterator("b3", smallIterator).isLeft)
+ assert(putIteratorAsValues("b3", smallIterator).isLeft)
val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB3 > 0)
// The unroll memory owned by this thread builds on top of its value after the previous unrolls
- assert(putIterator("b4", smallIterator).isLeft)
+ assert(putIteratorAsValues("b4", smallIterator).isLeft)
val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
// ... but only to a certain extent (until we run out of free space to grant new unroll memory)
- assert(putIterator("b5", smallIterator).isLeft)
+ assert(putIteratorAsValues("b5", smallIterator).isLeft)
val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask
- assert(putIterator("b6", smallIterator).isLeft)
+ assert(putIteratorAsValues("b6", smallIterator).isLeft)
val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask
- assert(putIterator("b7", smallIterator).isLeft)
+ assert(putIteratorAsValues("b7", smallIterator).isLeft)
val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)