author    Josh Rosen <joshrosen@databricks.com>  2016-03-26 11:03:25 -0700
committer Reynold Xin <rxin@databricks.com>      2016-03-26 11:03:25 -0700
commit    20c0bcd972cfbb2f2aa92948f9ee337724a70361 (patch)
tree      96ba2aa2989a1243062b0ae91a62b86e15acd365 /core/src/main/scala/org/apache
parent    bd94ea4c80f4fc18f4000346d7c6717539846efb (diff)
[SPARK-14135] Add off-heap storage memory bookkeeping support to MemoryManager
This patch extends Spark's `UnifiedMemoryManager` to add bookkeeping support for off-heap storage memory, a requirement for enabling off-heap caching (which will be done by #11805).

The `MemoryManager`'s `storageMemoryPool` has been split into separate on- and off-heap pools, and the storage and unroll memory allocation methods have been updated to accept a `memoryMode` parameter specifying whether allocations should be performed on- or off-heap.

In order to reduce the testing surface, the `StaticMemoryManager` does not support off-heap caching (we plan to eventually remove the `StaticMemoryManager`, so this isn't a significant limitation).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11942 from JoshRosen/off-heap-storage-memory-bookkeeping.
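To make the new call shape concrete, here is a minimal standalone sketch (all types are local stubs, not Spark's classes; only the `acquireStorageMemory(blockId, numBytes, memoryMode)` signature is taken from the diff below):

    // Minimal sketch of the new API shape: every storage request now names the
    // MemoryMode it should be charged to. Types here are local stubs, not Spark's.
    object MemoryModeCallShape {
      sealed trait MemoryMode
      object MemoryMode {
        case object ON_HEAP extends MemoryMode
        case object OFF_HEAP extends MemoryMode
      }
      final case class BlockId(name: String)

      // Stand-in for MemoryManager.acquireStorageMemory(blockId, numBytes, memoryMode):
      def acquireStorageMemory(blockId: BlockId, numBytes: Long, mode: MemoryMode): Boolean = {
        println(s"charging $numBytes bytes for ${blockId.name} to the $mode storage pool")
        true
      }

      def main(args: Array[String]): Unit = {
        acquireStorageMemory(BlockId("rdd_0_0"), 1024L, MemoryMode.ON_HEAP)  // previously the only option
        acquireStorageMemory(BlockId("rdd_0_1"), 1024L, MemoryMode.OFF_HEAP) // bookkeeping added by this patch
      }
    }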
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala  |   9
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/MemoryManager.scala        |  46
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala  |  36
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala    |  18
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala | 168
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala        |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageLevel.scala        |   6
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala  |  19
8 files changed, 192 insertions, 114 deletions
diff --git a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
index 319718edb5..f8167074c6 100644
--- a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
@@ -37,13 +37,18 @@ import org.apache.spark.internal.Logging
* tasks was performed by the ShuffleMemoryManager.
*
* @param lock a [[MemoryManager]] instance to synchronize on
- * @param poolName a human-readable name for this pool, for use in log messages
+ * @param memoryMode the type of memory tracked by this pool (on- or off-heap)
*/
private[memory] class ExecutionMemoryPool(
lock: Object,
- poolName: String
+ memoryMode: MemoryMode
) extends MemoryPool(lock) with Logging {
+ private[this] val poolName: String = memoryMode match {
+ case MemoryMode.ON_HEAP => "on-heap execution"
+ case MemoryMode.OFF_HEAP => "off-heap execution"
+ }
+
/**
* Map from taskAttemptId -> memory consumption in bytes
*/
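A standalone sketch of the pattern in this hunk, assuming nothing beyond what it shows: the pool derives its log-friendly name from the `MemoryMode` it is constructed with rather than accepting a free-form string (enum and class are stubbed locally):

    object PoolNameSketch {
      sealed trait MemoryMode
      object MemoryMode {
        case object ON_HEAP extends MemoryMode
        case object OFF_HEAP extends MemoryMode
      }

      // Mirrors the constructor change: MemoryMode in, derived poolName out.
      class ExecutionMemoryPool(memoryMode: MemoryMode) {
        val poolName: String = memoryMode match {
          case MemoryMode.ON_HEAP  => "on-heap execution"
          case MemoryMode.OFF_HEAP => "off-heap execution"
        }
      }

      def main(args: Array[String]): Unit = {
        println(new ExecutionMemoryPool(MemoryMode.OFF_HEAP).poolName) // "off-heap execution"
      }
    }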
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index 5e8abeecea..10656bc8c8 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -36,42 +36,52 @@ import org.apache.spark.unsafe.memory.MemoryAllocator
private[spark] abstract class MemoryManager(
conf: SparkConf,
numCores: Int,
- storageMemory: Long,
+ onHeapStorageMemory: Long,
onHeapExecutionMemory: Long) extends Logging {
// -- Methods related to memory allocation policies and bookkeeping ------------------------------
@GuardedBy("this")
- protected val storageMemoryPool = new StorageMemoryPool(this)
+ protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
- protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "on-heap execution")
+ protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
@GuardedBy("this")
- protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "off-heap execution")
+ protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
+ @GuardedBy("this")
+ protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
- storageMemoryPool.incrementPoolSize(storageMemory)
+ onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
- offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeap.size", 0))
+
+ protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
+ protected[this] val offHeapStorageMemory =
+ (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
+
+ offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
+ offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
/**
* Total available memory for storage, in bytes. This amount can vary over time, depending on
* the MemoryManager implementation.
* In this model, this is equivalent to the amount of memory not occupied by execution.
*/
- def maxStorageMemory: Long
+ def maxOnHeapStorageMemory: Long
/**
* Set the [[MemoryStore]] used by this manager to evict cached blocks.
* This must be set after construction due to initialization ordering constraints.
*/
final def setMemoryStore(store: MemoryStore): Unit = synchronized {
- storageMemoryPool.setMemoryStore(store)
+ onHeapStorageMemoryPool.setMemoryStore(store)
+ offHeapStorageMemoryPool.setMemoryStore(store)
}
/**
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
+ *
* @return whether all N bytes were successfully granted.
*/
- def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean
+ def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
/**
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
@@ -82,7 +92,7 @@ private[spark] abstract class MemoryManager(
*
* @return whether all N bytes were successfully granted.
*/
- def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean
+ def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
/**
* Try to acquire up to `numBytes` of execution memory for the current task and return the
@@ -126,22 +136,26 @@ private[spark] abstract class MemoryManager(
/**
* Release N bytes of storage memory.
*/
- def releaseStorageMemory(numBytes: Long): Unit = synchronized {
- storageMemoryPool.releaseMemory(numBytes)
+ def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
+ memoryMode match {
+ case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
+ case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
+ }
}
/**
* Release all storage memory acquired.
*/
final def releaseAllStorageMemory(): Unit = synchronized {
- storageMemoryPool.releaseAllMemory()
+ onHeapStorageMemoryPool.releaseAllMemory()
+ offHeapStorageMemoryPool.releaseAllMemory()
}
/**
* Release N bytes of unroll memory.
*/
- final def releaseUnrollMemory(numBytes: Long): Unit = synchronized {
- releaseStorageMemory(numBytes)
+ final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
+ releaseStorageMemory(numBytes, memoryMode)
}
/**
@@ -155,7 +169,7 @@ private[spark] abstract class MemoryManager(
* Storage memory currently in use, in bytes.
*/
final def storageMemoryUsed: Long = synchronized {
- storageMemoryPool.memoryUsed
+ onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed
}
/**
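A worked example of the off-heap split added above, under the assumed setting `spark.memory.offHeap.size=1g` and the default `spark.memory.storageFraction=0.5` (plain arithmetic, no Spark dependency):

    object OffHeapSplit {
      def main(args: Array[String]): Unit = {
        val maxOffHeapMemory = 1024L * 1024 * 1024 // spark.memory.offHeap.size = 1g (assumed)
        val storageFraction  = 0.5                 // spark.memory.storageFraction default
        val offHeapStorageMemory   = (maxOffHeapMemory * storageFraction).toLong
        val offHeapExecutionMemory = maxOffHeapMemory - offHeapStorageMemory
        println(s"off-heap storage pool:   $offHeapStorageMemory bytes")   // 536870912
        println(s"off-heap execution pool: $offHeapExecutionMemory bytes") // 536870912
      }
    }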
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index f9f8f820bc..cbd0fa9ec2 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -30,12 +30,12 @@ import org.apache.spark.storage.BlockId
private[spark] class StaticMemoryManager(
conf: SparkConf,
maxOnHeapExecutionMemory: Long,
- override val maxStorageMemory: Long,
+ override val maxOnHeapStorageMemory: Long,
numCores: Int)
extends MemoryManager(
conf,
numCores,
- maxStorageMemory,
+ maxOnHeapStorageMemory,
maxOnHeapExecutionMemory) {
def this(conf: SparkConf, numCores: Int) {
@@ -46,25 +46,39 @@ private[spark] class StaticMemoryManager(
numCores)
}
+ // The StaticMemoryManager does not support off-heap storage memory:
+ offHeapExecutionMemoryPool.incrementPoolSize(offHeapStorageMemoryPool.poolSize)
+ offHeapStorageMemoryPool.decrementPoolSize(offHeapStorageMemoryPool.poolSize)
+
// Max number of bytes worth of blocks to evict when unrolling
private val maxUnrollMemory: Long = {
- (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
+ (maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}
- override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
- if (numBytes > maxStorageMemory) {
+ override def acquireStorageMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ memoryMode: MemoryMode): Boolean = synchronized {
+ require(memoryMode != MemoryMode.OFF_HEAP,
+ "StaticMemoryManager does not support off-heap storage memory")
+ if (numBytes > maxOnHeapStorageMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
- s"memory limit ($maxStorageMemory bytes)")
+ s"memory limit ($maxOnHeapStorageMemory bytes)")
false
} else {
- storageMemoryPool.acquireMemory(blockId, numBytes)
+ onHeapStorageMemoryPool.acquireMemory(blockId, numBytes)
}
}
- override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
- val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
- val freeMemory = storageMemoryPool.memoryFree
+ override def acquireUnrollMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ memoryMode: MemoryMode): Boolean = synchronized {
+ require(memoryMode != MemoryMode.OFF_HEAP,
+ "StaticMemoryManager does not support off-heap unroll memory")
+ val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
+ val freeMemory = onHeapStorageMemoryPool.memoryFree
// When unrolling, we will use all of the existing free memory, and, if necessary,
// some extra space freed from evicting cached blocks. We must place a cap on the
// amount of memory to be evicted by unrolling, however, otherwise unrolling one
@@ -72,7 +86,7 @@ private[spark] class StaticMemoryManager(
val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
// Keep it within the range 0 <= X <= maxNumBytesToFree
val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
- storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
+ onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
}
private[memory]
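To see what the eviction cap in `acquireUnrollMemory` works out to, here is a hedged sketch with made-up numbers: unrolling may evict at most `maxUnrollMemory - currentUnrollMemory - freeMemory` bytes, and never more than the shortfall for this request:

    object UnrollCapSketch {
      def main(args: Array[String]): Unit = {
        val maxUnrollMemory     = 200L  // e.g. 20% of a 1000-byte storage region
        val currentUnrollMemory = 50L   // unroll memory already held by all tasks
        val freeMemory          = 30L   // free bytes in the on-heap storage pool
        val numBytes            = 100L  // bytes requested for this unroll

        val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
        val numBytesToFree    = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
        println(s"may evict up to $maxNumBytesToFree bytes; will evict $numBytesToFree")
        // may evict up to 120 bytes; will evict 70
      }
    }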
diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index 6fcf26e3ec..a67e8da26b 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -28,8 +28,12 @@ import org.apache.spark.storage.memory.MemoryStore
* (caching).
*
* @param lock a [[MemoryManager]] instance to synchronize on
+ * @param memoryMode the type of memory tracked by this pool (on- or off-heap)
*/
-private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
+private[memory] class StorageMemoryPool(
+ lock: Object,
+ memoryMode: MemoryMode
+ ) extends MemoryPool(lock) with Logging {
@GuardedBy("lock")
private[this] var _memoryUsed: Long = 0L
@@ -79,7 +83,8 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)
- if (numBytesToFree > 0) {
+ // Once we support off-heap caching, this will need to change:
+ if (numBytesToFree > 0 && memoryMode == MemoryMode.ON_HEAP) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree)
}
// NOTE: If the memory store evicts blocks, then those evictions will synchronously call
@@ -117,7 +122,14 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
- val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
+ val spaceFreedByEviction = {
+ // Once we support off-heap caching, this will need to change:
+ if (memoryMode == MemoryMode.ON_HEAP) {
+ memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
+ } else {
+ 0
+ }
+ }
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
decrementPoolSize(spaceFreedByEviction)
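A minimal sketch of the eviction gate added in this file: until off-heap caching lands (#11805), an off-heap storage pool can only hand back its unused space when shrunk, and block eviction is attempted for on-heap pools only (`evict` stands in for `MemoryStore.evictBlocksToFreeSpace`; everything is stubbed):

    object ShrinkPoolSketch {
      sealed trait MemoryMode
      case object ON_HEAP extends MemoryMode
      case object OFF_HEAP extends MemoryMode

      def shrinkPoolToFreeSpace(
          mode: MemoryMode,
          memoryFree: Long,
          spaceToFree: Long,
          evict: Long => Long): Long = {
        val freedWithoutEviction = math.min(spaceToFree, memoryFree)
        val remaining = spaceToFree - freedWithoutEviction
        // Once off-heap caching is supported, this gate will need to change:
        val freedByEviction =
          if (remaining > 0 && mode == ON_HEAP) evict(remaining) else 0L
        freedWithoutEviction + freedByEviction
      }

      def main(args: Array[String]): Unit = {
        println(shrinkPoolToFreeSpace(ON_HEAP,  10L, 50L, n => n)) // 50: freed 10 unused + evicted 40
        println(shrinkPoolToFreeSpace(OFF_HEAP, 10L, 50L, n => n)) // 10: unused space only
      }
    }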
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 6c57c98ea5..fa9c021f70 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -39,27 +39,32 @@ import org.apache.spark.storage.BlockId
* up most of the storage space, in which case the new blocks will be evicted immediately
* according to their respective storage levels.
*
- * @param storageRegionSize Size of the storage region, in bytes.
+ * @param onHeapStorageRegionSize Size of the storage region, in bytes.
* This region is not statically reserved; execution can borrow from
* it if necessary. Cached blocks can be evicted only if actual
* storage memory usage exceeds this region.
*/
private[spark] class UnifiedMemoryManager private[memory] (
conf: SparkConf,
- val maxMemory: Long,
- storageRegionSize: Long,
+ val maxHeapMemory: Long,
+ onHeapStorageRegionSize: Long,
numCores: Int)
extends MemoryManager(
conf,
numCores,
- storageRegionSize,
- maxMemory - storageRegionSize) {
+ onHeapStorageRegionSize,
+ maxHeapMemory - onHeapStorageRegionSize) {
- // We always maintain this invariant:
- assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
+ private def assertInvariants(): Unit = {
+ assert(onHeapExecutionMemoryPool.poolSize + onHeapStorageMemoryPool.poolSize == maxHeapMemory)
+ assert(
+ offHeapExecutionMemoryPool.poolSize + offHeapStorageMemoryPool.poolSize == maxOffHeapMemory)
+ }
+
+ assertInvariants()
- override def maxStorageMemory: Long = synchronized {
- maxMemory - onHeapExecutionMemoryPool.memoryUsed
+ override def maxOnHeapStorageMemory: Long = synchronized {
+ maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}
/**
@@ -75,83 +80,104 @@ private[spark] class UnifiedMemoryManager private[memory] (
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
- assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
+ assertInvariants()
assert(numBytes >= 0)
- memoryMode match {
- case MemoryMode.ON_HEAP =>
-
- /**
- * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
- *
- * When acquiring memory for a task, the execution pool may need to make multiple
- * attempts. Each attempt must be able to evict storage in case another task jumps in
- * and caches a large block between the attempts. This is called once per attempt.
- */
- def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
- if (extraMemoryNeeded > 0) {
- // There is not enough free memory in the execution pool, so try to reclaim memory from
- // storage. We can reclaim any free memory from the storage pool. If the storage pool
- // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
- // the memory that storage has borrowed from execution.
- val memoryReclaimableFromStorage =
- math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
- if (memoryReclaimableFromStorage > 0) {
- // Only reclaim as much space as is necessary and available:
- val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
- math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
- onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
- }
- }
- }
+ val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
+ case MemoryMode.ON_HEAP => (
+ onHeapExecutionMemoryPool,
+ onHeapStorageMemoryPool,
+ onHeapStorageRegionSize,
+ maxHeapMemory)
+ case MemoryMode.OFF_HEAP => (
+ offHeapExecutionMemoryPool,
+ offHeapStorageMemoryPool,
+ offHeapStorageMemory,
+ maxOffHeapMemory)
+ }
- /**
- * The size the execution pool would have after evicting storage memory.
- *
- * The execution memory pool divides this quantity among the active tasks evenly to cap
- * the execution memory allocation for each task. It is important to keep this greater
- * than the execution pool size, which doesn't take into account potential memory that
- * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
- *
- * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
- * in execution memory allocation across tasks, Otherwise, a task may occupy more than
- * its fair share of execution memory, mistakenly thinking that other tasks can acquire
- * the portion of storage memory that cannot be evicted.
- */
- def computeMaxExecutionPoolSize(): Long = {
- maxMemory - math.min(storageMemoryUsed, storageRegionSize)
+ /**
+ * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
+ *
+ * When acquiring memory for a task, the execution pool may need to make multiple
+ * attempts. Each attempt must be able to evict storage in case another task jumps in
+ * and caches a large block between the attempts. This is called once per attempt.
+ */
+ def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
+ if (extraMemoryNeeded > 0) {
+ // There is not enough free memory in the execution pool, so try to reclaim memory from
+ // storage. We can reclaim any free memory from the storage pool. If the storage pool
+ // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
+ // the memory that storage has borrowed from execution.
+ val memoryReclaimableFromStorage = math.max(
+ storagePool.memoryFree,
+ storagePool.poolSize - storageRegionSize)
+ if (memoryReclaimableFromStorage > 0) {
+ // Only reclaim as much space as is necessary and available:
+ val spaceReclaimed = storagePool.shrinkPoolToFreeSpace(
+ math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
+ executionPool.incrementPoolSize(spaceReclaimed)
}
+ }
+ }
- onHeapExecutionMemoryPool.acquireMemory(
- numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
-
- case MemoryMode.OFF_HEAP =>
- // For now, we only support on-heap caching of data, so we do not need to interact with
- // the storage pool when allocating off-heap memory. This will change in the future, though.
- offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+ /**
+ * The size the execution pool would have after evicting storage memory.
+ *
+ * The execution memory pool divides this quantity among the active tasks evenly to cap
+ * the execution memory allocation for each task. It is important to keep this greater
+ * than the execution pool size, which doesn't take into account potential memory that
+ * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
+ *
+ * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
+ * in execution memory allocation across tasks. Otherwise, a task may occupy more than
+ * its fair share of execution memory, mistakenly thinking that other tasks can acquire
+ * the portion of storage memory that cannot be evicted.
+ */
+ def computeMaxExecutionPoolSize(): Long = {
+ maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
}
+
+ executionPool.acquireMemory(
+ numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
}
- override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
- assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
+ override def acquireStorageMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ memoryMode: MemoryMode): Boolean = synchronized {
+ assertInvariants()
assert(numBytes >= 0)
- if (numBytes > maxStorageMemory) {
+ val (executionPool, storagePool, maxMemory) = memoryMode match {
+ case MemoryMode.ON_HEAP => (
+ onHeapExecutionMemoryPool,
+ onHeapStorageMemoryPool,
+ maxOnHeapStorageMemory)
+ case MemoryMode.OFF_HEAP => (
+ offHeapExecutionMemoryPool,
+ offHeapStorageMemoryPool,
+ maxOffHeapMemory)
+ }
+ if (numBytes > maxMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
- s"memory limit ($maxStorageMemory bytes)")
+ s"memory limit ($maxMemory bytes)")
return false
}
- if (numBytes > storageMemoryPool.memoryFree) {
+ if (numBytes > storagePool.memoryFree) {
// There is not enough free memory in the storage pool, so try to borrow free memory from
// the execution pool.
- val memoryBorrowedFromExecution = Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes)
- onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
- storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
+ val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
+ executionPool.decrementPoolSize(memoryBorrowedFromExecution)
+ storagePool.incrementPoolSize(memoryBorrowedFromExecution)
}
- storageMemoryPool.acquireMemory(blockId, numBytes)
+ storagePool.acquireMemory(blockId, numBytes)
}
- override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
- acquireStorageMemory(blockId, numBytes)
+ override def acquireUnrollMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ memoryMode: MemoryMode): Boolean = synchronized {
+ acquireStorageMemory(blockId, numBytes, memoryMode)
}
}
@@ -167,8 +193,8 @@ object UnifiedMemoryManager {
val maxMemory = getMaxMemory(conf)
new UnifiedMemoryManager(
conf,
- maxMemory = maxMemory,
- storageRegionSize =
+ maxHeapMemory = maxMemory,
+ onHeapStorageRegionSize =
(maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
numCores = numCores)
}
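The heart of this file is now mode-symmetric arbitration: pick the (execution, storage) pool pair for the requested `MemoryMode`, let storage borrow free execution memory, and let execution claw back whatever storage has borrowed beyond its configured region. A self-contained sketch with plain `Long` bookkeeping (Spark's real pools also track per-task usage and evict blocks when reclaiming):

    object UnifiedArbitrationSketch {
      final case class Pool(var size: Long, var used: Long) { def free: Long = size - used }

      // Execution side: reclaim storage's free space, plus anything storage has
      // borrowed beyond its configured region (the real code evicts blocks for that part).
      def maybeGrowExecutionPool(exec: Pool, store: Pool,
                                 storageRegionSize: Long, extraNeeded: Long): Unit = {
        if (extraNeeded > 0) {
          val reclaimable = math.max(store.free, store.size - storageRegionSize)
          if (reclaimable > 0) {
            val reclaimed = math.min(extraNeeded, reclaimable)
            store.size -= reclaimed
            exec.size += reclaimed
          }
        }
      }

      // Storage side: when short, borrow free execution memory before acquiring.
      // Note the diff borrows min(execFree, numBytes), not just the shortfall.
      def acquireStorage(exec: Pool, store: Pool, numBytes: Long): Boolean = {
        if (numBytes > store.free) {
          val borrowed = math.min(exec.free, numBytes)
          exec.size -= borrowed
          store.size += borrowed
        }
        if (numBytes <= store.free) { store.used += numBytes; true } else false
      }

      def main(args: Array[String]): Unit = {
        val exec  = Pool(size = 500L, used = 0L)
        val store = Pool(size = 500L, used = 0L)
        println(acquireStorage(exec, store, 600L))        // true: borrows all 500 free execution bytes
        maybeGrowExecutionPool(exec, store, 500L, 300L)   // execution claws 300 back
        println(s"exec=${exec.size} store=${store.size}") // exec=300 store=700
      }
    }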
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 30d2e23efd..0c7763f236 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal
import org.apache.spark._
import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
import org.apache.spark.internal.Logging
-import org.apache.spark.memory.MemoryManager
+import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.network._
import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
@@ -94,7 +94,7 @@ private[spark] class BlockManager(
// However, since we use this only for reporting and logging, what we actually want here is
// the absolute maximum value that `maxStorageMemory` can ever possibly reach. We may need
// to revisit whether reporting this value as the "max" is intuitive to the user.
- private val maxMemory = memoryManager.maxStorageMemory
+ private val maxMemory = memoryManager.maxOnHeapStorageMemory
// Port used by the external shuffle service. In Yarn mode, this may already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 38e9534251..7d23295e25 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -21,6 +21,7 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.memory.MemoryMode
import org.apache.spark.util.Utils
/**
@@ -65,6 +66,11 @@ class StorageLevel private(
require(replication == 1, "Off-heap storage level does not support multiple replication")
}
+ private[spark] def memoryMode: MemoryMode = {
+ if (useOffHeap) MemoryMode.OFF_HEAP
+ else MemoryMode.ON_HEAP
+ }
+
override def clone(): StorageLevel = {
new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
}
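A hedged usage sketch of the new `StorageLevel.memoryMode` helper; since it is `private[spark]`, this (hypothetical) demo package must sit under `org.apache.spark`:

    package org.apache.spark.demo // hypothetical package; private[spark] members are visible here

    import org.apache.spark.memory.MemoryMode
    import org.apache.spark.storage.StorageLevel

    object MemoryModeForLevel {
      def main(args: Array[String]): Unit = {
        // useOffHeap = true maps to OFF_HEAP; every other level maps to ON_HEAP.
        assert(StorageLevel.OFF_HEAP.memoryMode == MemoryMode.OFF_HEAP)
        assert(StorageLevel.MEMORY_ONLY.memoryMode == MemoryMode.ON_HEAP)
        println("memoryMode mapping holds")
      }
    }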
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 1a78c9c010..3ca41f32c1 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -29,7 +29,7 @@ import com.google.common.io.ByteStreams
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.internal.Logging
-import org.apache.spark.memory.MemoryManager
+import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.serializer.{SerializationStream, SerializerManager}
import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
@@ -93,7 +93,7 @@ private[spark] class MemoryStore(
conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
/** Total amount of memory available for storage, in bytes. */
- private def maxMemory: Long = memoryManager.maxStorageMemory
+ private def maxMemory: Long = memoryManager.maxOnHeapStorageMemory
if (maxMemory < unrollMemoryThreshold) {
logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " +
@@ -133,7 +133,7 @@ private[spark] class MemoryStore(
size: Long,
_bytes: () => ChunkedByteBuffer): Boolean = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
- if (memoryManager.acquireStorageMemory(blockId, size)) {
+ if (memoryManager.acquireStorageMemory(blockId, size, MemoryMode.ON_HEAP)) {
// We acquired enough memory for the block, so go ahead and put it
val bytes = _bytes()
assert(bytes.size == size)
@@ -229,7 +229,7 @@ private[spark] class MemoryStore(
// Synchronize so that transfer is atomic
memoryManager.synchronized {
releaseUnrollMemoryForThisTask(amount)
- val success = memoryManager.acquireStorageMemory(blockId, amount)
+ val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
assert(success, "transferring unroll memory to storage memory failed")
}
}
@@ -237,7 +237,8 @@ private[spark] class MemoryStore(
val enoughStorageMemory = {
if (unrollMemoryUsedByThisBlock <= size) {
val acquiredExtra =
- memoryManager.acquireStorageMemory(blockId, size - unrollMemoryUsedByThisBlock)
+ memoryManager.acquireStorageMemory(
+ blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
if (acquiredExtra) {
transferUnrollToStorage(unrollMemoryUsedByThisBlock)
}
@@ -353,7 +354,7 @@ private[spark] class MemoryStore(
// Synchronize so that transfer is atomic
memoryManager.synchronized {
releaseUnrollMemoryForThisTask(unrollMemoryUsedByThisBlock)
- val success = memoryManager.acquireStorageMemory(blockId, entry.size)
+ val success = memoryManager.acquireStorageMemory(blockId, entry.size, MemoryMode.ON_HEAP)
assert(success, "transferring unroll memory to storage memory failed")
}
entries.synchronized {
@@ -406,7 +407,7 @@ private[spark] class MemoryStore(
entries.remove(blockId)
}
if (entry != null) {
- memoryManager.releaseStorageMemory(entry.size)
+ memoryManager.releaseStorageMemory(entry.size, MemoryMode.ON_HEAP)
logInfo(s"Block $blockId of size ${entry.size} dropped " +
s"from memory (free ${maxMemory - blocksMemoryUsed})")
true
@@ -531,7 +532,7 @@ private[spark] class MemoryStore(
*/
def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = {
memoryManager.synchronized {
- val success = memoryManager.acquireUnrollMemory(blockId, memory)
+ val success = memoryManager.acquireUnrollMemory(blockId, memory, MemoryMode.ON_HEAP)
if (success) {
val taskAttemptId = currentTaskAttemptId()
unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
@@ -554,7 +555,7 @@ private[spark] class MemoryStore(
if (unrollMemoryMap(taskAttemptId) == 0) {
unrollMemoryMap.remove(taskAttemptId)
}
- memoryManager.releaseUnrollMemory(memoryToRelease)
+ memoryManager.releaseUnrollMemory(memoryToRelease, MemoryMode.ON_HEAP)
}
}
}
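Every `MemoryStore` call site above passes `MemoryMode.ON_HEAP` explicitly, since off-heap caching itself only arrives with #11805. One pattern worth noting is the unroll-to-storage handoff under `memoryManager.synchronized`; a stubbed sketch of why both steps share one lock:

    object UnrollTransferSketch {
      private val lock = new Object
      private var unrollUsed  = 100L // bytes held as unroll memory by this task
      private var storageUsed = 0L   // bytes held as storage memory

      // Both steps run under one lock, so no other task can grab the released
      // unroll bytes before they are re-acquired as (on-heap) storage memory.
      def transferUnrollToStorage(amount: Long): Unit = lock.synchronized {
        unrollUsed  -= amount // stands in for releaseUnrollMemoryForThisTask(amount)
        storageUsed += amount // stands in for acquireStorageMemory(..., MemoryMode.ON_HEAP)
      }

      def main(args: Array[String]): Unit = {
        transferUnrollToStorage(100L)
        println(s"unroll=$unrollUsed storage=$storageUsed") // unroll=0 storage=100
      }
    }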