aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache/spark/storage/StorageSuite.scala')
-rw-r--r--core/src/test/scala/org/apache/spark/storage/StorageSuite.scala87
1 files changed, 82 insertions, 5 deletions
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
index e5733aebf6..da198f946f 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -27,7 +27,7 @@ class StorageSuite extends SparkFunSuite {
// For testing add, update, and remove (for non-RDD blocks)
private def storageStatus1: StorageStatus = {
- val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
+ val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L, Some(1000L), Some(0L))
assert(status.blocks.isEmpty)
assert(status.rddBlocks.isEmpty)
assert(status.memUsed === 0L)
@@ -74,7 +74,7 @@ class StorageSuite extends SparkFunSuite {
// For testing add, update, remove, get, and contains etc. for both RDD and non-RDD blocks
private def storageStatus2: StorageStatus = {
- val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
+ val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L, Some(1000L), Some(0L))
assert(status.rddBlocks.isEmpty)
status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L))
status.addBlock(TestBlockId("man"), BlockStatus(memAndDisk, 10L, 20L))
@@ -252,9 +252,9 @@ class StorageSuite extends SparkFunSuite {
// For testing StorageUtils.updateRddInfo and StorageUtils.getRddBlockLocations
private def stockStorageStatuses: Seq[StorageStatus] = {
- val status1 = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
- val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2), 2000L)
- val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3), 3000L)
+ val status1 = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L, Some(1000L), Some(0L))
+ val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2), 2000L, Some(2000L), Some(0L))
+ val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3), 3000L, Some(3000L), Some(0L))
status1.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L))
status1.addBlock(RDDBlockId(0, 1), BlockStatus(memAndDisk, 1L, 2L))
status2.addBlock(RDDBlockId(0, 2), BlockStatus(memAndDisk, 1L, 2L))
@@ -332,4 +332,81 @@ class StorageSuite extends SparkFunSuite {
assert(blockLocations1(RDDBlockId(1, 2)) === Seq("cat:3"))
}
+ private val offheap = StorageLevel.OFF_HEAP
+ // For testing add, update, remove, get, and contains etc. for both RDD and non-RDD onheap
+ // and offheap blocks
+ private def storageStatus3: StorageStatus = {
+ val status = new StorageStatus(BlockManagerId("big", "dog", 1), 2000L, Some(1000L), Some(1000L))
+ assert(status.rddBlocks.isEmpty)
+ status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L))
+ status.addBlock(TestBlockId("man"), BlockStatus(offheap, 10L, 0L))
+ status.addBlock(RDDBlockId(0, 0), BlockStatus(offheap, 10L, 0L))
+ status.addBlock(RDDBlockId(1, 1), BlockStatus(offheap, 100L, 0L))
+ status.addBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 10L, 20L))
+ status.addBlock(RDDBlockId(2, 3), BlockStatus(memAndDisk, 10L, 20L))
+ status.addBlock(RDDBlockId(2, 4), BlockStatus(memAndDisk, 10L, 40L))
+ status
+ }
+
+ test("storage memUsed, diskUsed with on-heap and off-heap blocks") {
+ val status = storageStatus3
+ def actualMemUsed: Long = status.blocks.values.map(_.memSize).sum
+ def actualDiskUsed: Long = status.blocks.values.map(_.diskSize).sum
+
+ def actualOnHeapMemUsed: Long =
+ status.blocks.values.filter(!_.storageLevel.useOffHeap).map(_.memSize).sum
+ def actualOffHeapMemUsed: Long =
+ status.blocks.values.filter(_.storageLevel.useOffHeap).map(_.memSize).sum
+
+ assert(status.maxMem === status.maxOnHeapMem.get + status.maxOffHeapMem.get)
+
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+ assert(status.onHeapMemUsed.get === actualOnHeapMemUsed)
+ assert(status.offHeapMemUsed.get === actualOffHeapMemUsed)
+
+ assert(status.memRemaining === status.maxMem - actualMemUsed)
+ assert(status.onHeapMemRemaining.get === status.maxOnHeapMem.get - actualOnHeapMemUsed)
+ assert(status.offHeapMemRemaining.get === status.maxOffHeapMem.get - actualOffHeapMemUsed)
+
+ status.addBlock(TestBlockId("wire"), BlockStatus(memAndDisk, 400L, 500L))
+ status.addBlock(RDDBlockId(25, 25), BlockStatus(memAndDisk, 40L, 50L))
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+
+ status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 4L, 5L))
+ status.updateBlock(RDDBlockId(0, 0), BlockStatus(offheap, 4L, 0L))
+ status.updateBlock(RDDBlockId(1, 1), BlockStatus(offheap, 4L, 0L))
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+ assert(status.onHeapMemUsed.get === actualOnHeapMemUsed)
+ assert(status.offHeapMemUsed.get === actualOffHeapMemUsed)
+
+ status.removeBlock(TestBlockId("fire"))
+ status.removeBlock(TestBlockId("man"))
+ status.removeBlock(RDDBlockId(2, 2))
+ status.removeBlock(RDDBlockId(2, 3))
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+ }
+
+ private def storageStatus4: StorageStatus = {
+ val status = new StorageStatus(BlockManagerId("big", "dog", 1), 2000L, None, None)
+ status
+ }
+ test("old SparkListenerBlockManagerAdded event compatible") {
+ // This scenario will only be happened when replaying old event log. In this scenario there's
+ // no block add or remove event replayed, so only total amount of memory is valid.
+ val status = storageStatus4
+ assert(status.maxMem === status.maxMemory)
+
+ assert(status.memUsed === 0L)
+ assert(status.diskUsed === 0L)
+ assert(status.onHeapMemUsed === None)
+ assert(status.offHeapMemUsed === None)
+
+ assert(status.memRemaining === status.maxMem)
+ assert(status.onHeapMemRemaining === None)
+ assert(status.offHeapMemRemaining === None)
+ }
}