aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala')
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala55
1 files changed, 32 insertions, 23 deletions
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 0f31561170..6e6cf6385f 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -184,8 +184,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(master.getLocations("a3").size === 0, "master was told about a3")
// Drop a1 and a2 from memory; this should be reported back to the master
- store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
- store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer])
+ store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer])
+ store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer])
assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store")
assert(master.getLocations("a1").size === 0, "master did not remove a1")
@@ -425,8 +425,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
t2.join()
t3.join()
- store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
- store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer])
+ store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer])
+ store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer])
store.waitForAsyncReregister()
}
}
@@ -847,23 +847,37 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list = List.fill(2)(new Array[Byte](2000))
val bigList = List.fill(8)(new Array[Byte](2000))
+ def getUpdatedBlocks(task: => Unit): Seq[(BlockId, BlockStatus)] = {
+ val context = TaskContext.empty()
+ try {
+ TaskContext.setTaskContext(context)
+ task
+ } finally {
+ TaskContext.unset()
+ }
+ context.taskMetrics.updatedBlocks.getOrElse(Seq.empty)
+ }
+
// 1 updated block (i.e. list1)
- val updatedBlocks1 =
+ val updatedBlocks1 = getUpdatedBlocks {
store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ }
assert(updatedBlocks1.size === 1)
assert(updatedBlocks1.head._1 === TestBlockId("list1"))
assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
// 1 updated block (i.e. list2)
- val updatedBlocks2 =
+ val updatedBlocks2 = getUpdatedBlocks {
store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ }
assert(updatedBlocks2.size === 1)
assert(updatedBlocks2.head._1 === TestBlockId("list2"))
assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
// 2 updated blocks - list1 is kicked out of memory while list3 is added
- val updatedBlocks3 =
+ val updatedBlocks3 = getUpdatedBlocks {
store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ }
assert(updatedBlocks3.size === 2)
updatedBlocks3.foreach { case (id, status) =>
id match {
@@ -875,8 +889,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
// 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
- val updatedBlocks4 =
+ val updatedBlocks4 = getUpdatedBlocks {
store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ }
assert(updatedBlocks4.size === 2)
updatedBlocks4.foreach { case (id, status) =>
id match {
@@ -889,8 +904,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
// No updated blocks - list5 is too big to fit in store and nothing is kicked out
- val updatedBlocks5 =
+ val updatedBlocks5 = getUpdatedBlocks {
store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ }
assert(updatedBlocks5.size === 0)
// memory store contains only list3 and list4
@@ -1005,8 +1021,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
def reserveUnrollMemoryForThisTask(memory: Long): Boolean = {
- memoryStore.reserveUnrollMemoryForThisTask(
- TestBlockId(""), memory, new ArrayBuffer[(BlockId, BlockStatus)])
+ memoryStore.reserveUnrollMemoryForThisTask(TestBlockId(""), memory)
}
// Reserve
@@ -1062,11 +1077,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val smallList = List.fill(40)(new Array[Byte](100))
val bigList = List.fill(40)(new Array[Byte](1000))
val memoryStore = store.memoryStore
- val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll with all the space in the world. This should succeed and return an array.
- var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
+ var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
memoryStore.releasePendingUnrollMemoryForThisTask()
@@ -1074,24 +1088,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Unroll with not enough space. This should succeed after kicking out someBlock1.
store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
- unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
+ unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- assert(droppedBlocks.size === 1)
- assert(droppedBlocks.head._1 === TestBlockId("someBlock1"))
- droppedBlocks.clear()
+ assert(memoryStore.contains("someBlock2"))
+ assert(!memoryStore.contains("someBlock1"))
memoryStore.releasePendingUnrollMemoryForThisTask()
// Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
// 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
// In the mean time, however, we kicked out someBlock2 before giving up.
store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
- unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator, droppedBlocks)
+ unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator)
verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
- assert(droppedBlocks.size === 1)
- assert(droppedBlocks.head._1 === TestBlockId("someBlock2"))
- droppedBlocks.clear()
+ assert(!memoryStore.contains("someBlock2"))
}
test("safely unroll blocks through putIterator") {
@@ -1238,7 +1249,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
})
assert(result.size === 13000)
assert(result.data === null)
- assert(result.droppedBlocks === Nil)
}
test("put a small ByteBuffer to MemoryStore") {
@@ -1252,6 +1262,5 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
})
assert(result.size === 10000)
assert(result.data === Right(bytes))
- assert(result.droppedBlocks === Nil)
}
}