diff options
author | jerryshao <sshao@hortonworks.com> | 2016-07-25 15:17:06 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2016-07-25 15:17:06 -0700 |
commit | f5ea7fe53974a7e8cbfc222b9a6f47669b53ccfd (patch) | |
tree | 2f5d5bb48568f2dfcac15516a746293775542a71 /core/src/main | |
parent | cda4603de340d533c49feac1b244ddfd291f9bcf (diff) | |
download | spark-f5ea7fe53974a7e8cbfc222b9a6f47669b53ccfd.tar.gz spark-f5ea7fe53974a7e8cbfc222b9a6f47669b53ccfd.tar.bz2 spark-f5ea7fe53974a7e8cbfc222b9a6f47669b53ccfd.zip |
[SPARK-16166][CORE] Also take off-heap memory usage into consideration in log and webui display
## What changes were proposed in this pull request?
Currently in the log and UI display, only on-heap storage memory is calculated and displayed,
```
16/06/27 13:41:52 INFO MemoryStore: Block rdd_5_0 stored as values in memory (estimated size 17.8 KB, free 665.9 MB)
```
<img width="1232" alt="untitled" src="https://cloud.githubusercontent.com/assets/850797/16369960/53fb614e-3c6e-11e6-8fa3-7ffe65abcb49.png">
With [SPARK-13992](https://issues.apache.org/jira/browse/SPARK-13992) off-heap memory is supported for data persistence, so here change to also take off-heap storage memory into consideration.
## How was this patch tested?
Unit test and local verification.
Author: jerryshao <sshao@hortonworks.com>
Closes #13920 from jerryshao/SPARK-16166.
Diffstat (limited to 'core/src/main')
5 files changed, 20 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index 0210217e41..82442cf561 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -62,13 +62,19 @@ private[spark] abstract class MemoryManager( offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory) /** - * Total available memory for storage, in bytes. This amount can vary over time, depending on - * the MemoryManager implementation. + * Total available on heap memory for storage, in bytes. This amount can vary over time, + * depending on the MemoryManager implementation. * In this model, this is equivalent to the amount of memory not occupied by execution. */ def maxOnHeapStorageMemory: Long /** + * Total available off heap memory for storage, in bytes. This amount can vary over time, + * depending on the MemoryManager implementation. + */ + def maxOffHeapStorageMemory: Long + + /** * Set the [[MemoryStore]] used by this manager to evict cached blocks. * This must be set after construction due to initialization ordering constraints. */ diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala index 08155aa298..a6f7db0600 100644 --- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala @@ -55,6 +55,8 @@ private[spark] class StaticMemoryManager( (maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong } + override def maxOffHeapStorageMemory: Long = 0L + override def acquireStorageMemory( blockId: BlockId, numBytes: Long, diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index c7b36be602..fea2808218 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -67,6 +67,10 @@ private[spark] class UnifiedMemoryManager private[memory] ( maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed } + override def maxOffHeapStorageMemory: Long = synchronized { + maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed + } + /** * Try to acquire up to `numBytes` of execution memory for the current task and return the * number of bytes obtained, or 0 if none can be allocated. diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 83a9cbd63d..015e71d126 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -96,7 +96,8 @@ private[spark] class BlockManager( // However, since we use this only for reporting and logging, what we actually want here is // the absolute maximum value that `maxMemory` can ever possibly reach. We may need // to revisit whether reporting this value as the "max" is intuitive to the user. - private val maxMemory = memoryManager.maxOnHeapStorageMemory + private val maxMemory = + memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory // Port used by the external shuffle service. In Yarn mode, this may be already be // set through the Hadoop configuration as the server is launched in the Yarn NM. @@ -802,7 +803,7 @@ private[spark] class BlockManager( val putBlockStatus = getCurrentBlockStatus(blockId, info) val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid if (blockWasSuccessfullyStored) { - // Now that the block is in either the memory, externalBlockStore, or disk store, + // Now that the block is in either the memory or disk store, // tell the master about it. info.size = size if (tellMaster) { diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 0349da0d8a..586339a58d 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -101,7 +101,9 @@ private[spark] class MemoryStore( conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024) /** Total amount of memory available for storage, in bytes. */ - private def maxMemory: Long = memoryManager.maxOnHeapStorageMemory + private def maxMemory: Long = { + memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory + } if (maxMemory < unrollMemoryThreshold) { logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " + |