Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala |  3
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/StorageSuite.scala              | 87
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala                | 36
3 files changed, 119 insertions(+), 7 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index dcf83cb530..764156c3ed 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -153,7 +153,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
"rdd list storage json" -> "applications/local-1422981780767/storage/rdd",
"executor node blacklisting" -> "applications/app-20161116163331-0000/executors",
- "executor node blacklisting unblacklisting" -> "applications/app-20161115172038-0000/executors"
+ "executor node blacklisting unblacklisting" -> "applications/app-20161115172038-0000/executors",
+ "executor memory usage" -> "applications/app-20161116163331-0000/executors"
// Todo: enable this test when logging the event of onBlockUpdated. See: SPARK-13845
// "one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0"
)
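
The new case above maps a readable name to a REST endpoint whose JSON response is checked against a stored expectation file. Below is a minimal, self-contained sketch of that golden-file pattern; the fetchJson helper, the server URL, and the expectations directory are hypothetical illustrations, not Spark's actual test harness.

import java.net.URL
import java.nio.file.{Files, Paths}
import scala.io.Source

object GoldenFileSketch {
  // Hypothetical helper: GET a REST endpoint and return the body as a string.
  def fetchJson(base: String, path: String): String = {
    val src = Source.fromURL(new URL(s"$base/api/v1/$path"))
    try src.mkString finally src.close()
  }

  def main(args: Array[String]): Unit = {
    val cases = Seq(
      "executor memory usage" -> "applications/app-20161116163331-0000/executors")
    for ((name, path) <- cases) {
      val actual = fetchJson("http://localhost:18080", path)
      // Hypothetical location of the stored expectation file for this case.
      val expected = new String(Files.readAllBytes(
        Paths.get("expectations", name.replace(' ', '_') + "_expectation.json")))
      assert(actual.trim == expected.trim, s"unexpected response for '$name'")
    }
  }
}
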
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
index e5733aebf6..da198f946f 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -27,7 +27,7 @@ class StorageSuite extends SparkFunSuite {
// For testing add, update, and remove (for non-RDD blocks)
private def storageStatus1: StorageStatus = {
- val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
+ val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L, Some(1000L), Some(0L))
assert(status.blocks.isEmpty)
assert(status.rddBlocks.isEmpty)
assert(status.memUsed === 0L)
@@ -74,7 +74,7 @@ class StorageSuite extends SparkFunSuite {
// For testing add, update, remove, get, and contains etc. for both RDD and non-RDD blocks
private def storageStatus2: StorageStatus = {
- val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
+ val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L, Some(1000L), Some(0L))
assert(status.rddBlocks.isEmpty)
status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L))
status.addBlock(TestBlockId("man"), BlockStatus(memAndDisk, 10L, 20L))
@@ -252,9 +252,9 @@ class StorageSuite extends SparkFunSuite {
// For testing StorageUtils.updateRddInfo and StorageUtils.getRddBlockLocations
private def stockStorageStatuses: Seq[StorageStatus] = {
- val status1 = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
- val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2), 2000L)
- val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3), 3000L)
+ val status1 = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L, Some(1000L), Some(0L))
+ val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2), 2000L, Some(2000L), Some(0L))
+ val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3), 3000L, Some(3000L), Some(0L))
status1.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L))
status1.addBlock(RDDBlockId(0, 1), BlockStatus(memAndDisk, 1L, 2L))
status2.addBlock(RDDBlockId(0, 2), BlockStatus(memAndDisk, 1L, 2L))
@@ -332,4 +332,81 @@ class StorageSuite extends SparkFunSuite {
assert(blockLocations1(RDDBlockId(1, 2)) === Seq("cat:3"))
}
+ private val offheap = StorageLevel.OFF_HEAP
+ // For testing add, update, remove, get, and contains etc. for both RDD and non-RDD
+ // on-heap and off-heap blocks
+ private def storageStatus3: StorageStatus = {
+ val status = new StorageStatus(BlockManagerId("big", "dog", 1), 2000L, Some(1000L), Some(1000L))
+ assert(status.rddBlocks.isEmpty)
+ status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L))
+ status.addBlock(TestBlockId("man"), BlockStatus(offheap, 10L, 0L))
+ status.addBlock(RDDBlockId(0, 0), BlockStatus(offheap, 10L, 0L))
+ status.addBlock(RDDBlockId(1, 1), BlockStatus(offheap, 100L, 0L))
+ status.addBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 10L, 20L))
+ status.addBlock(RDDBlockId(2, 3), BlockStatus(memAndDisk, 10L, 20L))
+ status.addBlock(RDDBlockId(2, 4), BlockStatus(memAndDisk, 10L, 40L))
+ status
+ }
+
+ test("storage memUsed, diskUsed with on-heap and off-heap blocks") {
+ val status = storageStatus3
+ def actualMemUsed: Long = status.blocks.values.map(_.memSize).sum
+ def actualDiskUsed: Long = status.blocks.values.map(_.diskSize).sum
+
+ def actualOnHeapMemUsed: Long =
+ status.blocks.values.filter(!_.storageLevel.useOffHeap).map(_.memSize).sum
+ def actualOffHeapMemUsed: Long =
+ status.blocks.values.filter(_.storageLevel.useOffHeap).map(_.memSize).sum
+
+ assert(status.maxMem === status.maxOnHeapMem.get + status.maxOffHeapMem.get)
+
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+ assert(status.onHeapMemUsed.get === actualOnHeapMemUsed)
+ assert(status.offHeapMemUsed.get === actualOffHeapMemUsed)
+
+ assert(status.memRemaining === status.maxMem - actualMemUsed)
+ assert(status.onHeapMemRemaining.get === status.maxOnHeapMem.get - actualOnHeapMemUsed)
+ assert(status.offHeapMemRemaining.get === status.maxOffHeapMem.get - actualOffHeapMemUsed)
+
+ status.addBlock(TestBlockId("wire"), BlockStatus(memAndDisk, 400L, 500L))
+ status.addBlock(RDDBlockId(25, 25), BlockStatus(memAndDisk, 40L, 50L))
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+
+ status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 4L, 5L))
+ status.updateBlock(RDDBlockId(0, 0), BlockStatus(offheap, 4L, 0L))
+ status.updateBlock(RDDBlockId(1, 1), BlockStatus(offheap, 4L, 0L))
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+ assert(status.onHeapMemUsed.get === actualOnHeapMemUsed)
+ assert(status.offHeapMemUsed.get === actualOffHeapMemUsed)
+
+ status.removeBlock(TestBlockId("fire"))
+ status.removeBlock(TestBlockId("man"))
+ status.removeBlock(RDDBlockId(2, 2))
+ status.removeBlock(RDDBlockId(2, 3))
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+ }
+
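+ // Mimics a StorageStatus rebuilt from an old event log, where the on-heap and
+ // off-heap maxima were never recorded and are therefore None.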
+ private def storageStatus4: StorageStatus = {
+ val status = new StorageStatus(BlockManagerId("big", "dog", 1), 2000L, None, None)
+ status
+ }
+ test("old SparkListenerBlockManagerAdded event compatible") {
+ // This scenario only happens when replaying an old event log: no block add or
+ // remove events are replayed, so only the total amount of memory is valid.
+ val status = storageStatus4
+ assert(status.maxMem === status.maxMemory)
+
+ assert(status.memUsed === 0L)
+ assert(status.diskUsed === 0L)
+ assert(status.onHeapMemUsed === None)
+ assert(status.offHeapMemUsed === None)
+
+ assert(status.memRemaining === status.maxMem)
+ assert(status.onHeapMemRemaining === None)
+ assert(status.offHeapMemRemaining === None)
+ }
}
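
The assertions above encode a simple accounting model, restated here as a self-contained toy (an illustration of the invariants being tested, not Spark's actual StorageStatus implementation): total memory is the sum of the optional on-heap and off-heap maxima, usage is split by whether a block's storage level is off-heap, and each remaining figure is its maximum minus its usage.

final case class ToyBlock(offHeap: Boolean, memSize: Long, diskSize: Long)

final class ToyStorageStatus(
    val maxOnHeapMem: Option[Long],
    val maxOffHeapMem: Option[Long]) {
  private var blocks = Map.empty[String, ToyBlock]

  def addBlock(id: String, b: ToyBlock): Unit = blocks += (id -> b)
  def removeBlock(id: String): Unit = blocks -= id

  // Total capacity: the sum of whichever maxima are known.
  def maxMem: Long = maxOnHeapMem.getOrElse(0L) + maxOffHeapMem.getOrElse(0L)
  def memUsed: Long = blocks.values.map(_.memSize).sum
  def diskUsed: Long = blocks.values.map(_.diskSize).sum

  // Per-mode usage is only defined when the corresponding maximum is known,
  // matching the None results asserted for the old-event-log fixture.
  def onHeapMemUsed: Option[Long] =
    maxOnHeapMem.map(_ => blocks.values.filterNot(_.offHeap).map(_.memSize).sum)
  def offHeapMemUsed: Option[Long] =
    maxOffHeapMem.map(_ => blocks.values.filter(_.offHeap).map(_.memSize).sum)

  def memRemaining: Long = maxMem - memUsed
  def onHeapMemRemaining: Option[Long] =
    for (max <- maxOnHeapMem; used <- onHeapMemUsed) yield max - used
  def offHeapMemRemaining: Option[Long] =
    for (max <- maxOffHeapMem; used <- offHeapMemUsed) yield max - used
}

A quick check of the same invariants the suite asserts:

val s = new ToyStorageStatus(Some(1000L), Some(1000L))
s.addBlock("rdd_0_0", ToyBlock(offHeap = true, memSize = 10L, diskSize = 0L))
assert(s.maxMem == 2000L && s.offHeapMemUsed.contains(10L) && s.memRemaining == 1990L)
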
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 4228373036..f4c561c737 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark.LocalSparkContext._
import org.apache.spark.api.java.StorageLevels
import org.apache.spark.deploy.history.HistoryServerSuite
import org.apache.spark.shuffle.FetchFailedException
-import org.apache.spark.status.api.v1.{JacksonMessageWriter, StageStatus}
+import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus}
private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler {
@@ -103,6 +103,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
.set("spark.ui.enabled", "true")
.set("spark.ui.port", "0")
.set("spark.ui.killEnabled", killEnabled.toString)
+ .set("spark.memory.offHeap.size", "64m")
val sc = new SparkContext(conf)
assert(sc.ui.isDefined)
sc
@@ -151,6 +152,39 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
val updatedRddJson = getJson(ui, "storage/rdd/0")
(updatedRddJson \ "storageLevel").extract[String] should be (
StorageLevels.MEMORY_ONLY.description)
+
+ val dataDistributions0 =
+ (updatedRddJson \ "dataDistribution").extract[Seq[RDDDataDistribution]]
+ dataDistributions0.length should be (1)
+ val dist0 = dataDistributions0.head
+
+ dist0.onHeapMemoryUsed should not be (None)
+ dist0.memoryUsed should be (dist0.onHeapMemoryUsed.get)
+ dist0.onHeapMemoryRemaining should not be (None)
+ dist0.offHeapMemoryRemaining should not be (None)
+ dist0.memoryRemaining should be (
+ dist0.onHeapMemoryRemaining.get + dist0.offHeapMemoryRemaining.get)
+ dist0.onHeapMemoryUsed should not be (Some(0L))
+ dist0.offHeapMemoryUsed should be (Some(0L))
+
+ rdd.unpersist()
+ rdd.persist(StorageLevels.OFF_HEAP).count()
+ val updatedStorageJson1 = getJson(ui, "storage/rdd")
+ updatedStorageJson1.children.length should be (1)
+ val updatedRddJson1 = getJson(ui, "storage/rdd/0")
+ val dataDistributions1 =
+ (updatedRddJson1 \ "dataDistribution").extract[Seq[RDDDataDistribution]]
+ dataDistributions1.length should be (1)
+ val dist1 = dataDistributions1.head
+
+ dist1.offHeapMemoryUsed should not be (None)
+ dist1.memoryUsed should be (dist1.offHeapMemoryUsed.get)
+ dist1.onHeapMemoryRemaining should not be (None)
+ dist1.offHeapMemoryRemaining should not be (None)
+ dist1.memoryRemaining should be (
+ dist1.onHeapMemoryRemaining.get + dist1.offHeapMemoryRemaining.get)
+ dist1.onHeapMemoryUsed should be (Some(0L))
+ dist1.offHeapMemoryUsed should not be (Some(0L))
}
}
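
For context, a hedged usage sketch of what this UI test exercises end to end: cache an RDD off-heap and inspect the new per-executor memory split. The master, app name, and the enabled flag below are assumptions for a standalone run; depending on the Spark version, spark.memory.offHeap.enabled must be set to true before OFF_HEAP caching takes effect.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.java.StorageLevels

object OffHeapStorageDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")                        // assumption: local run
      .setAppName("offheap-storage-demo")
      .set("spark.memory.offHeap.enabled", "true")  // assumption: required for OFF_HEAP blocks
      .set("spark.memory.offHeap.size", "64m")
    val sc = new SparkContext(conf)
    try {
      sc.parallelize(1 to 1000, 4).persist(StorageLevels.OFF_HEAP).count()
      // With the UI running, the dataDistribution array (onHeapMemoryUsed,
      // offHeapMemoryUsed, and the matching remaining fields) is served at:
      //   http://localhost:4040/api/v1/applications/<app-id>/storage/rdd/<rdd-id>
      sc.getRDDStorageInfo.foreach(println)
    } finally {
      sc.stop()
    }
  }
}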