Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html |  18
-rw-r--r--  core/src/main/resources/org/apache/spark/ui/static/executorspage.js | 103
-rw-r--r--  core/src/main/resources/org/apache/spark/ui/static/webui.css |   3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala |   9
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/api.scala |  12
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala |   9
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala |   5
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala |  22
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala |   3
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala |  66
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageUtils.scala |  99
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala |  46
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala |  11
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala |   8
16 files changed, 334 insertions(+), 96 deletions(-)
diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
index 4e83d6d564..5c91304e49 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
@@ -24,7 +24,15 @@ limitations under the License.
<th></th>
<th>RDD Blocks</th>
<th><span data-toggle="tooltip"
- title="Memory used / total available memory for storage of data like RDD partitions cached in memory. ">Storage Memory</span>
+ title="Memory used / total available memory for storage of data like RDD partitions cached in memory.">Storage Memory</span>
+ </th>
+ <th class="on_heap_memory">
+ <span data-toggle="tooltip"
+ title="Memory used / total available memory for on heap storage of data like RDD partitions cached in memory.">On Heap Storage Memory</span>
+ </th>
+ <th class="off_heap_memory">
+ <span data-toggle="tooltip"
+ title="Memory used / total available memory for off heap storage of data like RDD partitions cached in memory.">Off Heap Storage Memory</span>
</th>
<th>Disk Used</th>
<th>Cores</th>
@@ -73,6 +81,14 @@ limitations under the License.
<span data-toggle="tooltip" data-placement="top"
title="Memory used / total available memory for storage of data like RDD partitions cached in memory.">
Storage Memory</span></th>
+ <th class="on_heap_memory">
+ <span data-toggle="tooltip" data-placement="top"
+ title="Memory used / total available memory for on heap storage of data like RDD partitions cached in memory.">
+ On Heap Storage Memory</span></th>
+ <th class="off_heap_memory">
+ <span data-toggle="tooltip" data-placement="top"
+ title="Memory used / total available memory for off heap storage of data like RDD partitions cached in memory.">
+ Off Heap Storage Memory</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Disk Used">Disk Used</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Cores">Cores</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Active Tasks">Active Tasks</span></th>
diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
index 7dbfe32de9..930a069892 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -190,6 +190,10 @@ $(document).ready(function () {
var allRDDBlocks = 0;
var allMemoryUsed = 0;
var allMaxMemory = 0;
+ var allOnHeapMemoryUsed = 0;
+ var allOnHeapMaxMemory = 0;
+ var allOffHeapMemoryUsed = 0;
+ var allOffHeapMaxMemory = 0;
var allDiskUsed = 0;
var allTotalCores = 0;
var allMaxTasks = 0;
@@ -208,6 +212,10 @@ $(document).ready(function () {
var activeRDDBlocks = 0;
var activeMemoryUsed = 0;
var activeMaxMemory = 0;
+ var activeOnHeapMemoryUsed = 0;
+ var activeOnHeapMaxMemory = 0;
+ var activeOffHeapMemoryUsed = 0;
+ var activeOffHeapMaxMemory = 0;
var activeDiskUsed = 0;
var activeTotalCores = 0;
var activeMaxTasks = 0;
@@ -226,6 +234,10 @@ $(document).ready(function () {
var deadRDDBlocks = 0;
var deadMemoryUsed = 0;
var deadMaxMemory = 0;
+ var deadOnHeapMemoryUsed = 0;
+ var deadOnHeapMaxMemory = 0;
+ var deadOffHeapMemoryUsed = 0;
+ var deadOffHeapMaxMemory = 0;
var deadDiskUsed = 0;
var deadTotalCores = 0;
var deadMaxTasks = 0;
@@ -241,10 +253,21 @@ $(document).ready(function () {
var deadTotalBlacklisted = 0;
response.forEach(function (exec) {
+ exec.onHeapMemoryUsed = exec.hasOwnProperty('onHeapMemoryUsed') ? exec.onHeapMemoryUsed : 0;
+ exec.maxOnHeapMemory = exec.hasOwnProperty('maxOnHeapMemory') ? exec.maxOnHeapMemory : 0;
+ exec.offHeapMemoryUsed = exec.hasOwnProperty('offHeapMemoryUsed') ? exec.offHeapMemoryUsed : 0;
+ exec.maxOffHeapMemory = exec.hasOwnProperty('maxOffHeapMemory') ? exec.maxOffHeapMemory : 0;
+ });
+
+ response.forEach(function (exec) {
allExecCnt += 1;
allRDDBlocks += exec.rddBlocks;
allMemoryUsed += exec.memoryUsed;
allMaxMemory += exec.maxMemory;
+ allOnHeapMemoryUsed += exec.onHeapMemoryUsed;
+ allOnHeapMaxMemory += exec.maxOnHeapMemory;
+ allOffHeapMemoryUsed += exec.offHeapMemoryUsed;
+ allOffHeapMaxMemory += exec.maxOffHeapMemory;
allDiskUsed += exec.diskUsed;
allTotalCores += exec.totalCores;
allMaxTasks += exec.maxTasks;
@@ -263,6 +286,10 @@ $(document).ready(function () {
activeRDDBlocks += exec.rddBlocks;
activeMemoryUsed += exec.memoryUsed;
activeMaxMemory += exec.maxMemory;
+ activeOnHeapMemoryUsed += exec.onHeapMemoryUsed;
+ activeOnHeapMaxMemory += exec.maxOnHeapMemory;
+ activeOffHeapMemoryUsed += exec.offHeapMemoryUsed;
+ activeOffHeapMaxMemory += exec.maxOffHeapMemory;
activeDiskUsed += exec.diskUsed;
activeTotalCores += exec.totalCores;
activeMaxTasks += exec.maxTasks;
@@ -281,6 +308,10 @@ $(document).ready(function () {
deadRDDBlocks += exec.rddBlocks;
deadMemoryUsed += exec.memoryUsed;
deadMaxMemory += exec.maxMemory;
+ deadOnHeapMemoryUsed += exec.onHeapMemoryUsed;
+ deadOnHeapMaxMemory += exec.maxOnHeapMemory;
+ deadOffHeapMemoryUsed += exec.offHeapMemoryUsed;
+ deadOffHeapMaxMemory += exec.maxOffHeapMemory;
deadDiskUsed += exec.diskUsed;
deadTotalCores += exec.totalCores;
deadMaxTasks += exec.maxTasks;
@@ -302,6 +333,10 @@ $(document).ready(function () {
"allRDDBlocks": allRDDBlocks,
"allMemoryUsed": allMemoryUsed,
"allMaxMemory": allMaxMemory,
+ "allOnHeapMemoryUsed": allOnHeapMemoryUsed,
+ "allOnHeapMaxMemory": allOnHeapMaxMemory,
+ "allOffHeapMemoryUsed": allOffHeapMemoryUsed,
+ "allOffHeapMaxMemory": allOffHeapMaxMemory,
"allDiskUsed": allDiskUsed,
"allTotalCores": allTotalCores,
"allMaxTasks": allMaxTasks,
@@ -321,6 +356,10 @@ $(document).ready(function () {
"allRDDBlocks": activeRDDBlocks,
"allMemoryUsed": activeMemoryUsed,
"allMaxMemory": activeMaxMemory,
+ "allOnHeapMemoryUsed": activeOnHeapMemoryUsed,
+ "allOnHeapMaxMemory": activeOnHeapMaxMemory,
+ "allOffHeapMemoryUsed": activeOffHeapMemoryUsed,
+ "allOffHeapMaxMemory": activeOffHeapMaxMemory,
"allDiskUsed": activeDiskUsed,
"allTotalCores": activeTotalCores,
"allMaxTasks": activeMaxTasks,
@@ -340,6 +379,10 @@ $(document).ready(function () {
"allRDDBlocks": deadRDDBlocks,
"allMemoryUsed": deadMemoryUsed,
"allMaxMemory": deadMaxMemory,
+ "allOnHeapMemoryUsed": deadOnHeapMemoryUsed,
+ "allOnHeapMaxMemory": deadOnHeapMaxMemory,
+ "allOffHeapMemoryUsed": deadOffHeapMemoryUsed,
+ "allOffHeapMaxMemory": deadOffHeapMaxMemory,
"allDiskUsed": deadDiskUsed,
"allTotalCores": deadTotalCores,
"allMaxTasks": deadMaxTasks,
@@ -378,7 +421,35 @@ $(document).ready(function () {
{data: 'rddBlocks'},
{
data: function (row, type) {
- return type === 'display' ? (formatBytes(row.memoryUsed, type) + ' / ' + formatBytes(row.maxMemory, type)) : row.memoryUsed;
+ if (type !== 'display')
+ return row.memoryUsed;
+ else
+ return (formatBytes(row.memoryUsed, type) + ' / ' +
+ formatBytes(row.maxMemory, type));
+ }
+ },
+ {
+ data: function (row, type) {
+ if (type !== 'display')
+ return row.onHeapMemoryUsed;
+ else
+ return (formatBytes(row.onHeapMemoryUsed, type) + ' / ' +
+ formatBytes(row.maxOnHeapMemory, type));
+ },
+ "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
+ $(nTd).addClass('on_heap_memory')
+ }
+ },
+ {
+ data: function (row, type) {
+ if (type !== 'display')
+ return row.offHeapMemoryUsed;
+ else
+ return (formatBytes(row.offHeapMemoryUsed, type) + ' / ' +
+ formatBytes(row.maxOffHeapMemory, type));
+ },
+ "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
+ $(nTd).addClass('off_heap_memory')
}
},
{data: 'diskUsed', render: formatBytes},
@@ -450,7 +521,35 @@ $(document).ready(function () {
{data: 'allRDDBlocks'},
{
data: function (row, type) {
- return type === 'display' ? (formatBytes(row.allMemoryUsed, type) + ' / ' + formatBytes(row.allMaxMemory, type)) : row.allMemoryUsed;
+ if (type !== 'display')
+ return row.allMemoryUsed;
+ else
+ return (formatBytes(row.allMemoryUsed, type) + ' / ' +
+ formatBytes(row.allMaxMemory, type));
+ }
+ },
+ {
+ data: function (row, type) {
+ if (type !== 'display')
+ return row.allOnHeapMemoryUsed;
+ else
+ return (formatBytes(row.allOnHeapMemoryUsed, type) + ' / ' +
+ formatBytes(row.allOnHeapMaxMemory, type));
+ },
+ "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
+ $(nTd).addClass('on_heap_memory')
+ }
+ },
+ {
+ data: function (row, type) {
+ if (type !== 'display')
+ return row.allOffHeapMemoryUsed;
+ else
+ return (formatBytes(row.allOffHeapMemoryUsed, type) + ' / ' +
+ formatBytes(row.allOffHeapMaxMemory, type));
+ },
+ "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
+ $(nTd).addClass('off_heap_memory')
}
},
{data: 'allDiskUsed', render: formatBytes},
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index 319a719efa..935d9b1aec 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -205,7 +205,8 @@ span.additional-metric-title {
/* Hide all additional metrics by default. This is done here rather than using JavaScript to
* avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
.scheduler_delay, .deserialization_time, .fetch_wait_time, .shuffle_read_remote,
-.serialization_time, .getting_result_time, .peak_execution_memory {
+.serialization_time, .getting_result_time, .peak_execution_memory,
+.on_heap_memory, .off_heap_memory {
display: none;
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 4331addb44..bc2e530716 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -87,8 +87,13 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S
extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long)
- extends SparkListenerEvent
+case class SparkListenerBlockManagerAdded(
+ time: Long,
+ blockManagerId: BlockManagerId,
+ maxMem: Long,
+ maxOnHeapMem: Option[Long] = None,
+ maxOffHeapMem: Option[Long] = None) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockManagerId)
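Note: both new fields default to None, so existing producers of SparkListenerBlockManagerAdded keep compiling unchanged, and events replayed from logs written before this change simply carry no split. A minimal sketch of a listener consuming the split; the listener class itself is illustrative, not part of this patch:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockManagerAdded}

    // Hypothetical consumer: fall back to the combined figure when the
    // split is absent (e.g. when replaying an old event log).
    class MemoryBreakdownListener extends SparkListener {
      override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
        val onHeap = event.maxOnHeapMem.getOrElse(event.maxMem)
        val offHeap = event.maxOffHeapMem.getOrElse(0L)
        println(s"${event.blockManagerId}: onHeap=$onHeap bytes, offHeap=$offHeap bytes")
      }
    }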
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
index 5c03609e5e..1279b281ad 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
@@ -70,7 +70,13 @@ private[spark] object AllRDDResource {
address = status.blockManagerId.hostPort,
memoryUsed = status.memUsedByRdd(rddId),
memoryRemaining = status.memRemaining,
- diskUsed = status.diskUsedByRdd(rddId)
+ diskUsed = status.diskUsedByRdd(rddId),
+ onHeapMemoryUsed = Some(
+ if (!rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
+ offHeapMemoryUsed = Some(
+ if (rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
+ onHeapMemoryRemaining = status.onHeapMemRemaining,
+ offHeapMemoryRemaining = status.offHeapMemRemaining
) } )
} else {
None
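Note: this split leans on the invariant, documented in StorageUtils, that all blocks of one RDD share a storage level, so the whole of memUsedByRdd is attributed to exactly one side. The same rule in isolation, as a hypothetical helper:

    import org.apache.spark.storage.StorageLevel

    // An RDD's cached memory is counted as entirely on-heap or entirely
    // off-heap, decided by its (single) storage level.
    def splitRddMemory(level: StorageLevel, memUsedByRdd: Long): (Long, Long) =
      if (level.useOffHeap) (0L, memUsedByRdd) else (memUsedByRdd, 0L)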
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 5b9227350e..d159b9450e 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -75,7 +75,11 @@ class ExecutorSummary private[spark](
val totalShuffleWrite: Long,
val isBlacklisted: Boolean,
val maxMemory: Long,
- val executorLogs: Map[String, String])
+ val executorLogs: Map[String, String],
+ val onHeapMemoryUsed: Option[Long],
+ val offHeapMemoryUsed: Option[Long],
+ val maxOnHeapMemory: Option[Long],
+ val maxOffHeapMemory: Option[Long])
class JobData private[spark](
val jobId: Int,
@@ -111,7 +115,11 @@ class RDDDataDistribution private[spark](
val address: String,
val memoryUsed: Long,
val memoryRemaining: Long,
- val diskUsed: Long)
+ val diskUsed: Long,
+ val onHeapMemoryUsed: Option[Long],
+ val offHeapMemoryUsed: Option[Long],
+ val onHeapMemoryRemaining: Option[Long],
+ val offHeapMemoryRemaining: Option[Long])
class RDDPartitionInfo private[spark](
val blockName: String,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 46a078b2f9..63acba65d3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -150,8 +150,8 @@ private[spark] class BlockManager(
// However, since we use this only for reporting and logging, what we actually want here is
// the absolute maximum value that `maxMemory` can ever possibly reach. We may need
// to revisit whether reporting this value as the "max" is intuitive to the user.
- private val maxMemory =
- memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
+ private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory
+ private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory
// Port used by the external shuffle service. In Yarn mode, this may already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
@@ -229,7 +229,8 @@ private[spark] class BlockManager(
val idFromMaster = master.registerBlockManager(
id,
- maxMemory,
+ maxOnHeapMemory,
+ maxOffHeapMemory,
slaveEndpoint)
blockManagerId = if (idFromMaster != null) idFromMaster else id
@@ -307,7 +308,7 @@ private[spark] class BlockManager(
def reregister(): Unit = {
// TODO: We might need to rate limit re-registering.
logInfo(s"BlockManager $blockManagerId re-registering with master")
- master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
+ master.registerBlockManager(blockManagerId, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint)
reportAllBlocks()
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 3ca690db9e..ea5d8423a5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -57,11 +57,12 @@ class BlockManagerMaster(
*/
def registerBlockManager(
blockManagerId: BlockManagerId,
- maxMemSize: Long,
+ maxOnHeapMemSize: Long,
+ maxOffHeapMemSize: Long,
slaveEndpoint: RpcEndpointRef): BlockManagerId = {
logInfo(s"Registering BlockManager $blockManagerId")
val updatedId = driverEndpoint.askSync[BlockManagerId](
- RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
+ RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
logInfo(s"Registered BlockManager $updatedId")
updatedId
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 84c04d2260..467c3e0e6b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -71,8 +71,8 @@ class BlockManagerMasterEndpoint(
logInfo("BlockManagerMasterEndpoint up")
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
- context.reply(register(blockManagerId, maxMemSize, slaveEndpoint))
+ case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
+ context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
case _updateBlockInfo @
UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
@@ -276,7 +276,8 @@ class BlockManagerMasterEndpoint(
private def storageStatus: Array[StorageStatus] = {
blockManagerInfo.map { case (blockManagerId, info) =>
- new StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala)
+ new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
+ Some(info.maxOffHeapMem), info.blocks.asScala)
}.toArray
}
@@ -338,7 +339,8 @@ class BlockManagerMasterEndpoint(
*/
private def register(
idWithoutTopologyInfo: BlockManagerId,
- maxMemSize: Long,
+ maxOnHeapMemSize: Long,
+ maxOffHeapMemSize: Long,
slaveEndpoint: RpcEndpointRef): BlockManagerId = {
// the dummy id is not expected to contain the topology information.
// we get that info here and respond back with a more fleshed out block manager id
@@ -359,14 +361,15 @@ class BlockManagerMasterEndpoint(
case None =>
}
logInfo("Registering block manager %s with %s RAM, %s".format(
- id.hostPort, Utils.bytesToString(maxMemSize), id))
+ id.hostPort, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize), id))
blockManagerIdByExecutor(id.executorId) = id
blockManagerInfo(id) = new BlockManagerInfo(
- id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
+ id, System.currentTimeMillis(), maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)
}
- listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
+ listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
+ Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
id
}
@@ -464,10 +467,13 @@ object BlockStatus {
private[spark] class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
- val maxMem: Long,
+ val maxOnHeapMem: Long,
+ val maxOffHeapMem: Long,
val slaveEndpoint: RpcEndpointRef)
extends Logging {
+ val maxMem = maxOnHeapMem + maxOffHeapMem
+
private var _lastSeenMs: Long = timeMs
private var _remainingMem: Long = maxMem
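Note: BlockManagerInfo now stores the on/off-heap split and derives the legacy total, so existing readers of maxMem are untouched. Condensed, the shape is:

    // Store the split, derive the aggregate (sketch of BlockManagerInfo above).
    class InfoShape(val maxOnHeapMem: Long, val maxOffHeapMem: Long) {
      val maxMem: Long = maxOnHeapMem + maxOffHeapMem
    }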
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 0aea438e7f..0c0ff14459 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -58,7 +58,8 @@ private[spark] object BlockManagerMessages {
case class RegisterBlockManager(
blockManagerId: BlockManagerId,
- maxMemSize: Long,
+ maxOnHeapMemSize: Long,
+ maxOffHeapMemSize: Long,
sender: RpcEndpointRef)
extends ToBlockManagerMaster
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index c5ba9af3e2..197a01762c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -26,35 +26,39 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager)
override val metricRegistry = new MetricRegistry()
override val sourceName = "BlockManager"
- metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] {
- override def getValue: Long = {
- val storageStatusList = blockManager.master.getStorageStatus
- val maxMem = storageStatusList.map(_.maxMem).sum
- maxMem / 1024 / 1024
- }
- })
-
- metricRegistry.register(MetricRegistry.name("memory", "remainingMem_MB"), new Gauge[Long] {
- override def getValue: Long = {
- val storageStatusList = blockManager.master.getStorageStatus
- val remainingMem = storageStatusList.map(_.memRemaining).sum
- remainingMem / 1024 / 1024
- }
- })
-
- metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] {
- override def getValue: Long = {
- val storageStatusList = blockManager.master.getStorageStatus
- val memUsed = storageStatusList.map(_.memUsed).sum
- memUsed / 1024 / 1024
- }
- })
-
- metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed_MB"), new Gauge[Long] {
- override def getValue: Long = {
- val storageStatusList = blockManager.master.getStorageStatus
- val diskSpaceUsed = storageStatusList.map(_.diskUsed).sum
- diskSpaceUsed / 1024 / 1024
- }
- })
+ private def registerGauge(name: String, func: BlockManagerMaster => Long): Unit = {
+ metricRegistry.register(name, new Gauge[Long] {
+ override def getValue: Long = func(blockManager.master) / 1024 / 1024
+ })
+ }
+
+ registerGauge(MetricRegistry.name("memory", "maxMem_MB"),
+ _.getStorageStatus.map(_.maxMem).sum)
+
+ registerGauge(MetricRegistry.name("memory", "maxOnHeapMem_MB"),
+ _.getStorageStatus.map(_.maxOnHeapMem.getOrElse(0L)).sum)
+
+ registerGauge(MetricRegistry.name("memory", "maxOffHeapMem_MB"),
+ _.getStorageStatus.map(_.maxOffHeapMem.getOrElse(0L)).sum)
+
+ registerGauge(MetricRegistry.name("memory", "remainingMem_MB"),
+ _.getStorageStatus.map(_.memRemaining).sum)
+
+ registerGauge(MetricRegistry.name("memory", "remainingOnHeapMem_MB"),
+ _.getStorageStatus.map(_.onHeapMemRemaining.getOrElse(0L)).sum)
+
+ registerGauge(MetricRegistry.name("memory", "remainingOffHeapMem_MB"),
+ _.getStorageStatus.map(_.offHeapMemRemaining.getOrElse(0L)).sum)
+
+ registerGauge(MetricRegistry.name("memory", "memUsed_MB"),
+ _.getStorageStatus.map(_.memUsed).sum)
+
+ registerGauge(MetricRegistry.name("memory", "onHeapMemUsed_MB"),
+ _.getStorageStatus.map(_.onHeapMemUsed.getOrElse(0L)).sum)
+
+ registerGauge(MetricRegistry.name("memory", "offHeapMemUsed_MB"),
+ _.getStorageStatus.map(_.offHeapMemUsed.getOrElse(0L)).sum)
+
+ registerGauge(MetricRegistry.name("disk", "diskSpaceUsed_MB"),
+ _.getStorageStatus.map(_.diskUsed).sum)
}
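Note: this rewrite replaces four hand-rolled anonymous Gauge registrations, and adds six new on/off-heap ones, through a single registerGauge helper; each gauge re-queries the master on every metrics poll and reports megabytes. A standalone sketch of the pattern against the Dropwizard metrics API; the wrapper class is illustrative:

    import com.codahale.metrics.{Gauge, MetricRegistry}

    // Each registered gauge evaluates its summing function lazily,
    // on every poll, and converts bytes to MB.
    class MbGauges(registry: MetricRegistry) {
      def register(name: String, fetchBytes: () => Long): Unit = {
        registry.register(name, new Gauge[Long] {
          override def getValue: Long = fetchBytes() / 1024 / 1024
        })
      }
    }

    // e.g. new MbGauges(registry).register("memory.maxMem_MB", () => statuses.map(_.maxMem).sum)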
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index 798658a15b..1b30d4fa93 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -41,7 +41,7 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
}
def deadStorageStatusList: Seq[StorageStatus] = synchronized {
- deadExecutorStorageStatus.toSeq
+ deadExecutorStorageStatus
}
/** Update storage status list to reflect updated block statuses */
@@ -74,8 +74,10 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
synchronized {
val blockManagerId = blockManagerAdded.blockManagerId
val executorId = blockManagerId.executorId
- val maxMem = blockManagerAdded.maxMem
- val storageStatus = new StorageStatus(blockManagerId, maxMem)
+ // The onHeap and offHeap memory are always defined for new applications,
+ // but they can be missing if we are replaying old event logs.
+ val storageStatus = new StorageStatus(blockManagerId, blockManagerAdded.maxMem,
+ blockManagerAdded.maxOnHeapMem, blockManagerAdded.maxOffHeapMem)
executorIdToStorageStatus(executorId) = storageStatus
// Try to remove the dead storage status if the same executor registers the block manager twice.
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 241aacd74b..8f0d181fc8 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -35,7 +35,11 @@ import org.apache.spark.internal.Logging
* class cannot mutate the source of the information. Accesses are not thread-safe.
*/
@DeveloperApi
-class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
+class StorageStatus(
+ val blockManagerId: BlockManagerId,
+ val maxMemory: Long,
+ val maxOnHeapMem: Option[Long],
+ val maxOffHeapMem: Option[Long]) {
/**
* Internal representation of the blocks stored in this block manager.
@@ -46,25 +50,21 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
private val _rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, BlockStatus]]
private val _nonRddBlocks = new mutable.HashMap[BlockId, BlockStatus]
- /**
- * Storage information of the blocks that entails memory, disk, and off-heap memory usage.
- *
- * As with the block maps, we store the storage information separately for RDD blocks and
- * non-RDD blocks for the same reason. In particular, RDD storage information is stored
- * in a map indexed by the RDD ID to the following 4-tuple:
- *
- * (memory size, disk size, storage level)
- *
- * We assume that all the blocks that belong to the same RDD have the same storage level.
- * This field is not relevant to non-RDD blocks, however, so the storage information for
- * non-RDD blocks contains only the first 3 fields (in the same order).
- */
- private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, StorageLevel)]
- private var _nonRddStorageInfo: (Long, Long) = (0L, 0L)
+ private case class RddStorageInfo(memoryUsage: Long, diskUsage: Long, level: StorageLevel)
+ private val _rddStorageInfo = new mutable.HashMap[Int, RddStorageInfo]
+
+ private case class NonRddStorageInfo(var onHeapUsage: Long, var offHeapUsage: Long,
+ var diskUsage: Long)
+ private val _nonRddStorageInfo = NonRddStorageInfo(0L, 0L, 0L)
/** Create a storage status with an initial set of blocks, leaving the source unmodified. */
- def this(bmid: BlockManagerId, maxMem: Long, initialBlocks: Map[BlockId, BlockStatus]) {
- this(bmid, maxMem)
+ def this(
+ bmid: BlockManagerId,
+ maxMemory: Long,
+ maxOnHeapMem: Option[Long],
+ maxOffHeapMem: Option[Long],
+ initialBlocks: Map[BlockId, BlockStatus]) {
+ this(bmid, maxMemory, maxOnHeapMem, maxOffHeapMem)
initialBlocks.foreach { case (bid, bstatus) => addBlock(bid, bstatus) }
}
@@ -176,26 +176,57 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
*/
def numRddBlocksById(rddId: Int): Int = _rddBlocks.get(rddId).map(_.size).getOrElse(0)
+ /** Return the max memory that can be used by this block manager. */
+ def maxMem: Long = maxMemory
+
/** Return the memory remaining in this block manager. */
def memRemaining: Long = maxMem - memUsed
+ /** Return the memory used by caching RDDs */
+ def cacheSize: Long = onHeapCacheSize.getOrElse(0L) + offHeapCacheSize.getOrElse(0L)
+
/** Return the memory used by this block manager. */
- def memUsed: Long = _nonRddStorageInfo._1 + cacheSize
+ def memUsed: Long = onHeapMemUsed.getOrElse(0L) + offHeapMemUsed.getOrElse(0L)
- /** Return the memory used by caching RDDs */
- def cacheSize: Long = _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
+ /** Return the on-heap memory remaining in this block manager. */
+ def onHeapMemRemaining: Option[Long] =
+ for (m <- maxOnHeapMem; o <- onHeapMemUsed) yield m - o
+
+ /** Return the off-heap memory remaining in this block manager. */
+ def offHeapMemRemaining: Option[Long] =
+ for (m <- maxOffHeapMem; o <- offHeapMemUsed) yield m - o
+
+ /** Return the on-heap memory used by this block manager. */
+ def onHeapMemUsed: Option[Long] = onHeapCacheSize.map(_ + _nonRddStorageInfo.onHeapUsage)
+
+ /** Return the off-heap memory used by this block manager. */
+ def offHeapMemUsed: Option[Long] = offHeapCacheSize.map(_ + _nonRddStorageInfo.offHeapUsage)
+
+ /** Return the memory used by on-heap caching RDDs */
+ def onHeapCacheSize: Option[Long] = maxOnHeapMem.map { _ =>
+ _rddStorageInfo.collect {
+ case (_, storageInfo) if !storageInfo.level.useOffHeap => storageInfo.memoryUsage
+ }.sum
+ }
+
+ /** Return the memory used by off-heap caching RDDs */
+ def offHeapCacheSize: Option[Long] = maxOffHeapMem.map { _ =>
+ _rddStorageInfo.collect {
+ case (_, storageInfo) if storageInfo.level.useOffHeap => storageInfo.memoryUsage
+ }.sum
+ }
/** Return the disk space used by this block manager. */
- def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
+ def diskUsed: Long = _nonRddStorageInfo.diskUsage + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
/** Return the memory used by the given RDD in this block manager in O(1) time. */
- def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)
+ def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_.memoryUsage).getOrElse(0L)
/** Return the disk space used by the given RDD in this block manager in O(1) time. */
- def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._2).getOrElse(0L)
+ def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_.diskUsage).getOrElse(0L)
/** Return the storage level, if any, used by the given RDD in this block manager. */
- def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._3)
+ def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_.level)
/**
* Update the relevant storage info, taking into account any existing status for this block.
@@ -210,10 +241,12 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
val (oldMem, oldDisk) = blockId match {
case RDDBlockId(rddId, _) =>
_rddStorageInfo.get(rddId)
- .map { case (mem, disk, _) => (mem, disk) }
+ .map { case RddStorageInfo(mem, disk, _) => (mem, disk) }
.getOrElse((0L, 0L))
- case _ =>
- _nonRddStorageInfo
+ case _ if !level.useOffHeap =>
+ (_nonRddStorageInfo.onHeapUsage, _nonRddStorageInfo.diskUsage)
+ case _ if level.useOffHeap =>
+ (_nonRddStorageInfo.offHeapUsage, _nonRddStorageInfo.diskUsage)
}
val newMem = math.max(oldMem + changeInMem, 0L)
val newDisk = math.max(oldDisk + changeInDisk, 0L)
@@ -225,13 +258,17 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
if (newMem + newDisk == 0) {
_rddStorageInfo.remove(rddId)
} else {
- _rddStorageInfo(rddId) = (newMem, newDisk, level)
+ _rddStorageInfo(rddId) = RddStorageInfo(newMem, newDisk, level)
}
case _ =>
- _nonRddStorageInfo = (newMem, newDisk)
+ if (!level.useOffHeap) {
+ _nonRddStorageInfo.onHeapUsage = newMem
+ } else {
+ _nonRddStorageInfo.offHeapUsage = newMem
+ }
+ _nonRddStorageInfo.diskUsage = newDisk
}
}
-
}
/** Helper methods for storage-related objects. */
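Note: the for-comprehensions above are Option arithmetic: onHeapMemRemaining is defined only when both the max and the used figures are, which lets replayed old logs show the value as absent rather than as a misleading zero. The idiom in isolation:

    // Some only when both inputs are Some (cf. onHeapMemRemaining above).
    def remaining(max: Option[Long], used: Option[Long]): Option[Long] =
      for (m <- max; u <- used) yield m - u

    assert(remaining(Some(512L), Some(128L)) == Some(384L))
    assert(remaining(None, Some(128L)).isEmpty) // replayed old event log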
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index d849ce76a9..0a3c63d14c 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -40,7 +40,8 @@ private[ui] case class ExecutorSummaryInfo(
totalShuffleRead: Long,
totalShuffleWrite: Long,
isBlacklisted: Int,
- maxMemory: Long,
+ maxOnHeapMem: Long,
+ maxOffHeapMem: Long,
executorLogs: Map[String, String])
@@ -53,6 +54,34 @@ private[ui] class ExecutorsPage(
val content =
<div>
{
+ <div>
+ <span class="expand-additional-metrics">
+ <span class="expand-additional-metrics-arrow arrow-closed"></span>
+ <a>Show Additional Metrics</a>
+ </span>
+ <div class="additional-metrics collapsed">
+ <ul>
+ <li>
+ <input type="checkbox" id="select-all-metrics"/>
+ <span class="additional-metric-title"><em>(De)select All</em></span>
+ </li>
+ <li>
+ <span data-toggle="tooltip"
+ title={ExecutorsPage.ON_HEAP_MEMORY_TOOLTIP} data-placement="right">
+ <input type="checkbox" name="on_heap_memory"/>
+ <span class="additional-metric-title">On Heap Storage Memory</span>
+ </span>
+ </li>
+ <li>
+ <span data-toggle="tooltip"
+ title={ExecutorsPage.OFF_HEAP_MEMORY_TOOLTIP} data-placement="right">
+ <input type="checkbox" name="off_heap_memory"/>
+ <span class="additional-metric-title">Off Heap Storage Memory</span>
+ </span>
+ </li>
+ </ul>
+ </div>
+ </div> ++
<div id="active-executors"></div> ++
<script src={UIUtils.prependBaseUri("/static/utils.js")}></script> ++
<script src={UIUtils.prependBaseUri("/static/executorspage.js")}></script> ++
@@ -65,6 +94,11 @@ private[ui] class ExecutorsPage(
}
private[spark] object ExecutorsPage {
+ private val ON_HEAP_MEMORY_TOOLTIP = "Memory used / total available memory for on heap " +
+ "storage of data like RDD partitions cached in memory."
+ private val OFF_HEAP_MEMORY_TOOLTIP = "Memory used / total available memory for off heap " +
+ "storage of data like RDD partitions cached in memory."
+
/** Represent an executor's info as a map given a storage status index */
def getExecInfo(
listener: ExecutorsListener,
@@ -80,6 +114,10 @@ private[spark] object ExecutorsPage {
val rddBlocks = status.numBlocks
val memUsed = status.memUsed
val maxMem = status.maxMem
+ val onHeapMemUsed = status.onHeapMemUsed
+ val offHeapMemUsed = status.offHeapMemUsed
+ val maxOnHeapMem = status.maxOnHeapMem
+ val maxOffHeapMem = status.maxOffHeapMem
val diskUsed = status.diskUsed
val taskSummary = listener.executorToTaskSummary.getOrElse(execId, ExecutorTaskSummary(execId))
@@ -103,7 +141,11 @@ private[spark] object ExecutorsPage {
taskSummary.shuffleWrite,
taskSummary.isBlacklisted,
maxMem,
- taskSummary.executorLogs
+ taskSummary.executorLogs,
+ onHeapMemUsed,
+ offHeapMemUsed,
+ maxOnHeapMem,
+ maxOffHeapMem
)
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 227e940c9c..a1a0c729b9 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -147,7 +147,8 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
/** Header fields for the worker table */
private def workerHeader = Seq(
"Host",
- "Memory Usage",
+ "On Heap Memory Usage",
+ "Off Heap Memory Usage",
"Disk Usage")
/** Render an HTML row representing a worker */
@@ -155,8 +156,12 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
<tr>
<td>{worker.address}</td>
<td>
- {Utils.bytesToString(worker.memoryUsed)}
- ({Utils.bytesToString(worker.memoryRemaining)} Remaining)
+ {Utils.bytesToString(worker.onHeapMemoryUsed.getOrElse(0L))}
+ ({Utils.bytesToString(worker.onHeapMemoryRemaining.getOrElse(0L))} Remaining)
+ </td>
+ <td>
+ {Utils.bytesToString(worker.offHeapMemoryUsed.getOrElse(0L))}
+ ({Utils.bytesToString(worker.offHeapMemoryRemaining.getOrElse(0L))} Remaining)
</td>
<td>{Utils.bytesToString(worker.diskUsed)}</td>
</tr>
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 1d2cb7acef..8296c42942 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -182,7 +182,9 @@ private[spark] object JsonProtocol {
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerAdded) ~
("Block Manager ID" -> blockManagerId) ~
("Maximum Memory" -> blockManagerAdded.maxMem) ~
- ("Timestamp" -> blockManagerAdded.time)
+ ("Timestamp" -> blockManagerAdded.time) ~
+ ("Maximum Onheap Memory" -> blockManagerAdded.maxOnHeapMem) ~
+ ("Maximum Offheap Memory" -> blockManagerAdded.maxOffHeapMem)
}
def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = {
@@ -612,7 +614,9 @@ private[spark] object JsonProtocol {
val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
val maxMem = (json \ "Maximum Memory").extract[Long]
val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
- SparkListenerBlockManagerAdded(time, blockManagerId, maxMem)
+ val maxOnHeapMem = Utils.jsonOption(json \ "Maximum Onheap Memory").map(_.extract[Long])
+ val maxOffHeapMem = Utils.jsonOption(json \ "Maximum Offheap Memory").map(_.extract[Long])
+ SparkListenerBlockManagerAdded(time, blockManagerId, maxMem, maxOnHeapMem, maxOffHeapMem)
}
def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = {
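Note: reading the new keys through Utils.jsonOption keeps old event logs parseable: a missing key becomes None instead of an extraction error. The same read pattern sketched with plain json4s; the JSON key matches the patch, the helper around it is illustrative:

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    implicit val formats: Formats = DefaultFormats

    // Mirrors blockManagerAddedFromJson: an absent key yields None, not an error.
    def maxOnHeapMem(json: JValue): Option[Long] =
      (json \ "Maximum Onheap Memory").toOption.map(_.extract[Long])

    assert(maxOnHeapMem(parse("""{"Maximum Onheap Memory": 1024}""")) == Some(1024L))
    assert(maxOnHeapMem(parse("{}")).isEmpty) // log written before this change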