aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala')
-rw-r--r--core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala34
1 files changed, 16 insertions, 18 deletions
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index c11de82667..9929ea033a 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -79,6 +79,13 @@ class MemoryStoreSuite
(memoryStore, blockInfoManager)
}
+ private def assertSameContents[T](expected: Seq[T], actual: Seq[T], hint: String): Unit = {
+ assert(actual.length === expected.length, s"wrong number of values returned in $hint")
+ expected.iterator.zip(actual.iterator).foreach { case (e, a) =>
+ assert(e === a, s"$hint did not return original values!")
+ }
+ }
+
test("reserve/release unroll memory") {
val (memoryStore, _) = makeMemoryStore(12000)
assert(memoryStore.currentUnrollMemory === 0)
@@ -137,9 +144,7 @@ class MemoryStoreSuite
var putResult = putIteratorAsValues("unroll", smallList.iterator, ClassTag.Any)
assert(putResult.isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) =>
- assert(e === a, "getValues() did not return original values!")
- }
+ assertSameContents(smallList, memoryStore.getValues("unroll").get.toSeq, "getValues")
blockInfoManager.lockForWriting("unroll")
assert(memoryStore.remove("unroll"))
blockInfoManager.removeBlock("unroll")
@@ -152,9 +157,7 @@ class MemoryStoreSuite
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
assert(memoryStore.contains("someBlock2"))
assert(!memoryStore.contains("someBlock1"))
- smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) =>
- assert(e === a, "getValues() did not return original values!")
- }
+ assertSameContents(smallList, memoryStore.getValues("unroll").get.toSeq, "getValues")
blockInfoManager.lockForWriting("unroll")
assert(memoryStore.remove("unroll"))
blockInfoManager.removeBlock("unroll")
@@ -167,9 +170,7 @@ class MemoryStoreSuite
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
assert(!memoryStore.contains("someBlock2"))
assert(putResult.isLeft)
- bigList.iterator.zip(putResult.left.get).foreach { case (e, a) =>
- assert(e === a, "putIterator() did not return original values!")
- }
+ assertSameContents(bigList, putResult.left.get.toSeq, "putIterator")
// The unroll memory was freed once the iterator returned by putIterator() was fully traversed.
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
}
@@ -316,9 +317,8 @@ class MemoryStoreSuite
assert(res.isLeft)
assert(memoryStore.currentUnrollMemoryForThisTask > 0)
val valuesReturnedFromFailedPut = res.left.get.valuesIterator.toSeq // force materialization
- valuesReturnedFromFailedPut.zip(bigList).foreach { case (e, a) =>
- assert(e === a, "PartiallySerializedBlock.valuesIterator() did not return original values!")
- }
+ assertSameContents(
+ bigList, valuesReturnedFromFailedPut, "PartiallySerializedBlock.valuesIterator()")
// The unroll memory was freed once the iterator was fully traversed.
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
}
@@ -340,12 +340,10 @@ class MemoryStoreSuite
res.left.get.finishWritingToStream(bos)
// The unroll memory was freed once the block was fully written.
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- val deserializationStream = serializerManager.dataDeserializeStream[Any](
- "b1", new ByteBufferInputStream(bos.toByteBuffer))(ClassTag.Any)
- deserializationStream.zip(bigList.iterator).foreach { case (e, a) =>
- assert(e === a,
- "PartiallySerializedBlock.finishWritingtoStream() did not write original values!")
- }
+ val deserializedValues = serializerManager.dataDeserializeStream[Any](
+ "b1", new ByteBufferInputStream(bos.toByteBuffer))(ClassTag.Any).toSeq
+ assertSameContents(
+ bigList, deserializedValues, "PartiallySerializedBlock.finishWritingToStream()")
}
test("multiple unrolls by the same thread") {