diff options
author | jerryshao <sshao@hortonworks.com> | 2017-04-06 13:23:54 -0500 |
---|---|---|
committer | Imran Rashid <irashid@cloudera.com> | 2017-04-06 13:23:54 -0500 |
commit | a4491626ed8169f0162a0dfb78736c9b9e7fb434 (patch) | |
tree | c18ac0eeba2cb1e95ef6a6a2dee2a9c16ec31174 /core/src/main/scala/org/apache | |
parent | 5a693b4138d4ce948e3bcdbe28d5c01d5deb8fa9 (diff) | |
download | spark-a4491626ed8169f0162a0dfb78736c9b9e7fb434.tar.gz spark-a4491626ed8169f0162a0dfb78736c9b9e7fb434.tar.bz2 spark-a4491626ed8169f0162a0dfb78736c9b9e7fb434.zip |
[SPARK-17019][CORE] Expose on-heap and off-heap memory usage in various places
## What changes were proposed in this pull request?
With [SPARK-13992](https://issues.apache.org/jira/browse/SPARK-13992), Spark supports persisting data into off-heap memory, but the usage of on-heap and off-heap memory is not currently exposed. This makes it inconvenient for users to monitor and profile memory usage, so this change proposes to expose off-heap as well as on-heap memory usage in various places:
1. Spark UI's executor page will display both on-heap and off-heap memory usage.
2. REST requests return both on-heap and off-heap memory.
3. This information can also be obtained from the MetricsSystem.
4. Lastly, this usage can be obtained programmatically from a SparkListener.
Attach the UI changes:
![screen shot 2016-08-12 at 11 20 44 am](https://cloud.githubusercontent.com/assets/850797/17612032/6c2f4480-607f-11e6-82e8-a27fb8cbb4ae.png)
Backward compatibility is also considered for the event log and the REST API. Old event logs can still be replayed, with off-heap usage displayed as 0. The REST API only adds new fields, so JSON backward compatibility is preserved.
## How was this patch tested?
Unit test added and manual verification.
Author: jerryshao <sshao@hortonworks.com>
Closes #14617 from jerryshao/SPARK-17019.
Diffstat (limited to 'core/src/main/scala/org/apache')
13 files changed, 214 insertions, 92 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 4331addb44..bc2e530716 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -87,8 +87,13 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S extends SparkListenerEvent @DeveloperApi -case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long) - extends SparkListenerEvent +case class SparkListenerBlockManagerAdded( + time: Long, + blockManagerId: BlockManagerId, + maxMem: Long, + maxOnHeapMem: Option[Long] = None, + maxOffHeapMem: Option[Long] = None) extends SparkListenerEvent { +} @DeveloperApi case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockManagerId) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala index 5c03609e5e..1279b281ad 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala @@ -70,7 +70,13 @@ private[spark] object AllRDDResource { address = status.blockManagerId.hostPort, memoryUsed = status.memUsedByRdd(rddId), memoryRemaining = status.memRemaining, - diskUsed = status.diskUsedByRdd(rddId) + diskUsed = status.diskUsedByRdd(rddId), + onHeapMemoryUsed = Some( + if (!rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L), + offHeapMemoryUsed = Some( + if (rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L), + onHeapMemoryRemaining = status.onHeapMemRemaining, + offHeapMemoryRemaining = status.offHeapMemRemaining ) } ) } else { None diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala 
b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 5b9227350e..d159b9450e 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -75,7 +75,11 @@ class ExecutorSummary private[spark]( val totalShuffleWrite: Long, val isBlacklisted: Boolean, val maxMemory: Long, - val executorLogs: Map[String, String]) + val executorLogs: Map[String, String], + val onHeapMemoryUsed: Option[Long], + val offHeapMemoryUsed: Option[Long], + val maxOnHeapMemory: Option[Long], + val maxOffHeapMemory: Option[Long]) class JobData private[spark]( val jobId: Int, @@ -111,7 +115,11 @@ class RDDDataDistribution private[spark]( val address: String, val memoryUsed: Long, val memoryRemaining: Long, - val diskUsed: Long) + val diskUsed: Long, + val onHeapMemoryUsed: Option[Long], + val offHeapMemoryUsed: Option[Long], + val onHeapMemoryRemaining: Option[Long], + val offHeapMemoryRemaining: Option[Long]) class RDDPartitionInfo private[spark]( val blockName: String, diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 46a078b2f9..63acba65d3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -150,8 +150,8 @@ private[spark] class BlockManager( // However, since we use this only for reporting and logging, what we actually want here is // the absolute maximum value that `maxMemory` can ever possibly reach. We may need // to revisit whether reporting this value as the "max" is intuitive to the user. - private val maxMemory = - memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory + private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory + private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory // Port used by the external shuffle service. 
In Yarn mode, this may be already be // set through the Hadoop configuration as the server is launched in the Yarn NM. @@ -229,7 +229,8 @@ private[spark] class BlockManager( val idFromMaster = master.registerBlockManager( id, - maxMemory, + maxOnHeapMemory, + maxOffHeapMemory, slaveEndpoint) blockManagerId = if (idFromMaster != null) idFromMaster else id @@ -307,7 +308,7 @@ private[spark] class BlockManager( def reregister(): Unit = { // TODO: We might need to rate limit re-registering. logInfo(s"BlockManager $blockManagerId re-registering with master") - master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint) + master.registerBlockManager(blockManagerId, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint) reportAllBlocks() } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 3ca690db9e..ea5d8423a5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -57,11 +57,12 @@ class BlockManagerMaster( */ def registerBlockManager( blockManagerId: BlockManagerId, - maxMemSize: Long, + maxOnHeapMemSize: Long, + maxOffHeapMemSize: Long, slaveEndpoint: RpcEndpointRef): BlockManagerId = { logInfo(s"Registering BlockManager $blockManagerId") val updatedId = driverEndpoint.askSync[BlockManagerId]( - RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint)) + RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) logInfo(s"Registered BlockManager $updatedId") updatedId } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 84c04d2260..467c3e0e6b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -71,8 +71,8 @@ class BlockManagerMasterEndpoint( logInfo("BlockManagerMasterEndpoint up") override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) => - context.reply(register(blockManagerId, maxMemSize, slaveEndpoint)) + case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) => + context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)) case _updateBlockInfo @ UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) => @@ -276,7 +276,8 @@ class BlockManagerMasterEndpoint( private def storageStatus: Array[StorageStatus] = { blockManagerInfo.map { case (blockManagerId, info) => - new StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala) + new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem), + Some(info.maxOffHeapMem), info.blocks.asScala) }.toArray } @@ -338,7 +339,8 @@ class BlockManagerMasterEndpoint( */ private def register( idWithoutTopologyInfo: BlockManagerId, - maxMemSize: Long, + maxOnHeapMemSize: Long, + maxOffHeapMemSize: Long, slaveEndpoint: RpcEndpointRef): BlockManagerId = { // the dummy id is not expected to contain the topology information. 
// we get that info here and respond back with a more fleshed out block manager id @@ -359,14 +361,15 @@ class BlockManagerMasterEndpoint( case None => } logInfo("Registering block manager %s with %s RAM, %s".format( - id.hostPort, Utils.bytesToString(maxMemSize), id)) + id.hostPort, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize), id)) blockManagerIdByExecutor(id.executorId) = id blockManagerInfo(id) = new BlockManagerInfo( - id, System.currentTimeMillis(), maxMemSize, slaveEndpoint) + id, System.currentTimeMillis(), maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) } - listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize)) + listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize, + Some(maxOnHeapMemSize), Some(maxOffHeapMemSize))) id } @@ -464,10 +467,13 @@ object BlockStatus { private[spark] class BlockManagerInfo( val blockManagerId: BlockManagerId, timeMs: Long, - val maxMem: Long, + val maxOnHeapMem: Long, + val maxOffHeapMem: Long, val slaveEndpoint: RpcEndpointRef) extends Logging { + val maxMem = maxOnHeapMem + maxOffHeapMem + private var _lastSeenMs: Long = timeMs private var _remainingMem: Long = maxMem diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 0aea438e7f..0c0ff14459 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -58,7 +58,8 @@ private[spark] object BlockManagerMessages { case class RegisterBlockManager( blockManagerId: BlockManagerId, - maxMemSize: Long, + maxOnHeapMemSize: Long, + maxOffHeapMemSize: Long, sender: RpcEndpointRef) extends ToBlockManagerMaster diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index c5ba9af3e2..197a01762c 100644 --- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -26,35 +26,39 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager) override val metricRegistry = new MetricRegistry() override val sourceName = "BlockManager" - metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] { - override def getValue: Long = { - val storageStatusList = blockManager.master.getStorageStatus - val maxMem = storageStatusList.map(_.maxMem).sum - maxMem / 1024 / 1024 - } - }) - - metricRegistry.register(MetricRegistry.name("memory", "remainingMem_MB"), new Gauge[Long] { - override def getValue: Long = { - val storageStatusList = blockManager.master.getStorageStatus - val remainingMem = storageStatusList.map(_.memRemaining).sum - remainingMem / 1024 / 1024 - } - }) - - metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] { - override def getValue: Long = { - val storageStatusList = blockManager.master.getStorageStatus - val memUsed = storageStatusList.map(_.memUsed).sum - memUsed / 1024 / 1024 - } - }) - - metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed_MB"), new Gauge[Long] { - override def getValue: Long = { - val storageStatusList = blockManager.master.getStorageStatus - val diskSpaceUsed = storageStatusList.map(_.diskUsed).sum - diskSpaceUsed / 1024 / 1024 - } - }) + private def registerGauge(name: String, func: BlockManagerMaster => Long): Unit = { + metricRegistry.register(name, new Gauge[Long] { + override def getValue: Long = func(blockManager.master) / 1024 / 1024 + }) + } + + registerGauge(MetricRegistry.name("memory", "maxMem_MB"), + _.getStorageStatus.map(_.maxMem).sum) + + registerGauge(MetricRegistry.name("memory", "maxOnHeapMem_MB"), + _.getStorageStatus.map(_.maxOnHeapMem.getOrElse(0L)).sum) + + registerGauge(MetricRegistry.name("memory", "maxOffHeapMem_MB"), + 
_.getStorageStatus.map(_.maxOffHeapMem.getOrElse(0L)).sum) + + registerGauge(MetricRegistry.name("memory", "remainingMem_MB"), + _.getStorageStatus.map(_.memRemaining).sum) + + registerGauge(MetricRegistry.name("memory", "remainingOnHeapMem_MB"), + _.getStorageStatus.map(_.onHeapMemRemaining.getOrElse(0L)).sum) + + registerGauge(MetricRegistry.name("memory", "remainingOffHeapMem_MB"), + _.getStorageStatus.map(_.offHeapMemRemaining.getOrElse(0L)).sum) + + registerGauge(MetricRegistry.name("memory", "memUsed_MB"), + _.getStorageStatus.map(_.memUsed).sum) + + registerGauge(MetricRegistry.name("memory", "onHeapMemUsed_MB"), + _.getStorageStatus.map(_.onHeapMemUsed.getOrElse(0L)).sum) + + registerGauge(MetricRegistry.name("memory", "offHeapMemUsed_MB"), + _.getStorageStatus.map(_.offHeapMemUsed.getOrElse(0L)).sum) + + registerGauge(MetricRegistry.name("disk", "diskSpaceUsed_MB"), + _.getStorageStatus.map(_.diskUsed).sum) } diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala index 798658a15b..1b30d4fa93 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala @@ -41,7 +41,7 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener { } def deadStorageStatusList: Seq[StorageStatus] = synchronized { - deadExecutorStorageStatus.toSeq + deadExecutorStorageStatus } /** Update storage status list to reflect updated block statuses */ @@ -74,8 +74,10 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener { synchronized { val blockManagerId = blockManagerAdded.blockManagerId val executorId = blockManagerId.executorId - val maxMem = blockManagerAdded.maxMem - val storageStatus = new StorageStatus(blockManagerId, maxMem) + // The onHeap and offHeap memory are always defined for new applications, + // but they can be missing if we are replaying 
old event logs. + val storageStatus = new StorageStatus(blockManagerId, blockManagerAdded.maxMem, + blockManagerAdded.maxOnHeapMem, blockManagerAdded.maxOffHeapMem) executorIdToStorageStatus(executorId) = storageStatus // Try to remove the dead storage status if same executor register the block manager twice. diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 241aacd74b..8f0d181fc8 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -35,7 +35,11 @@ import org.apache.spark.internal.Logging * class cannot mutate the source of the information. Accesses are not thread-safe. */ @DeveloperApi -class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) { +class StorageStatus( + val blockManagerId: BlockManagerId, + val maxMemory: Long, + val maxOnHeapMem: Option[Long], + val maxOffHeapMem: Option[Long]) { /** * Internal representation of the blocks stored in this block manager. @@ -46,25 +50,21 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) { private val _rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, BlockStatus]] private val _nonRddBlocks = new mutable.HashMap[BlockId, BlockStatus] - /** - * Storage information of the blocks that entails memory, disk, and off-heap memory usage. - * - * As with the block maps, we store the storage information separately for RDD blocks and - * non-RDD blocks for the same reason. In particular, RDD storage information is stored - * in a map indexed by the RDD ID to the following 4-tuple: - * - * (memory size, disk size, storage level) - * - * We assume that all the blocks that belong to the same RDD have the same storage level. - * This field is not relevant to non-RDD blocks, however, so the storage information for - * non-RDD blocks contains only the first 3 fields (in the same order). 
- */ - private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, StorageLevel)] - private var _nonRddStorageInfo: (Long, Long) = (0L, 0L) + private case class RddStorageInfo(memoryUsage: Long, diskUsage: Long, level: StorageLevel) + private val _rddStorageInfo = new mutable.HashMap[Int, RddStorageInfo] + + private case class NonRddStorageInfo(var onHeapUsage: Long, var offHeapUsage: Long, + var diskUsage: Long) + private val _nonRddStorageInfo = NonRddStorageInfo(0L, 0L, 0L) /** Create a storage status with an initial set of blocks, leaving the source unmodified. */ - def this(bmid: BlockManagerId, maxMem: Long, initialBlocks: Map[BlockId, BlockStatus]) { - this(bmid, maxMem) + def this( + bmid: BlockManagerId, + maxMemory: Long, + maxOnHeapMem: Option[Long], + maxOffHeapMem: Option[Long], + initialBlocks: Map[BlockId, BlockStatus]) { + this(bmid, maxMemory, maxOnHeapMem, maxOffHeapMem) initialBlocks.foreach { case (bid, bstatus) => addBlock(bid, bstatus) } } @@ -176,26 +176,57 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) { */ def numRddBlocksById(rddId: Int): Int = _rddBlocks.get(rddId).map(_.size).getOrElse(0) + /** Return the max memory can be used by this block manager. */ + def maxMem: Long = maxMemory + /** Return the memory remaining in this block manager. */ def memRemaining: Long = maxMem - memUsed + /** Return the memory used by caching RDDs */ + def cacheSize: Long = onHeapCacheSize.getOrElse(0L) + offHeapCacheSize.getOrElse(0L) + /** Return the memory used by this block manager. */ - def memUsed: Long = _nonRddStorageInfo._1 + cacheSize + def memUsed: Long = onHeapMemUsed.getOrElse(0L) + offHeapMemUsed.getOrElse(0L) - /** Return the memory used by caching RDDs */ - def cacheSize: Long = _rddBlocks.keys.toSeq.map(memUsedByRdd).sum + /** Return the on-heap memory remaining in this block manager. 
*/ + def onHeapMemRemaining: Option[Long] = + for (m <- maxOnHeapMem; o <- onHeapMemUsed) yield m - o + + /** Return the off-heap memory remaining in this block manager. */ + def offHeapMemRemaining: Option[Long] = + for (m <- maxOffHeapMem; o <- offHeapMemUsed) yield m - o + + /** Return the on-heap memory used by this block manager. */ + def onHeapMemUsed: Option[Long] = onHeapCacheSize.map(_ + _nonRddStorageInfo.onHeapUsage) + + /** Return the off-heap memory used by this block manager. */ + def offHeapMemUsed: Option[Long] = offHeapCacheSize.map(_ + _nonRddStorageInfo.offHeapUsage) + + /** Return the memory used by on-heap caching RDDs */ + def onHeapCacheSize: Option[Long] = maxOnHeapMem.map { _ => + _rddStorageInfo.collect { + case (_, storageInfo) if !storageInfo.level.useOffHeap => storageInfo.memoryUsage + }.sum + } + + /** Return the memory used by off-heap caching RDDs */ + def offHeapCacheSize: Option[Long] = maxOffHeapMem.map { _ => + _rddStorageInfo.collect { + case (_, storageInfo) if storageInfo.level.useOffHeap => storageInfo.memoryUsage + }.sum + } /** Return the disk space used by this block manager. */ - def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum + def diskUsed: Long = _nonRddStorageInfo.diskUsage + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum /** Return the memory used by the given RDD in this block manager in O(1) time. */ - def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L) + def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_.memoryUsage).getOrElse(0L) /** Return the disk space used by the given RDD in this block manager in O(1) time. */ - def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._2).getOrElse(0L) + def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_.diskUsage).getOrElse(0L) /** Return the storage level, if any, used by the given RDD in this block manager. 
*/ - def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._3) + def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_.level) /** * Update the relevant storage info, taking into account any existing status for this block. @@ -210,10 +241,12 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) { val (oldMem, oldDisk) = blockId match { case RDDBlockId(rddId, _) => _rddStorageInfo.get(rddId) - .map { case (mem, disk, _) => (mem, disk) } + .map { case RddStorageInfo(mem, disk, _) => (mem, disk) } .getOrElse((0L, 0L)) - case _ => - _nonRddStorageInfo + case _ if !level.useOffHeap => + (_nonRddStorageInfo.onHeapUsage, _nonRddStorageInfo.diskUsage) + case _ if level.useOffHeap => + (_nonRddStorageInfo.offHeapUsage, _nonRddStorageInfo.diskUsage) } val newMem = math.max(oldMem + changeInMem, 0L) val newDisk = math.max(oldDisk + changeInDisk, 0L) @@ -225,13 +258,17 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) { if (newMem + newDisk == 0) { _rddStorageInfo.remove(rddId) } else { - _rddStorageInfo(rddId) = (newMem, newDisk, level) + _rddStorageInfo(rddId) = RddStorageInfo(newMem, newDisk, level) } case _ => - _nonRddStorageInfo = (newMem, newDisk) + if (!level.useOffHeap) { + _nonRddStorageInfo.onHeapUsage = newMem + } else { + _nonRddStorageInfo.offHeapUsage = newMem + } + _nonRddStorageInfo.diskUsage = newDisk } } - } /** Helper methods for storage-related objects. 
*/ diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index d849ce76a9..0a3c63d14c 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -40,7 +40,8 @@ private[ui] case class ExecutorSummaryInfo( totalShuffleRead: Long, totalShuffleWrite: Long, isBlacklisted: Int, - maxMemory: Long, + maxOnHeapMem: Long, + maxOffHeapMem: Long, executorLogs: Map[String, String]) @@ -53,6 +54,34 @@ private[ui] class ExecutorsPage( val content = <div> { + <div> + <span class="expand-additional-metrics"> + <span class="expand-additional-metrics-arrow arrow-closed"></span> + <a>Show Additional Metrics</a> + </span> + <div class="additional-metrics collapsed"> + <ul> + <li> + <input type="checkbox" id="select-all-metrics"/> + <span class="additional-metric-title"><em>(De)select All</em></span> + </li> + <li> + <span data-toggle="tooltip" + title={ExecutorsPage.ON_HEAP_MEMORY_TOOLTIP} data-placement="right"> + <input type="checkbox" name="on_heap_memory"/> + <span class="additional-metric-title">On Heap Storage Memory</span> + </span> + </li> + <li> + <span data-toggle="tooltip" + title={ExecutorsPage.OFF_HEAP_MEMORY_TOOLTIP} data-placement="right"> + <input type="checkbox" name="off_heap_memory"/> + <span class="additional-metric-title">Off Heap Storage Memory</span> + </span> + </li> + </ul> + </div> + </div> ++ <div id="active-executors"></div> ++ <script src={UIUtils.prependBaseUri("/static/utils.js")}></script> ++ <script src={UIUtils.prependBaseUri("/static/executorspage.js")}></script> ++ @@ -65,6 +94,11 @@ private[ui] class ExecutorsPage( } private[spark] object ExecutorsPage { + private val ON_HEAP_MEMORY_TOOLTIP = "Memory used / total available memory for on heap " + + "storage of data like RDD partitions cached in memory." 
+ private val OFF_HEAP_MEMORY_TOOLTIP = "Memory used / total available memory for off heap " + + "storage of data like RDD partitions cached in memory." + /** Represent an executor's info as a map given a storage status index */ def getExecInfo( listener: ExecutorsListener, @@ -80,6 +114,10 @@ private[spark] object ExecutorsPage { val rddBlocks = status.numBlocks val memUsed = status.memUsed val maxMem = status.maxMem + val onHeapMemUsed = status.onHeapMemUsed + val offHeapMemUsed = status.offHeapMemUsed + val maxOnHeapMem = status.maxOnHeapMem + val maxOffHeapMem = status.maxOffHeapMem val diskUsed = status.diskUsed val taskSummary = listener.executorToTaskSummary.getOrElse(execId, ExecutorTaskSummary(execId)) @@ -103,7 +141,11 @@ private[spark] object ExecutorsPage { taskSummary.shuffleWrite, taskSummary.isBlacklisted, maxMem, - taskSummary.executorLogs + taskSummary.executorLogs, + onHeapMemUsed, + offHeapMemUsed, + maxOnHeapMem, + maxOffHeapMem ) } } diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 227e940c9c..a1a0c729b9 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -147,7 +147,8 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { /** Header fields for the worker table */ private def workerHeader = Seq( "Host", - "Memory Usage", + "On Heap Memory Usage", + "Off Heap Memory Usage", "Disk Usage") /** Render an HTML row representing a worker */ @@ -155,8 +156,12 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { <tr> <td>{worker.address}</td> <td> - {Utils.bytesToString(worker.memoryUsed)} - ({Utils.bytesToString(worker.memoryRemaining)} Remaining) + {Utils.bytesToString(worker.onHeapMemoryUsed.getOrElse(0L))} + ({Utils.bytesToString(worker.onHeapMemoryRemaining.getOrElse(0L))} Remaining) + </td> + <td> + 
{Utils.bytesToString(worker.offHeapMemoryUsed.getOrElse(0L))} + ({Utils.bytesToString(worker.offHeapMemoryRemaining.getOrElse(0L))} Remaining) </td> <td>{Utils.bytesToString(worker.diskUsed)}</td> </tr> diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 1d2cb7acef..8296c42942 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -182,7 +182,9 @@ private[spark] object JsonProtocol { ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerAdded) ~ ("Block Manager ID" -> blockManagerId) ~ ("Maximum Memory" -> blockManagerAdded.maxMem) ~ - ("Timestamp" -> blockManagerAdded.time) + ("Timestamp" -> blockManagerAdded.time) ~ + ("Maximum Onheap Memory" -> blockManagerAdded.maxOnHeapMem) ~ + ("Maximum Offheap Memory" -> blockManagerAdded.maxOffHeapMem) } def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = { @@ -612,7 +614,9 @@ private[spark] object JsonProtocol { val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID") val maxMem = (json \ "Maximum Memory").extract[Long] val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L) - SparkListenerBlockManagerAdded(time, blockManagerId, maxMem) + val maxOnHeapMem = Utils.jsonOption(json \ "Maximum Onheap Memory").map(_.extract[Long]) + val maxOffHeapMem = Utils.jsonOption(json \ "Maximum Offheap Memory").map(_.extract[Long]) + SparkListenerBlockManagerAdded(time, blockManagerId, maxMem, maxOnHeapMem, maxOffHeapMem) } def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = { |