author     jerryshao <sshao@hortonworks.com>      2017-04-06 13:23:54 -0500
committer  Imran Rashid <irashid@cloudera.com>    2017-04-06 13:23:54 -0500
commit     a4491626ed8169f0162a0dfb78736c9b9e7fb434 (patch)
tree       c18ac0eeba2cb1e95ef6a6a2dee2a9c16ec31174 /core
parent     5a693b4138d4ce948e3bcdbe28d5c01d5deb8fa9 (diff)
[SPARK-17019][CORE] Expose on-heap and off-heap memory usage in various places
## What changes were proposed in this pull request?

With [SPARK-13992](https://issues.apache.org/jira/browse/SPARK-13992), Spark supports persisting data in off-heap memory, but the split between on-heap and off-heap memory usage is not currently exposed, which makes it inconvenient for users to monitor and profile. This patch therefore exposes off-heap memory usage alongside on-heap memory usage in several places:

1. The Spark UI's executor page displays both on-heap and off-heap memory usage.
2. REST requests return both on-heap and off-heap memory.
3. Both values are also available through the MetricsSystem.
4. Both values can be obtained programmatically from a SparkListener.

Attached are the UI changes:

![screen shot 2016-08-12 at 11 20 44 am](https://cloud.githubusercontent.com/assets/850797/17612032/6c2f4480-607f-11e6-82e8-a27fb8cbb4ae.png)

Backward compatibility is preserved for both the event log and the REST API. Old event logs can still be replayed, with off-heap usage displayed as 0. The REST API only adds new fields, so JSON backward compatibility is kept.

## How was this patch tested?

Unit tests were added, plus manual verification.

Author: jerryshao <sshao@hortonworks.com>

Closes #14617 from jerryshao/SPARK-17019.
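As a quick illustration of item 2 above (a sketch only; the host, port, and application id below are placeholders, not values taken from this patch), the new per-executor fields can be read back from the existing executor-list REST endpoint:

import scala.io.Source

object ExecutorMemoryFetch {
  def main(args: Array[String]): Unit = {
    // Placeholder driver host/port and application id; substitute your own.
    val url = "http://localhost:4040/api/v1/applications/app-20161116163331-0000/executors"
    // Each executor object in the returned JSON array now also carries
    // onHeapMemoryUsed, offHeapMemoryUsed, maxOnHeapMemory and maxOffHeapMemory.
    println(Source.fromURL(url).mkString)
  }
}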
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html | 18
-rw-r--r--  core/src/main/resources/org/apache/spark/ui/static/executorspage.js | 103
-rw-r--r--  core/src/main/resources/org/apache/spark/ui/static/webui.css | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/api.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 22
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala | 66
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageUtils.scala | 99
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala | 46
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 8
-rw-r--r--  core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json | 139
-rw-r--r--  core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json | 41
-rwxr-xr-x  core/src/test/resources/spark-events/app-20161116163331-0000 | 10
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/StorageSuite.scala | 87
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala | 36
22 files changed, 628 insertions(+), 118 deletions(-)
diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
index 4e83d6d564..5c91304e49 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
@@ -24,7 +24,15 @@ limitations under the License.
<th></th>
<th>RDD Blocks</th>
<th><span data-toggle="tooltip"
- title="Memory used / total available memory for storage of data like RDD partitions cached in memory. ">Storage Memory</span>
+ title="Memory used / total available memory for storage of data like RDD partitions cached in memory.">Storage Memory</span>
+ </th>
+ <th class="on_heap_memory">
+ <span data-toggle="tooltip"
+ title="Memory used / total available memory for on heap storage of data like RDD partitions cached in memory.">On Heap Storage Memory</span>
+ </th>
+ <th class="off_heap_memory">
+ <span data-toggle="tooltip"
+ title="Memory used / total available memory for off heap storage of data like RDD partitions cached in memory.">Off Heap Storage Memory</span>
</th>
<th>Disk Used</th>
<th>Cores</th>
@@ -73,6 +81,14 @@ limitations under the License.
<span data-toggle="tooltip" data-placement="top"
title="Memory used / total available memory for storage of data like RDD partitions cached in memory.">
Storage Memory</span></th>
+ <th class="on_heap_memory">
+ <span data-toggle="tooltip" data-placement="top"
+ title="Memory used / total available memory for on heap storage of data like RDD partitions cached in memory.">
+ On Heap Storage Memory</span></th>
+ <th class="off_heap_memory">
+ <span data-toggle="tooltip"
+ title="Memory used / total available memory for off heap storage of data like RDD partitions cached in memory.">
+ Off Heap Storage Memory</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Disk Used">Disk Used</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Cores">Cores</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Active Tasks">Active Tasks</span></th>
diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
index 7dbfe32de9..930a069892 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -190,6 +190,10 @@ $(document).ready(function () {
var allRDDBlocks = 0;
var allMemoryUsed = 0;
var allMaxMemory = 0;
+ var allOnHeapMemoryUsed = 0;
+ var allOnHeapMaxMemory = 0;
+ var allOffHeapMemoryUsed = 0;
+ var allOffHeapMaxMemory = 0;
var allDiskUsed = 0;
var allTotalCores = 0;
var allMaxTasks = 0;
@@ -208,6 +212,10 @@ $(document).ready(function () {
var activeRDDBlocks = 0;
var activeMemoryUsed = 0;
var activeMaxMemory = 0;
+ var activeOnHeapMemoryUsed = 0;
+ var activeOnHeapMaxMemory = 0;
+ var activeOffHeapMemoryUsed = 0;
+ var activeOffHeapMaxMemory = 0;
var activeDiskUsed = 0;
var activeTotalCores = 0;
var activeMaxTasks = 0;
@@ -226,6 +234,10 @@ $(document).ready(function () {
var deadRDDBlocks = 0;
var deadMemoryUsed = 0;
var deadMaxMemory = 0;
+ var deadOnHeapMemoryUsed = 0;
+ var deadOnHeapMaxMemory = 0;
+ var deadOffHeapMemoryUsed = 0;
+ var deadOffHeapMaxMemory = 0;
var deadDiskUsed = 0;
var deadTotalCores = 0;
var deadMaxTasks = 0;
@@ -241,10 +253,21 @@ $(document).ready(function () {
var deadTotalBlacklisted = 0;
response.forEach(function (exec) {
+ exec.onHeapMemoryUsed = exec.hasOwnProperty('onHeapMemoryUsed') ? exec.onHeapMemoryUsed : 0;
+ exec.maxOnHeapMemory = exec.hasOwnProperty('maxOnHeapMemory') ? exec.maxOnHeapMemory : 0;
+ exec.offHeapMemoryUsed = exec.hasOwnProperty('offHeapMemoryUsed') ? exec.offHeapMemoryUsed : 0;
+ exec.maxOffHeapMemory = exec.hasOwnProperty('maxOffHeapMemory') ? exec.maxOffHeapMemory : 0;
+ });
+
+ response.forEach(function (exec) {
allExecCnt += 1;
allRDDBlocks += exec.rddBlocks;
allMemoryUsed += exec.memoryUsed;
allMaxMemory += exec.maxMemory;
+ allOnHeapMemoryUsed += exec.onHeapMemoryUsed;
+ allOnHeapMaxMemory += exec.maxOnHeapMemory;
+ allOffHeapMemoryUsed += exec.offHeapMemoryUsed;
+ allOffHeapMaxMemory += exec.maxOffHeapMemory;
allDiskUsed += exec.diskUsed;
allTotalCores += exec.totalCores;
allMaxTasks += exec.maxTasks;
@@ -263,6 +286,10 @@ $(document).ready(function () {
activeRDDBlocks += exec.rddBlocks;
activeMemoryUsed += exec.memoryUsed;
activeMaxMemory += exec.maxMemory;
+ activeOnHeapMemoryUsed += exec.onHeapMemoryUsed;
+ activeOnHeapMaxMemory += exec.maxOnHeapMemory;
+ activeOffHeapMemoryUsed += exec.offHeapMemoryUsed;
+ activeOffHeapMaxMemory += exec.maxOffHeapMemory;
activeDiskUsed += exec.diskUsed;
activeTotalCores += exec.totalCores;
activeMaxTasks += exec.maxTasks;
@@ -281,6 +308,10 @@ $(document).ready(function () {
deadRDDBlocks += exec.rddBlocks;
deadMemoryUsed += exec.memoryUsed;
deadMaxMemory += exec.maxMemory;
+ deadOnHeapMemoryUsed += exec.onHeapMemoryUsed;
+ deadOnHeapMaxMemory += exec.maxOnHeapMemory;
+ deadOffHeapMemoryUsed += exec.offHeapMemoryUsed;
+ deadOffHeapMaxMemory += exec.maxOffHeapMemory;
deadDiskUsed += exec.diskUsed;
deadTotalCores += exec.totalCores;
deadMaxTasks += exec.maxTasks;
@@ -302,6 +333,10 @@ $(document).ready(function () {
"allRDDBlocks": allRDDBlocks,
"allMemoryUsed": allMemoryUsed,
"allMaxMemory": allMaxMemory,
+ "allOnHeapMemoryUsed": allOnHeapMemoryUsed,
+ "allOnHeapMaxMemory": allOnHeapMaxMemory,
+ "allOffHeapMemoryUsed": allOffHeapMemoryUsed,
+ "allOffHeapMaxMemory": allOffHeapMaxMemory,
"allDiskUsed": allDiskUsed,
"allTotalCores": allTotalCores,
"allMaxTasks": allMaxTasks,
@@ -321,6 +356,10 @@ $(document).ready(function () {
"allRDDBlocks": activeRDDBlocks,
"allMemoryUsed": activeMemoryUsed,
"allMaxMemory": activeMaxMemory,
+ "allOnHeapMemoryUsed": activeOnHeapMemoryUsed,
+ "allOnHeapMaxMemory": activeOnHeapMaxMemory,
+ "allOffHeapMemoryUsed": activeOffHeapMemoryUsed,
+ "allOffHeapMaxMemory": activeOffHeapMaxMemory,
"allDiskUsed": activeDiskUsed,
"allTotalCores": activeTotalCores,
"allMaxTasks": activeMaxTasks,
@@ -340,6 +379,10 @@ $(document).ready(function () {
"allRDDBlocks": deadRDDBlocks,
"allMemoryUsed": deadMemoryUsed,
"allMaxMemory": deadMaxMemory,
+ "allOnHeapMemoryUsed": deadOnHeapMemoryUsed,
+ "allOnHeapMaxMemory": deadOnHeapMaxMemory,
+ "allOffHeapMemoryUsed": deadOffHeapMemoryUsed,
+ "allOffHeapMaxMemory": deadOffHeapMaxMemory,
"allDiskUsed": deadDiskUsed,
"allTotalCores": deadTotalCores,
"allMaxTasks": deadMaxTasks,
@@ -378,7 +421,35 @@ $(document).ready(function () {
{data: 'rddBlocks'},
{
data: function (row, type) {
- return type === 'display' ? (formatBytes(row.memoryUsed, type) + ' / ' + formatBytes(row.maxMemory, type)) : row.memoryUsed;
+ if (type !== 'display')
+ return row.memoryUsed;
+ else
+ return (formatBytes(row.memoryUsed, type) + ' / ' +
+ formatBytes(row.maxMemory, type));
+ }
+ },
+ {
+ data: function (row, type) {
+ if (type !== 'display')
+ return row.onHeapMemoryUsed;
+ else
+ return (formatBytes(row.onHeapMemoryUsed, type) + ' / ' +
+ formatBytes(row.maxOnHeapMemory, type));
+ },
+ "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
+ $(nTd).addClass('on_heap_memory')
+ }
+ },
+ {
+ data: function (row, type) {
+ if (type !== 'display')
+ return row.offHeapMemoryUsed;
+ else
+ return (formatBytes(row.offHeapMemoryUsed, type) + ' / ' +
+ formatBytes(row.maxOffHeapMemory, type));
+ },
+ "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
+ $(nTd).addClass('off_heap_memory')
}
},
{data: 'diskUsed', render: formatBytes},
@@ -450,7 +521,35 @@ $(document).ready(function () {
{data: 'allRDDBlocks'},
{
data: function (row, type) {
- return type === 'display' ? (formatBytes(row.allMemoryUsed, type) + ' / ' + formatBytes(row.allMaxMemory, type)) : row.allMemoryUsed;
+ if (type !== 'display')
+ return row.allMemoryUsed;
+ else
+ return (formatBytes(row.allMemoryUsed, type) + ' / ' +
+ formatBytes(row.allMaxMemory, type));
+ }
+ },
+ {
+ data: function (row, type) {
+ if (type !== 'display')
+ return row.allOnHeapMemoryUsed;
+ else
+ return (formatBytes(row.allOnHeapMemoryUsed, type) + ' / ' +
+ formatBytes(row.allOnHeapMaxMemory, type));
+ },
+ "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
+ $(nTd).addClass('on_heap_memory')
+ }
+ },
+ {
+ data: function (row, type) {
+ if (type !== 'display')
+ return row.allOffHeapMemoryUsed;
+ else
+ return (formatBytes(row.allOffHeapMemoryUsed, type) + ' / ' +
+ formatBytes(row.allOffHeapMaxMemory, type));
+ },
+ "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
+ $(nTd).addClass('off_heap_memory')
}
},
{data: 'allDiskUsed', render: formatBytes},
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index 319a719efa..935d9b1aec 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -205,7 +205,8 @@ span.additional-metric-title {
/* Hide all additional metrics by default. This is done here rather than using JavaScript to
* avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
.scheduler_delay, .deserialization_time, .fetch_wait_time, .shuffle_read_remote,
-.serialization_time, .getting_result_time, .peak_execution_memory {
+.serialization_time, .getting_result_time, .peak_execution_memory,
+.on_heap_memory, .off_heap_memory {
display: none;
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 4331addb44..bc2e530716 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -87,8 +87,13 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S
extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long)
- extends SparkListenerEvent
+case class SparkListenerBlockManagerAdded(
+ time: Long,
+ blockManagerId: BlockManagerId,
+ maxMem: Long,
+ maxOnHeapMem: Option[Long] = None,
+ maxOffHeapMem: Option[Long] = None) extends SparkListenerEvent {
+}
@DeveloperApi
case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockManagerId)
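For item 4 of the commit summary, a minimal sketch of a listener consuming the extended event above (the class name and output format are illustrative; register it via sparkContext.addSparkListener or the spark.extraListeners configuration):

import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockManagerAdded}

class MemoryBreakdownListener extends SparkListener {
  override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
    // The new fields are Options so that old event logs, which lack them,
    // can still be replayed; absent values arrive as None.
    val onHeap = event.maxOnHeapMem.getOrElse(0L)
    val offHeap = event.maxOffHeapMem.getOrElse(0L)
    println(s"${event.blockManagerId}: total=${event.maxMem} onHeap=$onHeap offHeap=$offHeap")
  }
}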
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
index 5c03609e5e..1279b281ad 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala
@@ -70,7 +70,13 @@ private[spark] object AllRDDResource {
address = status.blockManagerId.hostPort,
memoryUsed = status.memUsedByRdd(rddId),
memoryRemaining = status.memRemaining,
- diskUsed = status.diskUsedByRdd(rddId)
+ diskUsed = status.diskUsedByRdd(rddId),
+ onHeapMemoryUsed = Some(
+ if (!rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
+ offHeapMemoryUsed = Some(
+ if (rddInfo.storageLevel.useOffHeap) status.memUsedByRdd(rddId) else 0L),
+ onHeapMemoryRemaining = status.onHeapMemRemaining,
+ offHeapMemoryRemaining = status.offHeapMemRemaining
) } )
} else {
None
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 5b9227350e..d159b9450e 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -75,7 +75,11 @@ class ExecutorSummary private[spark](
val totalShuffleWrite: Long,
val isBlacklisted: Boolean,
val maxMemory: Long,
- val executorLogs: Map[String, String])
+ val executorLogs: Map[String, String],
+ val onHeapMemoryUsed: Option[Long],
+ val offHeapMemoryUsed: Option[Long],
+ val maxOnHeapMemory: Option[Long],
+ val maxOffHeapMemory: Option[Long])
class JobData private[spark](
val jobId: Int,
@@ -111,7 +115,11 @@ class RDDDataDistribution private[spark](
val address: String,
val memoryUsed: Long,
val memoryRemaining: Long,
- val diskUsed: Long)
+ val diskUsed: Long,
+ val onHeapMemoryUsed: Option[Long],
+ val offHeapMemoryUsed: Option[Long],
+ val onHeapMemoryRemaining: Option[Long],
+ val offHeapMemoryRemaining: Option[Long])
class RDDPartitionInfo private[spark](
val blockName: String,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 46a078b2f9..63acba65d3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -150,8 +150,8 @@ private[spark] class BlockManager(
// However, since we use this only for reporting and logging, what we actually want here is
// the absolute maximum value that `maxMemory` can ever possibly reach. We may need
// to revisit whether reporting this value as the "max" is intuitive to the user.
- private val maxMemory =
- memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
+ private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory
+ private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory
// Port used by the external shuffle service. In Yarn mode, this may already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
@@ -229,7 +229,8 @@ private[spark] class BlockManager(
val idFromMaster = master.registerBlockManager(
id,
- maxMemory,
+ maxOnHeapMemory,
+ maxOffHeapMemory,
slaveEndpoint)
blockManagerId = if (idFromMaster != null) idFromMaster else id
@@ -307,7 +308,7 @@ private[spark] class BlockManager(
def reregister(): Unit = {
// TODO: We might need to rate limit re-registering.
logInfo(s"BlockManager $blockManagerId re-registering with master")
- master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
+ master.registerBlockManager(blockManagerId, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint)
reportAllBlocks()
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 3ca690db9e..ea5d8423a5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -57,11 +57,12 @@ class BlockManagerMaster(
*/
def registerBlockManager(
blockManagerId: BlockManagerId,
- maxMemSize: Long,
+ maxOnHeapMemSize: Long,
+ maxOffHeapMemSize: Long,
slaveEndpoint: RpcEndpointRef): BlockManagerId = {
logInfo(s"Registering BlockManager $blockManagerId")
val updatedId = driverEndpoint.askSync[BlockManagerId](
- RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
+ RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
logInfo(s"Registered BlockManager $updatedId")
updatedId
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 84c04d2260..467c3e0e6b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -71,8 +71,8 @@ class BlockManagerMasterEndpoint(
logInfo("BlockManagerMasterEndpoint up")
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
- context.reply(register(blockManagerId, maxMemSize, slaveEndpoint))
+ case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
+ context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
case _updateBlockInfo @
UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
@@ -276,7 +276,8 @@ class BlockManagerMasterEndpoint(
private def storageStatus: Array[StorageStatus] = {
blockManagerInfo.map { case (blockManagerId, info) =>
- new StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala)
+ new StorageStatus(blockManagerId, info.maxMem, Some(info.maxOnHeapMem),
+ Some(info.maxOffHeapMem), info.blocks.asScala)
}.toArray
}
@@ -338,7 +339,8 @@ class BlockManagerMasterEndpoint(
*/
private def register(
idWithoutTopologyInfo: BlockManagerId,
- maxMemSize: Long,
+ maxOnHeapMemSize: Long,
+ maxOffHeapMemSize: Long,
slaveEndpoint: RpcEndpointRef): BlockManagerId = {
// the dummy id is not expected to contain the topology information.
// we get that info here and respond back with a more fleshed out block manager id
@@ -359,14 +361,15 @@ class BlockManagerMasterEndpoint(
case None =>
}
logInfo("Registering block manager %s with %s RAM, %s".format(
- id.hostPort, Utils.bytesToString(maxMemSize), id))
+ id.hostPort, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize), id))
blockManagerIdByExecutor(id.executorId) = id
blockManagerInfo(id) = new BlockManagerInfo(
- id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
+ id, System.currentTimeMillis(), maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)
}
- listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
+ listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
+ Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
id
}
@@ -464,10 +467,13 @@ object BlockStatus {
private[spark] class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
- val maxMem: Long,
+ val maxOnHeapMem: Long,
+ val maxOffHeapMem: Long,
val slaveEndpoint: RpcEndpointRef)
extends Logging {
+ val maxMem = maxOnHeapMem + maxOffHeapMem
+
private var _lastSeenMs: Long = timeMs
private var _remainingMem: Long = maxMem
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 0aea438e7f..0c0ff14459 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -58,7 +58,8 @@ private[spark] object BlockManagerMessages {
case class RegisterBlockManager(
blockManagerId: BlockManagerId,
- maxMemSize: Long,
+ maxOnHeapMemSize: Long,
+ maxOffHeapMemSize: Long,
sender: RpcEndpointRef)
extends ToBlockManagerMaster
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index c5ba9af3e2..197a01762c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -26,35 +26,39 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager)
override val metricRegistry = new MetricRegistry()
override val sourceName = "BlockManager"
- metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] {
- override def getValue: Long = {
- val storageStatusList = blockManager.master.getStorageStatus
- val maxMem = storageStatusList.map(_.maxMem).sum
- maxMem / 1024 / 1024
- }
- })
-
- metricRegistry.register(MetricRegistry.name("memory", "remainingMem_MB"), new Gauge[Long] {
- override def getValue: Long = {
- val storageStatusList = blockManager.master.getStorageStatus
- val remainingMem = storageStatusList.map(_.memRemaining).sum
- remainingMem / 1024 / 1024
- }
- })
-
- metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] {
- override def getValue: Long = {
- val storageStatusList = blockManager.master.getStorageStatus
- val memUsed = storageStatusList.map(_.memUsed).sum
- memUsed / 1024 / 1024
- }
- })
-
- metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed_MB"), new Gauge[Long] {
- override def getValue: Long = {
- val storageStatusList = blockManager.master.getStorageStatus
- val diskSpaceUsed = storageStatusList.map(_.diskUsed).sum
- diskSpaceUsed / 1024 / 1024
- }
- })
+ private def registerGauge(name: String, func: BlockManagerMaster => Long): Unit = {
+ metricRegistry.register(name, new Gauge[Long] {
+ override def getValue: Long = func(blockManager.master) / 1024 / 1024
+ })
+ }
+
+ registerGauge(MetricRegistry.name("memory", "maxMem_MB"),
+ _.getStorageStatus.map(_.maxMem).sum)
+
+ registerGauge(MetricRegistry.name("memory", "maxOnHeapMem_MB"),
+ _.getStorageStatus.map(_.maxOnHeapMem.getOrElse(0L)).sum)
+
+ registerGauge(MetricRegistry.name("memory", "maxOffHeapMem_MB"),
+ _.getStorageStatus.map(_.maxOffHeapMem.getOrElse(0L)).sum)
+
+ registerGauge(MetricRegistry.name("memory", "remainingMem_MB"),
+ _.getStorageStatus.map(_.memRemaining).sum)
+
+ registerGauge(MetricRegistry.name("memory", "remainingOnHeapMem_MB"),
+ _.getStorageStatus.map(_.onHeapMemRemaining.getOrElse(0L)).sum)
+
+ registerGauge(MetricRegistry.name("memory", "remainingOffHeapMem_MB"),
+ _.getStorageStatus.map(_.offHeapMemRemaining.getOrElse(0L)).sum)
+
+ registerGauge(MetricRegistry.name("memory", "memUsed_MB"),
+ _.getStorageStatus.map(_.memUsed).sum)
+
+ registerGauge(MetricRegistry.name("memory", "onHeapMemUsed_MB"),
+ _.getStorageStatus.map(_.onHeapMemUsed.getOrElse(0L)).sum)
+
+ registerGauge(MetricRegistry.name("memory", "offHeapMemUsed_MB"),
+ _.getStorageStatus.map(_.offHeapMemUsed.getOrElse(0L)).sum)
+
+ registerGauge(MetricRegistry.name("disk", "diskSpaceUsed_MB"),
+ _.getStorageStatus.map(_.diskUsed).sum)
}
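The registerGauge helper above is plain Dropwizard metrics; a standalone sketch of the same pattern follows (the demo object and the hard-coded sample value are illustrative only, standing in for the blockManager.master.getStorageStatus aggregation):

import com.codahale.metrics.{Gauge, MetricRegistry}

object GaugePatternDemo {
  val metricRegistry = new MetricRegistry()

  // Same shape as BlockManagerSource.registerGauge: wrap a Long-producing
  // function in a Gauge and report the value in MB.
  private def registerGauge(name: String, func: () => Long): Unit = {
    metricRegistry.register(name, new Gauge[Long] {
      override def getValue: Long = func() / 1024 / 1024
    })
  }

  def main(args: Array[String]): Unit = {
    registerGauge(MetricRegistry.name("memory", "maxOnHeapMem_MB"), () => 384093388L)
    println(metricRegistry.getGauges.get("memory.maxOnHeapMem_MB").getValue) // 366
  }
}

With a sink configured in metrics.properties (for example org.apache.spark.metrics.sink.ConsoleSink), these gauges surface through the MetricsSystem mentioned in the commit message.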
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index 798658a15b..1b30d4fa93 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -41,7 +41,7 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
}
def deadStorageStatusList: Seq[StorageStatus] = synchronized {
- deadExecutorStorageStatus.toSeq
+ deadExecutorStorageStatus
}
/** Update storage status list to reflect updated block statuses */
@@ -74,8 +74,10 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
synchronized {
val blockManagerId = blockManagerAdded.blockManagerId
val executorId = blockManagerId.executorId
- val maxMem = blockManagerAdded.maxMem
- val storageStatus = new StorageStatus(blockManagerId, maxMem)
+ // The onHeap and offHeap memory are always defined for new applications,
+ // but they can be missing if we are replaying old event logs.
+ val storageStatus = new StorageStatus(blockManagerId, blockManagerAdded.maxMem,
+ blockManagerAdded.maxOnHeapMem, blockManagerAdded.maxOffHeapMem)
executorIdToStorageStatus(executorId) = storageStatus
// Try to remove the dead storage status if the same executor registers the block manager twice.
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 241aacd74b..8f0d181fc8 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -35,7 +35,11 @@ import org.apache.spark.internal.Logging
* class cannot mutate the source of the information. Accesses are not thread-safe.
*/
@DeveloperApi
-class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
+class StorageStatus(
+ val blockManagerId: BlockManagerId,
+ val maxMemory: Long,
+ val maxOnHeapMem: Option[Long],
+ val maxOffHeapMem: Option[Long]) {
/**
* Internal representation of the blocks stored in this block manager.
@@ -46,25 +50,21 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
private val _rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, BlockStatus]]
private val _nonRddBlocks = new mutable.HashMap[BlockId, BlockStatus]
- /**
- * Storage information of the blocks that entails memory, disk, and off-heap memory usage.
- *
- * As with the block maps, we store the storage information separately for RDD blocks and
- * non-RDD blocks for the same reason. In particular, RDD storage information is stored
- * in a map indexed by the RDD ID to the following 4-tuple:
- *
- * (memory size, disk size, storage level)
- *
- * We assume that all the blocks that belong to the same RDD have the same storage level.
- * This field is not relevant to non-RDD blocks, however, so the storage information for
- * non-RDD blocks contains only the first 3 fields (in the same order).
- */
- private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, StorageLevel)]
- private var _nonRddStorageInfo: (Long, Long) = (0L, 0L)
+ private case class RddStorageInfo(memoryUsage: Long, diskUsage: Long, level: StorageLevel)
+ private val _rddStorageInfo = new mutable.HashMap[Int, RddStorageInfo]
+
+ private case class NonRddStorageInfo(var onHeapUsage: Long, var offHeapUsage: Long,
+ var diskUsage: Long)
+ private val _nonRddStorageInfo = NonRddStorageInfo(0L, 0L, 0L)
/** Create a storage status with an initial set of blocks, leaving the source unmodified. */
- def this(bmid: BlockManagerId, maxMem: Long, initialBlocks: Map[BlockId, BlockStatus]) {
- this(bmid, maxMem)
+ def this(
+ bmid: BlockManagerId,
+ maxMemory: Long,
+ maxOnHeapMem: Option[Long],
+ maxOffHeapMem: Option[Long],
+ initialBlocks: Map[BlockId, BlockStatus]) {
+ this(bmid, maxMemory, maxOnHeapMem, maxOffHeapMem)
initialBlocks.foreach { case (bid, bstatus) => addBlock(bid, bstatus) }
}
@@ -176,26 +176,57 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
*/
def numRddBlocksById(rddId: Int): Int = _rddBlocks.get(rddId).map(_.size).getOrElse(0)
+ /** Return the max memory that can be used by this block manager. */
+ def maxMem: Long = maxMemory
+
/** Return the memory remaining in this block manager. */
def memRemaining: Long = maxMem - memUsed
+ /** Return the memory used by caching RDDs */
+ def cacheSize: Long = onHeapCacheSize.getOrElse(0L) + offHeapCacheSize.getOrElse(0L)
+
/** Return the memory used by this block manager. */
- def memUsed: Long = _nonRddStorageInfo._1 + cacheSize
+ def memUsed: Long = onHeapMemUsed.getOrElse(0L) + offHeapMemUsed.getOrElse(0L)
- /** Return the memory used by caching RDDs */
- def cacheSize: Long = _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
+ /** Return the on-heap memory remaining in this block manager. */
+ def onHeapMemRemaining: Option[Long] =
+ for (m <- maxOnHeapMem; o <- onHeapMemUsed) yield m - o
+
+ /** Return the off-heap memory remaining in this block manager. */
+ def offHeapMemRemaining: Option[Long] =
+ for (m <- maxOffHeapMem; o <- offHeapMemUsed) yield m - o
+
+ /** Return the on-heap memory used by this block manager. */
+ def onHeapMemUsed: Option[Long] = onHeapCacheSize.map(_ + _nonRddStorageInfo.onHeapUsage)
+
+ /** Return the off-heap memory used by this block manager. */
+ def offHeapMemUsed: Option[Long] = offHeapCacheSize.map(_ + _nonRddStorageInfo.offHeapUsage)
+
+ /** Return the memory used by on-heap caching RDDs */
+ def onHeapCacheSize: Option[Long] = maxOnHeapMem.map { _ =>
+ _rddStorageInfo.collect {
+ case (_, storageInfo) if !storageInfo.level.useOffHeap => storageInfo.memoryUsage
+ }.sum
+ }
+
+ /** Return the memory used by off-heap caching RDDs */
+ def offHeapCacheSize: Option[Long] = maxOffHeapMem.map { _ =>
+ _rddStorageInfo.collect {
+ case (_, storageInfo) if storageInfo.level.useOffHeap => storageInfo.memoryUsage
+ }.sum
+ }
/** Return the disk space used by this block manager. */
- def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
+ def diskUsed: Long = _nonRddStorageInfo.diskUsage + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
/** Return the memory used by the given RDD in this block manager in O(1) time. */
- def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)
+ def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_.memoryUsage).getOrElse(0L)
/** Return the disk space used by the given RDD in this block manager in O(1) time. */
- def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._2).getOrElse(0L)
+ def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_.diskUsage).getOrElse(0L)
/** Return the storage level, if any, used by the given RDD in this block manager. */
- def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._3)
+ def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_.level)
/**
* Update the relevant storage info, taking into account any existing status for this block.
@@ -210,10 +241,12 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
val (oldMem, oldDisk) = blockId match {
case RDDBlockId(rddId, _) =>
_rddStorageInfo.get(rddId)
- .map { case (mem, disk, _) => (mem, disk) }
+ .map { case RddStorageInfo(mem, disk, _) => (mem, disk) }
.getOrElse((0L, 0L))
- case _ =>
- _nonRddStorageInfo
+ case _ if !level.useOffHeap =>
+ (_nonRddStorageInfo.onHeapUsage, _nonRddStorageInfo.diskUsage)
+ case _ if level.useOffHeap =>
+ (_nonRddStorageInfo.offHeapUsage, _nonRddStorageInfo.diskUsage)
}
val newMem = math.max(oldMem + changeInMem, 0L)
val newDisk = math.max(oldDisk + changeInDisk, 0L)
@@ -225,13 +258,17 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
if (newMem + newDisk == 0) {
_rddStorageInfo.remove(rddId)
} else {
- _rddStorageInfo(rddId) = (newMem, newDisk, level)
+ _rddStorageInfo(rddId) = RddStorageInfo(newMem, newDisk, level)
}
case _ =>
- _nonRddStorageInfo = (newMem, newDisk)
+ if (!level.useOffHeap) {
+ _nonRddStorageInfo.onHeapUsage = newMem
+ } else {
+ _nonRddStorageInfo.offHeapUsage = newMem
+ }
+ _nonRddStorageInfo.diskUsage = newDisk
}
}
-
}
/** Helper methods for storage-related objects. */
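The Option-returning accessors above deliberately propagate a missing maximum instead of reporting a misleading zero. A tiny sketch of the for-comprehension used by onHeapMemRemaining and offHeapMemRemaining (names here are illustrative):

object OptionRemainingDemo {
  // Mirrors: for (m <- maxOnHeapMem; o <- onHeapMemUsed) yield m - o
  def remaining(max: Option[Long], used: Option[Long]): Option[Long] =
    for (m <- max; u <- used) yield m - u

  def main(args: Array[String]): Unit = {
    println(remaining(Some(384093388L), Some(1024L))) // Some(384092364)
    println(remaining(None, Some(1024L)))             // None, e.g. replaying an old event log
  }
}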
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index d849ce76a9..0a3c63d14c 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -40,7 +40,8 @@ private[ui] case class ExecutorSummaryInfo(
totalShuffleRead: Long,
totalShuffleWrite: Long,
isBlacklisted: Int,
- maxMemory: Long,
+ maxOnHeapMem: Long,
+ maxOffHeapMem: Long,
executorLogs: Map[String, String])
@@ -53,6 +54,34 @@ private[ui] class ExecutorsPage(
val content =
<div>
{
+ <div>
+ <span class="expand-additional-metrics">
+ <span class="expand-additional-metrics-arrow arrow-closed"></span>
+ <a>Show Additional Metrics</a>
+ </span>
+ <div class="additional-metrics collapsed">
+ <ul>
+ <li>
+ <input type="checkbox" id="select-all-metrics"/>
+ <span class="additional-metric-title"><em>(De)select All</em></span>
+ </li>
+ <li>
+ <span data-toggle="tooltip"
+ title={ExecutorsPage.ON_HEAP_MEMORY_TOOLTIP} data-placement="right">
+ <input type="checkbox" name="on_heap_memory"/>
+ <span class="additional-metric-title">On Heap Storage Memory</span>
+ </span>
+ </li>
+ <li>
+ <span data-toggle="tooltip"
+ title={ExecutorsPage.OFF_HEAP_MEMORY_TOOLTIP} data-placement="right">
+ <input type="checkbox" name="off_heap_memory"/>
+ <span class="additional-metric-title">Off Heap Storage Memory</span>
+ </span>
+ </li>
+ </ul>
+ </div>
+ </div> ++
<div id="active-executors"></div> ++
<script src={UIUtils.prependBaseUri("/static/utils.js")}></script> ++
<script src={UIUtils.prependBaseUri("/static/executorspage.js")}></script> ++
@@ -65,6 +94,11 @@ private[ui] class ExecutorsPage(
}
private[spark] object ExecutorsPage {
+ private val ON_HEAP_MEMORY_TOOLTIP = "Memory used / total available memory for on heap " +
+ "storage of data like RDD partitions cached in memory."
+ private val OFF_HEAP_MEMORY_TOOLTIP = "Memory used / total available memory for off heap " +
+ "storage of data like RDD partitions cached in memory."
+
/** Represent an executor's info as a map given a storage status index */
def getExecInfo(
listener: ExecutorsListener,
@@ -80,6 +114,10 @@ private[spark] object ExecutorsPage {
val rddBlocks = status.numBlocks
val memUsed = status.memUsed
val maxMem = status.maxMem
+ val onHeapMemUsed = status.onHeapMemUsed
+ val offHeapMemUsed = status.offHeapMemUsed
+ val maxOnHeapMem = status.maxOnHeapMem
+ val maxOffHeapMem = status.maxOffHeapMem
val diskUsed = status.diskUsed
val taskSummary = listener.executorToTaskSummary.getOrElse(execId, ExecutorTaskSummary(execId))
@@ -103,7 +141,11 @@ private[spark] object ExecutorsPage {
taskSummary.shuffleWrite,
taskSummary.isBlacklisted,
maxMem,
- taskSummary.executorLogs
+ taskSummary.executorLogs,
+ onHeapMemUsed,
+ offHeapMemUsed,
+ maxOnHeapMem,
+ maxOffHeapMem
)
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 227e940c9c..a1a0c729b9 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -147,7 +147,8 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
/** Header fields for the worker table */
private def workerHeader = Seq(
"Host",
- "Memory Usage",
+ "On Heap Memory Usage",
+ "Off Heap Memory Usage",
"Disk Usage")
/** Render an HTML row representing a worker */
@@ -155,8 +156,12 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
<tr>
<td>{worker.address}</td>
<td>
- {Utils.bytesToString(worker.memoryUsed)}
- ({Utils.bytesToString(worker.memoryRemaining)} Remaining)
+ {Utils.bytesToString(worker.onHeapMemoryUsed.getOrElse(0L))}
+ ({Utils.bytesToString(worker.onHeapMemoryRemaining.getOrElse(0L))} Remaining)
+ </td>
+ <td>
+ {Utils.bytesToString(worker.offHeapMemoryUsed.getOrElse(0L))}
+ ({Utils.bytesToString(worker.offHeapMemoryRemaining.getOrElse(0L))} Remaining)
</td>
<td>{Utils.bytesToString(worker.diskUsed)}</td>
</tr>
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 1d2cb7acef..8296c42942 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -182,7 +182,9 @@ private[spark] object JsonProtocol {
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerAdded) ~
("Block Manager ID" -> blockManagerId) ~
("Maximum Memory" -> blockManagerAdded.maxMem) ~
- ("Timestamp" -> blockManagerAdded.time)
+ ("Timestamp" -> blockManagerAdded.time) ~
+ ("Maximum Onheap Memory" -> blockManagerAdded.maxOnHeapMem) ~
+ ("Maximum Offheap Memory" -> blockManagerAdded.maxOffHeapMem)
}
def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = {
@@ -612,7 +614,9 @@ private[spark] object JsonProtocol {
val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
val maxMem = (json \ "Maximum Memory").extract[Long]
val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
- SparkListenerBlockManagerAdded(time, blockManagerId, maxMem)
+ val maxOnHeapMem = Utils.jsonOption(json \ "Maximum Onheap Memory").map(_.extract[Long])
+ val maxOffHeapMem = Utils.jsonOption(json \ "Maximum Offheap Memory").map(_.extract[Long])
+ SparkListenerBlockManagerAdded(time, blockManagerId, maxMem, maxOnHeapMem, maxOffHeapMem)
}
def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = {
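A sketch of the backward-compatible read this enables, using plain json4s (which Spark already depends on); JsonProtocol itself goes through Spark's private Utils.jsonOption helper, so extractOpt here is a stand-in with the same behavior:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object EventLogCompatDemo {
  implicit val formats: Formats = DefaultFormats

  def main(args: Array[String]): Unit = {
    // An old-style event lacks the new keys; a new-style event carries them.
    val oldEvent = parse("""{"Maximum Memory":384093388,"Timestamp":1479335611477}""")
    val newEvent = parse("""{"Maximum Memory":908381388,"Timestamp":1479335611477,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":524288000}""")

    println((oldEvent \ "Maximum Onheap Memory").extractOpt[Long]) // None
    println((newEvent \ "Maximum Onheap Memory").extractOpt[Long]) // Some(384093388)
  }
}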
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
new file mode 100644
index 0000000000..e732af2663
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
@@ -0,0 +1,139 @@
+[ {
+ "id" : "2",
+ "hostPort" : "172.22.0.167:51487",
+ "isActive" : true,
+ "rddBlocks" : 0,
+ "memoryUsed" : 0,
+ "diskUsed" : 0,
+ "totalCores" : 4,
+ "maxTasks" : 4,
+ "activeTasks" : 0,
+ "failedTasks" : 4,
+ "completedTasks" : 0,
+ "totalTasks" : 4,
+ "totalDuration" : 2537,
+ "totalGCTime" : 88,
+ "totalInputBytes" : 0,
+ "totalShuffleRead" : 0,
+ "totalShuffleWrite" : 0,
+ "isBlacklisted" : true,
+ "maxMemory" : 908381388,
+ "executorLogs" : {
+ "stdout" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stdout",
+ "stderr" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stderr"
+ },
+ "onHeapMemoryUsed" : 0,
+ "offHeapMemoryUsed" : 0,
+ "maxOnHeapMemory" : 384093388,
+ "maxOffHeapMemory" : 524288000
+}, {
+ "id" : "driver",
+ "hostPort" : "172.22.0.167:51475",
+ "isActive" : true,
+ "rddBlocks" : 0,
+ "memoryUsed" : 0,
+ "diskUsed" : 0,
+ "totalCores" : 0,
+ "maxTasks" : 0,
+ "activeTasks" : 0,
+ "failedTasks" : 0,
+ "completedTasks" : 0,
+ "totalTasks" : 0,
+ "totalDuration" : 0,
+ "totalGCTime" : 0,
+ "totalInputBytes" : 0,
+ "totalShuffleRead" : 0,
+ "totalShuffleWrite" : 0,
+ "isBlacklisted" : true,
+ "maxMemory" : 908381388,
+ "executorLogs" : { },
+ "onHeapMemoryUsed" : 0,
+ "offHeapMemoryUsed" : 0,
+ "maxOnHeapMemory" : 384093388,
+ "maxOffHeapMemory" : 524288000
+}, {
+ "id" : "1",
+ "hostPort" : "172.22.0.167:51490",
+ "isActive" : true,
+ "rddBlocks" : 0,
+ "memoryUsed" : 0,
+ "diskUsed" : 0,
+ "totalCores" : 4,
+ "maxTasks" : 4,
+ "activeTasks" : 0,
+ "failedTasks" : 0,
+ "completedTasks" : 4,
+ "totalTasks" : 4,
+ "totalDuration" : 3152,
+ "totalGCTime" : 68,
+ "totalInputBytes" : 0,
+ "totalShuffleRead" : 0,
+ "totalShuffleWrite" : 0,
+ "isBlacklisted" : true,
+ "maxMemory" : 908381388,
+ "executorLogs" : {
+ "stdout" : "http://172.22.0.167:51467/logPage/?appId=app-20161116163331-0000&executorId=1&logType=stdout",
+ "stderr" : "http://172.22.0.167:51467/logPage/?appId=app-20161116163331-0000&executorId=1&logType=stderr"
+ },
+
+ "onHeapMemoryUsed" : 0,
+ "offHeapMemoryUsed" : 0,
+ "maxOnHeapMemory" : 384093388,
+ "maxOffHeapMemory" : 524288000
+}, {
+ "id" : "0",
+ "hostPort" : "172.22.0.167:51491",
+ "isActive" : true,
+ "rddBlocks" : 0,
+ "memoryUsed" : 0,
+ "diskUsed" : 0,
+ "totalCores" : 4,
+ "maxTasks" : 4,
+ "activeTasks" : 0,
+ "failedTasks" : 4,
+ "completedTasks" : 0,
+ "totalTasks" : 4,
+ "totalDuration" : 2551,
+ "totalGCTime" : 116,
+ "totalInputBytes" : 0,
+ "totalShuffleRead" : 0,
+ "totalShuffleWrite" : 0,
+ "isBlacklisted" : true,
+ "maxMemory" : 908381388,
+ "executorLogs" : {
+ "stdout" : "http://172.22.0.167:51465/logPage/?appId=app-20161116163331-0000&executorId=0&logType=stdout",
+ "stderr" : "http://172.22.0.167:51465/logPage/?appId=app-20161116163331-0000&executorId=0&logType=stderr"
+ },
+ "onHeapMemoryUsed" : 0,
+ "offHeapMemoryUsed" : 0,
+ "maxOnHeapMemory" : 384093388,
+ "maxOffHeapMemory" : 524288000
+}, {
+ "id" : "3",
+ "hostPort" : "172.22.0.167:51485",
+ "isActive" : true,
+ "rddBlocks" : 0,
+ "memoryUsed" : 0,
+ "diskUsed" : 0,
+ "totalCores" : 4,
+ "maxTasks" : 4,
+ "activeTasks" : 0,
+ "failedTasks" : 0,
+ "completedTasks" : 12,
+ "totalTasks" : 12,
+ "totalDuration" : 2453,
+ "totalGCTime" : 72,
+ "totalInputBytes" : 0,
+ "totalShuffleRead" : 0,
+ "totalShuffleWrite" : 0,
+ "isBlacklisted" : true,
+ "maxMemory" : 908381388,
+ "executorLogs" : {
+ "stdout" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stdout",
+ "stderr" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stderr"
+ },
+ "onHeapMemoryUsed" : 0,
+ "offHeapMemoryUsed" : 0,
+ "maxOnHeapMemory" : 384093388,
+ "maxOffHeapMemory" : 524288000
+} ]
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
index 5914a1c2c4..e732af2663 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
@@ -17,11 +17,15 @@
"totalShuffleRead" : 0,
"totalShuffleWrite" : 0,
"isBlacklisted" : true,
- "maxMemory" : 384093388,
+ "maxMemory" : 908381388,
"executorLogs" : {
"stdout" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stdout",
"stderr" : "http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stderr"
- }
+ },
+ "onHeapMemoryUsed" : 0,
+ "offHeapMemoryUsed" : 0,
+ "maxOnHeapMemory" : 384093388,
+ "maxOffHeapMemory" : 524288000
}, {
"id" : "driver",
"hostPort" : "172.22.0.167:51475",
@@ -41,8 +45,12 @@
"totalShuffleRead" : 0,
"totalShuffleWrite" : 0,
"isBlacklisted" : true,
- "maxMemory" : 384093388,
- "executorLogs" : { }
+ "maxMemory" : 908381388,
+ "executorLogs" : { },
+ "onHeapMemoryUsed" : 0,
+ "offHeapMemoryUsed" : 0,
+ "maxOnHeapMemory" : 384093388,
+ "maxOffHeapMemory" : 524288000
}, {
"id" : "1",
"hostPort" : "172.22.0.167:51490",
@@ -62,11 +70,16 @@
"totalShuffleRead" : 0,
"totalShuffleWrite" : 0,
"isBlacklisted" : true,
- "maxMemory" : 384093388,
+ "maxMemory" : 908381388,
"executorLogs" : {
"stdout" : "http://172.22.0.167:51467/logPage/?appId=app-20161116163331-0000&executorId=1&logType=stdout",
"stderr" : "http://172.22.0.167:51467/logPage/?appId=app-20161116163331-0000&executorId=1&logType=stderr"
- }
+ },
+
+ "onHeapMemoryUsed" : 0,
+ "offHeapMemoryUsed" : 0,
+ "maxOnHeapMemory" : 384093388,
+ "maxOffHeapMemory" : 524288000
}, {
"id" : "0",
"hostPort" : "172.22.0.167:51491",
@@ -86,11 +99,15 @@
"totalShuffleRead" : 0,
"totalShuffleWrite" : 0,
"isBlacklisted" : true,
- "maxMemory" : 384093388,
+ "maxMemory" : 908381388,
"executorLogs" : {
"stdout" : "http://172.22.0.167:51465/logPage/?appId=app-20161116163331-0000&executorId=0&logType=stdout",
"stderr" : "http://172.22.0.167:51465/logPage/?appId=app-20161116163331-0000&executorId=0&logType=stderr"
- }
+ },
+ "onHeapMemoryUsed" : 0,
+ "offHeapMemoryUsed" : 0,
+ "maxOnHeapMemory" : 384093388,
+ "maxOffHeapMemory" : 524288000
}, {
"id" : "3",
"hostPort" : "172.22.0.167:51485",
@@ -110,9 +127,13 @@
"totalShuffleRead" : 0,
"totalShuffleWrite" : 0,
"isBlacklisted" : true,
- "maxMemory" : 384093388,
+ "maxMemory" : 908381388,
"executorLogs" : {
"stdout" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stdout",
"stderr" : "http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stderr"
- }
+ },
+ "onHeapMemoryUsed" : 0,
+ "offHeapMemoryUsed" : 0,
+ "maxOnHeapMemory" : 384093388,
+ "maxOffHeapMemory" : 524288000
} ]
diff --git a/core/src/test/resources/spark-events/app-20161116163331-0000 b/core/src/test/resources/spark-events/app-20161116163331-0000
index 7566c9fc0a..57cfc5b973 100755
--- a/core/src/test/resources/spark-events/app-20161116163331-0000
+++ b/core/src/test/resources/spark-events/app-20161116163331-0000
@@ -1,15 +1,15 @@
{"Event":"SparkListenerLogStart","Spark Version":"2.1.0-SNAPSHOT"}
-{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"172.22.0.167","Port":51475},"Maximum Memory":384093388,"Timestamp":1479335611477}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"172.22.0.167","Port":51475},"Maximum Memory":908381388,"Timestamp":1479335611477,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":524288000}
{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre","Java Version":"1.8.0_92 (Oracle Corporation)","Scala Version":"version 2.11.8"},"Spark Properties":{"spark.blacklist.task.maxTaskAttemptsPerExecutor":"3","spark.blacklist.enabled":"TRUE","spark.driver.host":"172.22.0.167","spark.blacklist.task.maxTaskAttemptsPerNode":"3","spark.eventLog.enabled":"TRUE","spark.driver.port":"51459","spark.repl.class.uri":"spark://172.22.0.167:51459/classes","spark.jars":"","spark.repl.class.outputDir":"/private/var/folders/l4/d46wlzj16593f3d812vk49tw0000gp/T/spark-1cbc97d0-7fe6-4c9f-8c2c-f6fe51ee3cf2/repl-39929169-ac4c-4c6d-b116-f648e4dd62ed","spark.app.name":"Spark shell","spark.blacklist.stage.maxFailedExecutorsPerNode":"3","spark.scheduler.mode":"FIFO","spark.eventLog.overwrite":"TRUE","spark.blacklist.stage.maxFailedTasksPerExecutor":"3","spark.executor.id":"driver","spark.blacklist.application.maxFailedExecutorsPerNode":"2","spark.submit.deployMode":"client","spark.master":"local-cluster[4,4,1024]","spark.home":"/Users/Jose/IdeaProjects/spark","spark.eventLog.dir":"/Users/jose/logs","spark.sql.catalogImplementation":"in-memory","spark.eventLog.compress":"FALSE","spark.blacklist.application.maxFailedTasksPerExecutor":"1","spark.blacklist.timeout":"1000000","spark.app.id":"app-20161116163331-0000","spark.task.maxFailures":"4"},"System Properties":{"java.io.tmpdir":"/var/folders/l4/d46wlzj16593f3d812vk49tw0000gp/T/","line.separator":"\n","path.separator":":","sun.management.compiler":"HotSpot 64-Bit Tiered Compilers","SPARK_SUBMIT":"true","sun.cpu.endian":"little","java.specification.version":"1.8","java.vm.specification.name":"Java Virtual Machine Specification","java.vendor":"Oracle Corporation","java.vm.specification.version":"1.8","user.home":"/Users/Jose","file.encoding.pkg":"sun.io","sun.nio.ch.bugLevel":"","ftp.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","sun.arch.data.model":"64","sun.boot.library.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib","user.dir":"/Users/Jose/IdeaProjects/spark","java.library.path":"/Users/Jose/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.","sun.cpu.isalist":"","os.arch":"x86_64","java.vm.version":"25.92-b14","java.endorsed.dirs":"/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib/endorsed","java.runtime.version":"1.8.0_92-b14","java.vm.info":"mixed mode","java.ext.dirs":"/Users/Jose/Library/Java/Extensions:/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib/ext:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java","java.runtime.name":"Java(TM) SE Runtime Environment","file.separator":"/","io.netty.maxDirectMemory":"0","java.class.version":"52.0","scala.usejavacp":"true","java.specification.name":"Java Platform API 
Specification","sun.boot.class.path":"/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre/classes","file.encoding":"UTF-8","user.timezone":"America/Chicago","java.specification.vendor":"Oracle Corporation","sun.java.launcher":"SUN_STANDARD","os.version":"10.11.6","sun.os.patch.level":"unknown","gopherProxySet":"false","java.vm.specification.vendor":"Oracle Corporation","user.country":"US","sun.jnu.encoding":"UTF-8","http.nonProxyHosts":"local|*.local|169.254/16|*.169.254/16","user.language":"en","socksNonProxyHosts":"local|*.local|169.254/16|*.169.254/16","java.vendor.url":"http://java.oracle.com/","java.awt.printerjob":"sun.lwawt.macosx.CPrinterJob","java.awt.graphicsenv":"sun.awt.CGraphicsEnvironment","awt.toolkit":"sun.lwawt.macosx.LWCToolkit","os.name":"Mac OS X","java.vm.vendor":"Oracle Corporation","java.vendor.url.bug":"http://bugreport.sun.com/bugreport/","user.name":"jose","java.vm.name":"Java HotSpot(TM) 64-Bit Server VM","sun.java.command":"org.apache.spark.deploy.SparkSubmit --master local-cluster[4,4,1024] --conf spark.blacklist.enabled=TRUE --conf spark.blacklist.timeout=1000000 --conf spark.blacklist.application.maxFailedTasksPerExecutor=1 --conf spark.eventLog.overwrite=TRUE --conf spark.blacklist.task.maxTaskAttemptsPerNode=3 --conf spark.blacklist.stage.maxFailedTasksPerExecutor=3 --conf spark.blacklist.task.maxTaskAttemptsPerExecutor=3 --conf spark.eventLog.compress=FALSE --conf spark.blacklist.stage.maxFailedExecutorsPerNode=3 --conf spark.eventLog.enabled=TRUE --conf spark.eventLog.dir=/Users/jose/logs --conf spark.blacklist.application.maxFailedExecutorsPerNode=2 --conf spark.task.maxFailures=4 --class org.apache.spark.repl.Main --name Spark shell spark-shell -i /Users/Jose/dev/jose-utils/blacklist/test-blacklist.scala","java.home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre","java.version":"1.8.0_92","sun.io.unicode.encoding":"UnicodeBig"},"Classpath Entries":{"/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/avro-mapred-1.7.7-hadoop2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-mapreduce-client-core-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-servlet-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/parquet-column-1.8.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/snappy-java-1.1.2.6.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/oro-2.0.8.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/arpack_combined_all-0.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/pmml-schema-1.2.15.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-assembly_2.11-2.1.0-SNAPSHOT.jar":"System 
Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/javassist-3.18.1-GA.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-tags_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-launcher_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-math3-3.4.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hk2-api-2.4.0-b34.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/scala-xml_2.11-1.0.4.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/objenesis-2.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spire-macros_2.11-0.7.4.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/scala-reflect-2.11.8.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-mllib-local_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-mllib_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jersey-server-2.22.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/core/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jackson-mapper-asl-1.9.13.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jackson-module-scala_2.11-2.6.5.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/curator-framework-2.4.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/javax.inject-1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/curator-client-2.4.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jackson-core-asl-1.9.13.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/common/network-common/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/zookeeper-3.4.5.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-auth-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/repl/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jul-to-slf4j-1.7.16.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jersey-media-jaxb-2.22.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-io-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/RoaringBitmap-0.5.11.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/javax.ws.rs-api-2.0.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/sql/catalyst/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-unsafe_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-repl_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-continuation-9.2.16.v20160414.jar":"System 
Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-yarn-client-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/sql/hive-thriftserver/target/scala-2.11/classes":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-annotations-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/metrics-graphite-3.1.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-yarn-api-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jersey-container-servlet-core-2.22.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/streaming/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-net-3.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-proxy-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-catalyst_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/lz4-1.3.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-crypto-1.0.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/common/network-yarn/target/scala-2.11/classes":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/javax.annotation-api-1.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-sql_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/guava-14.0.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/javax.servlet-api-3.1.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-collections-3.2.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/conf/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/unused-1.0.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/aopalliance-1.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/parquet-encoding-1.8.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/common/tags/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/json4s-jackson_2.11-3.2.11.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-cli-1.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-yarn-server-common-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/cglib-2.2.1-v20090111.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/pyrolite-4.13.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/scala-library-2.11.8.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/scala-parser-combinators_2.11-1.0.4.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-util-6.1.26.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/py4j-0.10.4.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-configuration-1.6.jar":"System 
Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/core-1.1.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/core/target/jars/*":"System Classpath","/Users/Jose/IdeaProjects/spark/common/network-shuffle/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/parquet-format-2.3.0-incubating.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/kryo-shaded-3.0.3.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/sql/core/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/chill-java-0.8.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jackson-annotations-2.6.5.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/parquet-hadoop-1.8.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/sql/hive/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/avro-ipc-1.7.7.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/xz-1.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/parquet-jackson-1.8.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/aopalliance-repackaged-2.4.0-b34.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jersey-common-2.22.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/log4j-1.2.17.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/metrics-core-3.1.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-util-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/scalap-2.11.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/osgi-resource-locator-1.0.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-beanutils-1.7.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-compress-1.4.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jcl-over-slf4j-1.7.16.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/yarn/target/scala-2.11/classes":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-plus-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/protobuf-java-2.5.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/common/unsafe/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jackson-module-paranamer-2.6.5.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/leveldbjni-all-1.8.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jackson-core-2.6.5.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/slf4j-api-1.7.16.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/compress-lzf-1.0.3.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/stream-2.7.0.jar":"System 
Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-mapreduce-client-shuffle-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-codec-1.10.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-yarn-common-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/common/sketch/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/breeze_2.11-0.12.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-mapreduce-client-common-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-core_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jersey-container-servlet-2.22.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-network-shuffle_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-lang-2.5.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/ivy-2.4.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-common-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-math-2.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-hdfs-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/scala-compiler-2.11.8.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/metrics-jvm-3.1.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-lang3-3.5.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jsr305-1.3.9.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/minlog-1.3.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/netty-3.8.0.Final.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-webapp-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/json4s-ast_2.11-3.2.11.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/xbean-asm5-shaded-4.4.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-io-2.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/slf4j-log4j12-1.7.16.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hk2-locator-2.4.0-b34.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/shapeless_2.11-2.0.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-network-common_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-xml-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-httpclient-3.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/javax.inject-2.4.0-b34.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/mllib/target/scala-2.11/classes/":"System 
Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/scalatest_2.11-2.2.6.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hk2-utils-2.4.0-b34.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-client-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jersey-guava-2.22.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-jndi-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/graphx/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-mapreduce-client-app-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/examples/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/xmlenc-0.52.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jets3t-0.7.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/curator-recipes-2.4.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/opencsv-2.3.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jtransforms-2.4.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/antlr4-runtime-4.5.3.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/chill_2.11-0.8.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-digester-1.8.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/univocity-parsers-2.2.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jline-2.12.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-streaming_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/launcher/target/scala-2.11/classes/":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/breeze-macros_2.11-0.12.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jersey-client-2.22.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jackson-databind-2.6.5.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-servlets-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/paranamer-2.6.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-security-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/avro-ipc-1.7.7-tests.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/avro-1.7.7.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spire_2.11-0.7.4.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-client-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/metrics-json-3.1.2.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-beanutils-core-1.8.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/validation-api-1.1.0.Final.jar":"System 
Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-graphx_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/netty-all-4.0.41.Final.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/janino-3.0.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/json4s-core_2.11-3.2.11.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/commons-compiler-3.0.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/guice-3.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-server-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/jetty-http-9.2.16.v20160414.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/parquet-common-1.8.1.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-mapreduce-client-jobclient-2.2.0.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-sketch_2.11-2.1.0-SNAPSHOT.jar":"System Classpath","/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/pmml-model-1.2.15.jar":"System Classpath"}}
{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"app-20161116163331-0000","Timestamp":1479335609916,"User":"jose"}
{"Event":"SparkListenerExecutorAdded","Timestamp":1479335615320,"Executor ID":"3","Executor Info":{"Host":"172.22.0.167","Total Cores":4,"Log Urls":{"stdout":"http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stdout","stderr":"http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stderr"}}}
-{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"3","Host":"172.22.0.167","Port":51485},"Maximum Memory":384093388,"Timestamp":1479335615387}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"3","Host":"172.22.0.167","Port":51485},"Maximum Memory":908381388,"Timestamp":1479335615387,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":524288000}
{"Event":"SparkListenerExecutorAdded","Timestamp":1479335615393,"Executor ID":"2","Executor Info":{"Host":"172.22.0.167","Total Cores":4,"Log Urls":{"stdout":"http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stdout","stderr":"http://172.22.0.167:51469/logPage/?appId=app-20161116163331-0000&executorId=2&logType=stderr"}}}
{"Event":"SparkListenerExecutorAdded","Timestamp":1479335615443,"Executor ID":"1","Executor Info":{"Host":"172.22.0.167","Total Cores":4,"Log Urls":{"stdout":"http://172.22.0.167:51467/logPage/?appId=app-20161116163331-0000&executorId=1&logType=stdout","stderr":"http://172.22.0.167:51467/logPage/?appId=app-20161116163331-0000&executorId=1&logType=stderr"}}}
-{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"2","Host":"172.22.0.167","Port":51487},"Maximum Memory":384093388,"Timestamp":1479335615448}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"2","Host":"172.22.0.167","Port":51487},"Maximum Memory":908381388,"Timestamp":1479335615448,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":524288000}
{"Event":"SparkListenerExecutorAdded","Timestamp":1479335615462,"Executor ID":"0","Executor Info":{"Host":"172.22.0.167","Total Cores":4,"Log Urls":{"stdout":"http://172.22.0.167:51465/logPage/?appId=app-20161116163331-0000&executorId=0&logType=stdout","stderr":"http://172.22.0.167:51465/logPage/?appId=app-20161116163331-0000&executorId=0&logType=stderr"}}}
-{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"1","Host":"172.22.0.167","Port":51490},"Maximum Memory":384093388,"Timestamp":1479335615496}
-{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"0","Host":"172.22.0.167","Port":51491},"Maximum Memory":384093388,"Timestamp":1479335615515}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"1","Host":"172.22.0.167","Port":51490},"Maximum Memory":908381388,"Timestamp":1479335615496,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":524288000}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"0","Host":"172.22.0.167","Port":51491},"Maximum Memory":908381388,"Timestamp":1479335615515,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":524288000}
{"Event":"SparkListenerJobStart","Job ID":0,"Submission Time":1479335616467,"Stage Infos":[{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"count at <console>:26","Number of Tasks":16,"RDD Info":[{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"map\"}","Callsite":"map at <console>:26","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":16,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":0,"Name":"ParallelCollectionRDD","Scope":"{\"id\":\"0\",\"name\":\"parallelize\"}","Callsite":"parallelize at <console>:26","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":16,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"org.apache.spark.rdd.RDD.count(RDD.scala:1135)\n$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:26)\n$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:31)\n$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:33)\n$line16.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:35)\n$line16.$read$$iw$$iw$$iw$$iw.<init>(<console>:37)\n$line16.$read$$iw$$iw$$iw.<init>(<console>:39)\n$line16.$read$$iw$$iw.<init>(<console>:41)\n$line16.$read$$iw.<init>(<console>:43)\n$line16.$read.<init>(<console>:45)\n$line16.$read$.<init>(<console>:49)\n$line16.$read$.<clinit>(<console>)\n$line16.$eval$.$print$lzycompute(<console>:7)\n$line16.$eval$.$print(<console>:6)\n$line16.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\nscala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)","Accumulables":[]}],"Stage IDs":[0],"Properties":{}}
{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"count at <console>:26","Number of Tasks":16,"RDD Info":[{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"map\"}","Callsite":"map at <console>:26","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":16,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":0,"Name":"ParallelCollectionRDD","Scope":"{\"id\":\"0\",\"name\":\"parallelize\"}","Callsite":"parallelize at <console>:26","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":16,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"org.apache.spark.rdd.RDD.count(RDD.scala:1135)\n$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:26)\n$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:31)\n$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:33)\n$line16.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:35)\n$line16.$read$$iw$$iw$$iw$$iw.<init>(<console>:37)\n$line16.$read$$iw$$iw$$iw.<init>(<console>:39)\n$line16.$read$$iw$$iw.<init>(<console>:41)\n$line16.$read$$iw.<init>(<console>:43)\n$line16.$read.<init>(<console>:45)\n$line16.$read$.<init>(<console>:49)\n$line16.$read$.<clinit>(<console>)\n$line16.$eval$.$print$lzycompute(<console>:7)\n$line16.$eval$.$print(<console>:6)\n$line16.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\nscala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)","Accumulables":[]},"Properties":{}}
{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1479335616657,"Executor ID":"1","Host":"172.22.0.167","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index dcf83cb530..764156c3ed 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -153,7 +153,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
"rdd list storage json" -> "applications/local-1422981780767/storage/rdd",
"executor node blacklisting" -> "applications/app-20161116163331-0000/executors",
- "executor node blacklisting unblacklisting" -> "applications/app-20161115172038-0000/executors"
+ "executor node blacklisting unblacklisting" -> "applications/app-20161115172038-0000/executors",
+ "executor memory usage" -> "applications/app-20161116163331-0000/executors"
// Todo: enable this test when logging the event of onBlockUpdated. See: SPARK-13845
// "one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0"
)
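The new golden-file entry points the history server's REST API at the executors endpoint of the application whose event log appears above; its expected output lives in `executor_memory_usage_expectation.json`. A quick way to inspect the enriched payload by hand, assuming a history server on the default port 18080 (hypothetical host and port, not part of this patch):

```scala
import scala.io.Source

// Hypothetical local history server; adjust host/port for your deployment.
val url = "http://localhost:18080/api/v1/applications/app-20161116163331-0000/executors"
val body = Source.fromURL(url).mkString
// Each executor summary now reports on-heap and off-heap memory alongside
// the old total, so existing clients that ignore unknown fields keep working.
println(body)
```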
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
index e5733aebf6..da198f946f 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -27,7 +27,7 @@ class StorageSuite extends SparkFunSuite {
// For testing add, update, and remove (for non-RDD blocks)
private def storageStatus1: StorageStatus = {
- val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
+ val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L, Some(1000L), Some(0L))
assert(status.blocks.isEmpty)
assert(status.rddBlocks.isEmpty)
assert(status.memUsed === 0L)
@@ -74,7 +74,7 @@ class StorageSuite extends SparkFunSuite {
// For testing add, update, remove, get, and contains etc. for both RDD and non-RDD blocks
private def storageStatus2: StorageStatus = {
- val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
+ val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L, Some(1000L), Some(0L))
assert(status.rddBlocks.isEmpty)
status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L))
status.addBlock(TestBlockId("man"), BlockStatus(memAndDisk, 10L, 20L))
@@ -252,9 +252,9 @@ class StorageSuite extends SparkFunSuite {
// For testing StorageUtils.updateRddInfo and StorageUtils.getRddBlockLocations
private def stockStorageStatuses: Seq[StorageStatus] = {
- val status1 = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
- val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2), 2000L)
- val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3), 3000L)
+ val status1 = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L, Some(1000L), Some(0L))
+ val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2), 2000L, Some(2000L), Some(0L))
+ val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3), 3000L, Some(3000L), Some(0L))
status1.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L))
status1.addBlock(RDDBlockId(0, 1), BlockStatus(memAndDisk, 1L, 2L))
status2.addBlock(RDDBlockId(0, 2), BlockStatus(memAndDisk, 1L, 2L))
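All three fixture updates above follow the same signature change: `StorageStatus` now takes the optional on-heap and off-heap maximums after the total. A small sketch of the two shapes the suite relies on (executor IDs, hosts, and ports are made up; like the suite itself, this assumes access to the `private[spark]` storage package):

```scala
import org.apache.spark.storage.{BlockManagerId, StorageStatus}

// New-style status: the total is the sum of the two optional components.
val fresh = new StorageStatus(
  BlockManagerId("exec-1", "host-a", 7077), 2000L, Some(1500L), Some(500L))

// Legacy-style status, as produced when replaying an old event log:
// only the total is known, so both components are None.
val legacy = new StorageStatus(
  BlockManagerId("exec-2", "host-b", 7078), 2000L, None, None)
```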
@@ -332,4 +332,81 @@ class StorageSuite extends SparkFunSuite {
assert(blockLocations1(RDDBlockId(1, 2)) === Seq("cat:3"))
}
+ private val offheap = StorageLevel.OFF_HEAP
+ // For testing add, update, remove, get, and contains, etc. for both RDD and non-RDD on-heap
+ // and off-heap blocks
+ private def storageStatus3: StorageStatus = {
+ val status = new StorageStatus(BlockManagerId("big", "dog", 1), 2000L, Some(1000L), Some(1000L))
+ assert(status.rddBlocks.isEmpty)
+ status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L))
+ status.addBlock(TestBlockId("man"), BlockStatus(offheap, 10L, 0L))
+ status.addBlock(RDDBlockId(0, 0), BlockStatus(offheap, 10L, 0L))
+ status.addBlock(RDDBlockId(1, 1), BlockStatus(offheap, 100L, 0L))
+ status.addBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 10L, 20L))
+ status.addBlock(RDDBlockId(2, 3), BlockStatus(memAndDisk, 10L, 20L))
+ status.addBlock(RDDBlockId(2, 4), BlockStatus(memAndDisk, 10L, 40L))
+ status
+ }
+
+ test("storage memUsed, diskUsed with on-heap and off-heap blocks") {
+ val status = storageStatus3
+ def actualMemUsed: Long = status.blocks.values.map(_.memSize).sum
+ def actualDiskUsed: Long = status.blocks.values.map(_.diskSize).sum
+
+ def actualOnHeapMemUsed: Long =
+ status.blocks.values.filter(!_.storageLevel.useOffHeap).map(_.memSize).sum
+ def actualOffHeapMemUsed: Long =
+ status.blocks.values.filter(_.storageLevel.useOffHeap).map(_.memSize).sum
+
+ assert(status.maxMem === status.maxOnHeapMem.get + status.maxOffHeapMem.get)
+
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+ assert(status.onHeapMemUsed.get === actualOnHeapMemUsed)
+ assert(status.offHeapMemUsed.get === actualOffHeapMemUsed)
+
+ assert(status.memRemaining === status.maxMem - actualMemUsed)
+ assert(status.onHeapMemRemaining.get === status.maxOnHeapMem.get - actualOnHeapMemUsed)
+ assert(status.offHeapMemRemaining.get === status.maxOffHeapMem.get - actualOffHeapMemUsed)
+
+ status.addBlock(TestBlockId("wire"), BlockStatus(memAndDisk, 400L, 500L))
+ status.addBlock(RDDBlockId(25, 25), BlockStatus(memAndDisk, 40L, 50L))
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+
+ status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 4L, 5L))
+ status.updateBlock(RDDBlockId(0, 0), BlockStatus(offheap, 4L, 0L))
+ status.updateBlock(RDDBlockId(1, 1), BlockStatus(offheap, 4L, 0L))
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+ assert(status.onHeapMemUsed.get === actualOnHeapMemUsed)
+ assert(status.offHeapMemUsed.get === actualOffHeapMemUsed)
+
+ status.removeBlock(TestBlockId("fire"))
+ status.removeBlock(TestBlockId("man"))
+ status.removeBlock(RDDBlockId(2, 2))
+ status.removeBlock(RDDBlockId(2, 3))
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+ }
+
+ private def storageStatus4: StorageStatus = {
+ val status = new StorageStatus(BlockManagerId("big", "dog", 1), 2000L, None, None)
+ status
+ }
+ test("old SparkListenerBlockManagerAdded event compatible") {
+ // This scenario only happens when replaying an old event log, where no block add or remove
+ // events are replayed, so only the total amount of memory is valid.
+ val status = storageStatus4
+ assert(status.maxMem === status.maxMemory)
+
+ assert(status.memUsed === 0L)
+ assert(status.diskUsed === 0L)
+ assert(status.onHeapMemUsed === None)
+ assert(status.offHeapMemUsed === None)
+
+ assert(status.memRemaining === status.maxMem)
+ assert(status.onHeapMemRemaining === None)
+ assert(status.offHeapMemRemaining === None)
+ }
}
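Taken together, the two new tests pin down the accounting model: usage splits by each block's `StorageLevel`, and the per-component numbers must reconcile with the totals. A condensed restatement of the invariants asserted above, valid for any new-style status where both maxima are defined:

```scala
import org.apache.spark.storage.StorageStatus

// Mirrors the assertions in "storage memUsed, diskUsed with on-heap and
// off-heap blocks"; assumes maxOnHeapMem and maxOffHeapMem are defined.
def checkMemoryInvariants(s: StorageStatus): Unit = {
  val onHeapUsed = s.blocks.values.filterNot(_.storageLevel.useOffHeap).map(_.memSize).sum
  val offHeapUsed = s.blocks.values.filter(_.storageLevel.useOffHeap).map(_.memSize).sum
  assert(s.maxMem == s.maxOnHeapMem.get + s.maxOffHeapMem.get)
  assert(s.memUsed == onHeapUsed + offHeapUsed)
  assert(s.onHeapMemRemaining.get == s.maxOnHeapMem.get - onHeapUsed)
  assert(s.offHeapMemRemaining.get == s.maxOffHeapMem.get - offHeapUsed)
}
```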
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 4228373036..f4c561c737 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark.LocalSparkContext._
import org.apache.spark.api.java.StorageLevels
import org.apache.spark.deploy.history.HistoryServerSuite
import org.apache.spark.shuffle.FetchFailedException
-import org.apache.spark.status.api.v1.{JacksonMessageWriter, StageStatus}
+import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus}
private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler {
@@ -103,6 +103,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
.set("spark.ui.enabled", "true")
.set("spark.ui.port", "0")
.set("spark.ui.killEnabled", killEnabled.toString)
+ .set("spark.memory.offHeap.size", "64m")
val sc = new SparkContext(conf)
assert(sc.ui.isDefined)
sc
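The one-line conf addition above gives the Selenium suite a non-zero off-heap pool to render. Outside this harness, off-heap storage normally also needs the enable flag; a sketch of a typical local setup (app name and master are illustrative):

```scala
import org.apache.spark.SparkConf

// Illustrative configuration for exercising off-heap storage locally;
// spark.memory.offHeap.enabled generally must be true for a non-zero
// spark.memory.offHeap.size to take effect.
val conf = new SparkConf()
  .setAppName("offheap-demo")
  .setMaster("local[2]")
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "64m")
```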
@@ -151,6 +152,39 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
val updatedRddJson = getJson(ui, "storage/rdd/0")
(updatedRddJson \ "storageLevel").extract[String] should be (
StorageLevels.MEMORY_ONLY.description)
+
+ val dataDistributions0 =
+ (updatedRddJson \ "dataDistribution").extract[Seq[RDDDataDistribution]]
+ dataDistributions0.length should be (1)
+ val dist0 = dataDistributions0.head
+
+ dist0.onHeapMemoryUsed should not be (None)
+ dist0.memoryUsed should be (dist0.onHeapMemoryUsed.get)
+ dist0.onHeapMemoryRemaining should not be (None)
+ dist0.offHeapMemoryRemaining should not be (None)
+ dist0.memoryRemaining should be (
+ dist0.onHeapMemoryRemaining.get + dist0.offHeapMemoryRemaining.get)
+ dist0.onHeapMemoryUsed should not be (Some(0L))
+ dist0.offHeapMemoryUsed should be (Some(0L))
+
+ rdd.unpersist()
+ rdd.persist(StorageLevels.OFF_HEAP).count()
+ val updatedStorageJson1 = getJson(ui, "storage/rdd")
+ updatedStorageJson1.children.length should be (1)
+ val updatedRddJson1 = getJson(ui, "storage/rdd/0")
+ val dataDistributions1 =
+ (updatedRddJson1 \ "dataDistribution").extract[Seq[RDDDataDistribution]]
+ dataDistributions1.length should be (1)
+ val dist1 = dataDistributions1.head
+
+ dist1.offHeapMemoryUsed should not be (None)
+ dist1.memoryUsed should be (dist1.offHeapMemoryUsed.get)
+ dist1.onHeapMemoryRemaining should not be (None)
+ dist1.offHeapMemoryRemaining should not be (None)
+ dist1.memoryRemaining should be (
+ dist1.onHeapMemoryRemaining.get + dist1.offHeapMemoryRemaining.get)
+ dist1.onHeapMemoryUsed should be (Some(0L))
+ dist1.offHeapMemoryUsed should not be (Some(0L))
}
}
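The assertions above revolve around the four new optional fields of `RDDDataDistribution`. As a companion, a json4s sketch for pulling the on-/off-heap split straight out of a `/storage/rdd/<id>` response body (the `dataDistribution` key and the field names are the ones the test itself extracts):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Returns (onHeapMemoryUsed, offHeapMemoryUsed) per data-distribution
// entry; either side is None when the server predates these fields.
def heapSplit(rddJsonBody: String): Seq[(Option[Long], Option[Long])] = {
  implicit val formats: Formats = DefaultFormats
  (parse(rddJsonBody) \ "dataDistribution").children.map { d =>
    ((d \ "onHeapMemoryUsed").extractOpt[Long],
      (d \ "offHeapMemoryUsed").extractOpt[Long])
  }
}
```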