diff options
author | jerryshao <sshao@hortonworks.com> | 2017-04-06 13:23:54 -0500 |
---|---|---|
committer | Imran Rashid <irashid@cloudera.com> | 2017-04-06 13:23:54 -0500 |
commit | a4491626ed8169f0162a0dfb78736c9b9e7fb434 (patch) | |
tree | c18ac0eeba2cb1e95ef6a6a2dee2a9c16ec31174 /core/src/test/scala | |
parent | 5a693b4138d4ce948e3bcdbe28d5c01d5deb8fa9 (diff) | |
download | spark-a4491626ed8169f0162a0dfb78736c9b9e7fb434.tar.gz spark-a4491626ed8169f0162a0dfb78736c9b9e7fb434.tar.bz2 spark-a4491626ed8169f0162a0dfb78736c9b9e7fb434.zip |
[SPARK-17019][CORE] Expose on-heap and off-heap memory usage in various places
## What changes were proposed in this pull request?
With [SPARK-13992](https://issues.apache.org/jira/browse/SPARK-13992), Spark supports persisting data into off-heap memory, but the usage of on-heap and off-heap memory is not exposed currently, it is not so convenient for user to monitor and profile, so here propose to expose off-heap memory as well as on-heap memory usage in various places:
1. Spark UI's executor page will display both on-heap and off-heap memory usage.
2. REST request returns both on-heap and off-heap memory.
3. Also this can be gotten from MetricsSystem.
4. Last this usage can be obtained programmatically from SparkListener.
Attach the UI changes:
![screen shot 2016-08-12 at 11 20 44 am](https://cloud.githubusercontent.com/assets/850797/17612032/6c2f4480-607f-11e6-82e8-a27fb8cbb4ae.png)
Backward compatibility is also considered for event-log and REST API. Old event log can still be replayed with off-heap usage displayed as 0. For REST API, only adds the new fields, so JSON backward compatibility can still be kept.
## How was this patch tested?
Unit test added and manual verification.
Author: jerryshao <sshao@hortonworks.com>
Closes #14617 from jerryshao/SPARK-17019.
Diffstat (limited to 'core/src/test/scala')
3 files changed, 119 insertions, 7 deletions
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index dcf83cb530..764156c3ed 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -153,7 +153,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers "rdd list storage json" -> "applications/local-1422981780767/storage/rdd", "executor node blacklisting" -> "applications/app-20161116163331-0000/executors", - "executor node blacklisting unblacklisting" -> "applications/app-20161115172038-0000/executors" + "executor node blacklisting unblacklisting" -> "applications/app-20161115172038-0000/executors", + "executor memory usage" -> "applications/app-20161116163331-0000/executors" // Todo: enable this test when logging the even of onBlockUpdated. See: SPARK-13845 // "one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0" ) diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala index e5733aebf6..da198f946f 100644 --- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala @@ -27,7 +27,7 @@ class StorageSuite extends SparkFunSuite { // For testing add, update, and remove (for non-RDD blocks) private def storageStatus1: StorageStatus = { - val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L) + val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L, Some(1000L), Some(0L)) assert(status.blocks.isEmpty) assert(status.rddBlocks.isEmpty) assert(status.memUsed === 0L) @@ -74,7 +74,7 @@ class StorageSuite extends SparkFunSuite { // For testing add, update, remove, get, and contains etc. for both RDD and non-RDD blocks private def storageStatus2: StorageStatus = { - val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L) + val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L, Some(1000L), Some(0L)) assert(status.rddBlocks.isEmpty) status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L)) status.addBlock(TestBlockId("man"), BlockStatus(memAndDisk, 10L, 20L)) @@ -252,9 +252,9 @@ class StorageSuite extends SparkFunSuite { // For testing StorageUtils.updateRddInfo and StorageUtils.getRddBlockLocations private def stockStorageStatuses: Seq[StorageStatus] = { - val status1 = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L) - val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2), 2000L) - val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3), 3000L) + val status1 = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L, Some(1000L), Some(0L)) + val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2), 2000L, Some(2000L), Some(0L)) + val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3), 3000L, Some(3000L), Some(0L)) status1.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L)) status1.addBlock(RDDBlockId(0, 1), BlockStatus(memAndDisk, 1L, 2L)) status2.addBlock(RDDBlockId(0, 2), BlockStatus(memAndDisk, 1L, 2L)) @@ -332,4 +332,81 @@ class StorageSuite extends SparkFunSuite { assert(blockLocations1(RDDBlockId(1, 2)) === Seq("cat:3")) } + private val offheap = StorageLevel.OFF_HEAP + // For testing add, update, remove, get, and contains etc. for both RDD and non-RDD onheap + // and offheap blocks + private def storageStatus3: StorageStatus = { + val status = new StorageStatus(BlockManagerId("big", "dog", 1), 2000L, Some(1000L), Some(1000L)) + assert(status.rddBlocks.isEmpty) + status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L)) + status.addBlock(TestBlockId("man"), BlockStatus(offheap, 10L, 0L)) + status.addBlock(RDDBlockId(0, 0), BlockStatus(offheap, 10L, 0L)) + status.addBlock(RDDBlockId(1, 1), BlockStatus(offheap, 100L, 0L)) + status.addBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 10L, 20L)) + status.addBlock(RDDBlockId(2, 3), BlockStatus(memAndDisk, 10L, 20L)) + status.addBlock(RDDBlockId(2, 4), BlockStatus(memAndDisk, 10L, 40L)) + status + } + + test("storage memUsed, diskUsed with on-heap and off-heap blocks") { + val status = storageStatus3 + def actualMemUsed: Long = status.blocks.values.map(_.memSize).sum + def actualDiskUsed: Long = status.blocks.values.map(_.diskSize).sum + + def actualOnHeapMemUsed: Long = + status.blocks.values.filter(!_.storageLevel.useOffHeap).map(_.memSize).sum + def actualOffHeapMemUsed: Long = + status.blocks.values.filter(_.storageLevel.useOffHeap).map(_.memSize).sum + + assert(status.maxMem === status.maxOnHeapMem.get + status.maxOffHeapMem.get) + + assert(status.memUsed === actualMemUsed) + assert(status.diskUsed === actualDiskUsed) + assert(status.onHeapMemUsed.get === actualOnHeapMemUsed) + assert(status.offHeapMemUsed.get === actualOffHeapMemUsed) + + assert(status.memRemaining === status.maxMem - actualMemUsed) + assert(status.onHeapMemRemaining.get === status.maxOnHeapMem.get - actualOnHeapMemUsed) + assert(status.offHeapMemRemaining.get === status.maxOffHeapMem.get - actualOffHeapMemUsed) + + status.addBlock(TestBlockId("wire"), BlockStatus(memAndDisk, 400L, 500L)) + status.addBlock(RDDBlockId(25, 25), BlockStatus(memAndDisk, 40L, 50L)) + assert(status.memUsed === actualMemUsed) + assert(status.diskUsed === actualDiskUsed) + + status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 4L, 5L)) + status.updateBlock(RDDBlockId(0, 0), BlockStatus(offheap, 4L, 0L)) + status.updateBlock(RDDBlockId(1, 1), BlockStatus(offheap, 4L, 0L)) + assert(status.memUsed === actualMemUsed) + assert(status.diskUsed === actualDiskUsed) + assert(status.onHeapMemUsed.get === actualOnHeapMemUsed) + assert(status.offHeapMemUsed.get === actualOffHeapMemUsed) + + status.removeBlock(TestBlockId("fire")) + status.removeBlock(TestBlockId("man")) + status.removeBlock(RDDBlockId(2, 2)) + status.removeBlock(RDDBlockId(2, 3)) + assert(status.memUsed === actualMemUsed) + assert(status.diskUsed === actualDiskUsed) + } + + private def storageStatus4: StorageStatus = { + val status = new StorageStatus(BlockManagerId("big", "dog", 1), 2000L, None, None) + status + } + test("old SparkListenerBlockManagerAdded event compatible") { + // This scenario will only be happened when replaying old event log. In this scenario there's + // no block add or remove event replayed, so only total amount of memory is valid. + val status = storageStatus4 + assert(status.maxMem === status.maxMemory) + + assert(status.memUsed === 0L) + assert(status.diskUsed === 0L) + assert(status.onHeapMemUsed === None) + assert(status.offHeapMemUsed === None) + + assert(status.memRemaining === status.maxMem) + assert(status.onHeapMemRemaining === None) + assert(status.offHeapMemRemaining === None) + } } diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index 4228373036..f4c561c737 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -39,7 +39,7 @@ import org.apache.spark.LocalSparkContext._ import org.apache.spark.api.java.StorageLevels import org.apache.spark.deploy.history.HistoryServerSuite import org.apache.spark.shuffle.FetchFailedException -import org.apache.spark.status.api.v1.{JacksonMessageWriter, StageStatus} +import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus} private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler { @@ -103,6 +103,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B .set("spark.ui.enabled", "true") .set("spark.ui.port", "0") .set("spark.ui.killEnabled", killEnabled.toString) + .set("spark.memory.offHeap.size", "64m") val sc = new SparkContext(conf) assert(sc.ui.isDefined) sc @@ -151,6 +152,39 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B val updatedRddJson = getJson(ui, "storage/rdd/0") (updatedRddJson \ "storageLevel").extract[String] should be ( StorageLevels.MEMORY_ONLY.description) + + val dataDistributions0 = + (updatedRddJson \ "dataDistribution").extract[Seq[RDDDataDistribution]] + dataDistributions0.length should be (1) + val dist0 = dataDistributions0.head + + dist0.onHeapMemoryUsed should not be (None) + dist0.memoryUsed should be (dist0.onHeapMemoryUsed.get) + dist0.onHeapMemoryRemaining should not be (None) + dist0.offHeapMemoryRemaining should not be (None) + dist0.memoryRemaining should be ( + dist0.onHeapMemoryRemaining.get + dist0.offHeapMemoryRemaining.get) + dist0.onHeapMemoryUsed should not be (Some(0L)) + dist0.offHeapMemoryUsed should be (Some(0L)) + + rdd.unpersist() + rdd.persist(StorageLevels.OFF_HEAP).count() + val updatedStorageJson1 = getJson(ui, "storage/rdd") + updatedStorageJson1.children.length should be (1) + val updatedRddJson1 = getJson(ui, "storage/rdd/0") + val dataDistributions1 = + (updatedRddJson1 \ "dataDistribution").extract[Seq[RDDDataDistribution]] + dataDistributions1.length should be (1) + val dist1 = dataDistributions1.head + + dist1.offHeapMemoryUsed should not be (None) + dist1.memoryUsed should be (dist1.offHeapMemoryUsed.get) + dist1.onHeapMemoryRemaining should not be (None) + dist1.offHeapMemoryRemaining should not be (None) + dist1.memoryRemaining should be ( + dist1.onHeapMemoryRemaining.get + dist1.offHeapMemoryRemaining.get) + dist1.onHeapMemoryUsed should be (Some(0L)) + dist1.offHeapMemoryUsed should not be (Some(0L)) } } |