authorJosh Rosen <joshrosen@databricks.com>2016-03-26 11:03:25 -0700
committerReynold Xin <rxin@databricks.com>2016-03-26 11:03:25 -0700
commit20c0bcd972cfbb2f2aa92948f9ee337724a70361 (patch)
tree96ba2aa2989a1243062b0ae91a62b86e15acd365 /core
parentbd94ea4c80f4fc18f4000346d7c6717539846efb (diff)
[SPARK-14135] Add off-heap storage memory bookkeeping support to MemoryManager
This patch extends Spark's `UnifiedMemoryManager` to add bookkeeping support for off-heap storage memory, a requirement for enabling off-heap caching (which will be done by #11805). The `MemoryManager`'s `storageMemoryPool` has been split into separate on- and off-heap pools, and the storage and unroll memory allocation methods have been updated to accept a `memoryMode` parameter specifying whether allocations should be performed on- or off-heap. In order to reduce the testing surface, the `StaticMemoryManager` does not support off-heap caching (we plan to eventually remove the `StaticMemoryManager`, so this isn't a significant limitation).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11942 from JoshRosen/off-heap-storage-memory-bookkeeping.
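As a minimal sketch of the new call shape (not part of the patch; `mm` stands for an already-constructed `MemoryManager`, which is private[spark], so this only compiles inside the `org.apache.spark` packages; the block id and sizes are made up):

import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.storage.TestBlockId

def cacheOffHeap(mm: MemoryManager): Unit = {
  val blockId = TestBlockId("example")   // hypothetical block id
  val numBytes = 1024L * 1024L           // 1 MB, for illustration
  // The new memoryMode parameter selects the on- vs. off-heap storage pool:
  if (mm.acquireStorageMemory(blockId, numBytes, MemoryMode.OFF_HEAP)) {
    // ... write the block's bytes to off-heap memory ...
    mm.releaseStorageMemory(numBytes, MemoryMode.OFF_HEAP)
  }
}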
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala        9
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/MemoryManager.scala             46
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala       36
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala         18
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala     168
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala              4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageLevel.scala              6
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala       19
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala         2
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala  60
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala         14
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala 80
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala         4
13 files changed, 280 insertions, 186 deletions
diff --git a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
index 319718edb5..f8167074c6 100644
--- a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
@@ -37,13 +37,18 @@ import org.apache.spark.internal.Logging
* tasks was performed by the ShuffleMemoryManager.
*
* @param lock a [[MemoryManager]] instance to synchronize on
- * @param poolName a human-readable name for this pool, for use in log messages
+ * @param memoryMode the type of memory tracked by this pool (on- or off-heap)
*/
private[memory] class ExecutionMemoryPool(
lock: Object,
- poolName: String
+ memoryMode: MemoryMode
) extends MemoryPool(lock) with Logging {
+ private[this] val poolName: String = memoryMode match {
+ case MemoryMode.ON_HEAP => "on-heap execution"
+ case MemoryMode.OFF_HEAP => "off-heap execution"
+ }
+
/**
* Map from taskAttemptId -> memory consumption in bytes
*/
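A self-contained sketch of the naming pattern above, with a stand-in for `MemoryMode` (the real one is a Java enum in `org.apache.spark.memory`):

// Stand-in for org.apache.spark.memory.MemoryMode, only so the snippet
// is self-contained.
sealed trait MemoryMode
object MemoryMode {
  case object ON_HEAP extends MemoryMode
  case object OFF_HEAP extends MemoryMode
}

// The pool derives its log-friendly name from the mode instead of
// taking a free-form string, so names stay consistent across pools.
def poolName(memoryMode: MemoryMode): String = memoryMode match {
  case MemoryMode.ON_HEAP => "on-heap execution"
  case MemoryMode.OFF_HEAP => "off-heap execution"
}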
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index 5e8abeecea..10656bc8c8 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -36,42 +36,52 @@ import org.apache.spark.unsafe.memory.MemoryAllocator
private[spark] abstract class MemoryManager(
conf: SparkConf,
numCores: Int,
- storageMemory: Long,
+ onHeapStorageMemory: Long,
onHeapExecutionMemory: Long) extends Logging {
// -- Methods related to memory allocation policies and bookkeeping ------------------------------
@GuardedBy("this")
- protected val storageMemoryPool = new StorageMemoryPool(this)
+ protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
- protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "on-heap execution")
+ protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
@GuardedBy("this")
- protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "off-heap execution")
+ protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
+ @GuardedBy("this")
+ protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
- storageMemoryPool.incrementPoolSize(storageMemory)
+ onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
- offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeap.size", 0))
+
+ protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
+ protected[this] val offHeapStorageMemory =
+ (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
+
+ offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
+ offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
/**
* Total available memory for storage, in bytes. This amount can vary over time, depending on
* the MemoryManager implementation.
* In this model, this is equivalent to the amount of memory not occupied by execution.
*/
- def maxStorageMemory: Long
+ def maxOnHeapStorageMemory: Long
/**
* Set the [[MemoryStore]] used by this manager to evict cached blocks.
* This must be set after construction due to initialization ordering constraints.
*/
final def setMemoryStore(store: MemoryStore): Unit = synchronized {
- storageMemoryPool.setMemoryStore(store)
+ onHeapStorageMemoryPool.setMemoryStore(store)
+ offHeapStorageMemoryPool.setMemoryStore(store)
}
/**
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
+ *
* @return whether all N bytes were successfully granted.
*/
- def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean
+ def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
/**
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
@@ -82,7 +92,7 @@ private[spark] abstract class MemoryManager(
*
* @return whether all N bytes were successfully granted.
*/
- def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean
+ def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
/**
* Try to acquire up to `numBytes` of execution memory for the current task and return the
@@ -126,22 +136,26 @@ private[spark] abstract class MemoryManager(
/**
* Release N bytes of storage memory.
*/
- def releaseStorageMemory(numBytes: Long): Unit = synchronized {
- storageMemoryPool.releaseMemory(numBytes)
+ def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
+ memoryMode match {
+ case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
+ case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
+ }
}
/**
* Release all storage memory acquired.
*/
final def releaseAllStorageMemory(): Unit = synchronized {
- storageMemoryPool.releaseAllMemory()
+ onHeapStorageMemoryPool.releaseAllMemory()
+ offHeapStorageMemoryPool.releaseAllMemory()
}
/**
* Release N bytes of unroll memory.
*/
- final def releaseUnrollMemory(numBytes: Long): Unit = synchronized {
- releaseStorageMemory(numBytes)
+ final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
+ releaseStorageMemory(numBytes, memoryMode)
}
/**
@@ -155,7 +169,7 @@ private[spark] abstract class MemoryManager(
* Storage memory currently in use, in bytes.
*/
final def storageMemoryUsed: Long = synchronized {
- storageMemoryPool.memoryUsed
+ onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed
}
/**
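The constructor above splits `spark.memory.offHeap.size` between the two off-heap pools with plain arithmetic; a runnable sketch under assumed config values:

// How the off-heap total is divided; the concrete numbers here are
// assumptions for illustration only.
val maxOffHeapMemory = 1024L * 1024L * 1024L   // spark.memory.offHeap.size = 1g
val storageFraction = 0.5                      // spark.memory.storageFraction default
val offHeapStorageMemory = (maxOffHeapMemory * storageFraction).toLong
val offHeapExecutionMemory = maxOffHeapMemory - offHeapStorageMemory
// The two pool sizes always sum back to the configured total:
assert(offHeapStorageMemory + offHeapExecutionMemory == maxOffHeapMemory)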
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index f9f8f820bc..cbd0fa9ec2 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -30,12 +30,12 @@ import org.apache.spark.storage.BlockId
private[spark] class StaticMemoryManager(
conf: SparkConf,
maxOnHeapExecutionMemory: Long,
- override val maxStorageMemory: Long,
+ override val maxOnHeapStorageMemory: Long,
numCores: Int)
extends MemoryManager(
conf,
numCores,
- maxStorageMemory,
+ maxOnHeapStorageMemory,
maxOnHeapExecutionMemory) {
def this(conf: SparkConf, numCores: Int) {
@@ -46,25 +46,39 @@ private[spark] class StaticMemoryManager(
numCores)
}
+ // The StaticMemoryManager does not support off-heap storage memory:
+ offHeapExecutionMemoryPool.incrementPoolSize(offHeapStorageMemoryPool.poolSize)
+ offHeapStorageMemoryPool.decrementPoolSize(offHeapStorageMemoryPool.poolSize)
+
// Max number of bytes worth of blocks to evict when unrolling
private val maxUnrollMemory: Long = {
- (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
+ (maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}
- override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
- if (numBytes > maxStorageMemory) {
+ override def acquireStorageMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ memoryMode: MemoryMode): Boolean = synchronized {
+ require(memoryMode != MemoryMode.OFF_HEAP,
+ "StaticMemoryManager does not support off-heap storage memory")
+ if (numBytes > maxOnHeapStorageMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
- s"memory limit ($maxStorageMemory bytes)")
+ s"memory limit ($maxOnHeapStorageMemory bytes)")
false
} else {
- storageMemoryPool.acquireMemory(blockId, numBytes)
+ onHeapStorageMemoryPool.acquireMemory(blockId, numBytes)
}
}
- override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
- val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
- val freeMemory = storageMemoryPool.memoryFree
+ override def acquireUnrollMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ memoryMode: MemoryMode): Boolean = synchronized {
+ require(memoryMode != MemoryMode.OFF_HEAP,
+ "StaticMemoryManager does not support off-heap unroll memory")
+ val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
+ val freeMemory = onHeapStorageMemoryPool.memoryFree
// When unrolling, we will use all of the existing free memory, and, if necessary,
// some extra space freed from evicting cached blocks. We must place a cap on the
// amount of memory to be evicted by unrolling, however, otherwise unrolling one
@@ -72,7 +86,7 @@ private[spark] class StaticMemoryManager(
val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
// Keep it within the range 0 <= X <= maxNumBytesToFree
val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
- storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
+ onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
}
private[memory]
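The unroll path above caps eviction with a simple clamp; a sketch with made-up numbers:

// Keep the bytes evicted for unrolling within [0, maxNumBytesToFree].
// All inputs are example values, not real configuration.
val maxUnrollMemory = 200L       // maxOnHeapStorageMemory * spark.storage.unrollFraction
val currentUnrollMemory = 60L    // unroll memory already in use
val freeMemory = 30L             // free bytes in the on-heap storage pool
val numBytes = 100L              // unroll bytes requested
val maxNumBytesToFree = math.max(0L, maxUnrollMemory - currentUnrollMemory - freeMemory) // 110
val numBytesToFree = math.max(0L, math.min(maxNumBytesToFree, numBytes - freeMemory))    // 70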
diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index 6fcf26e3ec..a67e8da26b 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -28,8 +28,12 @@ import org.apache.spark.storage.memory.MemoryStore
* (caching).
*
* @param lock a [[MemoryManager]] instance to synchronize on
+ * @param memoryMode the type of memory tracked by this pool (on- or off-heap)
*/
-private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
+private[memory] class StorageMemoryPool(
+ lock: Object,
+ memoryMode: MemoryMode
+ ) extends MemoryPool(lock) with Logging {
@GuardedBy("lock")
private[this] var _memoryUsed: Long = 0L
@@ -79,7 +83,8 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)
- if (numBytesToFree > 0) {
+ // Once we support off-heap caching, this will need to change:
+ if (numBytesToFree > 0 && memoryMode == MemoryMode.ON_HEAP) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree)
}
// NOTE: If the memory store evicts blocks, then those evictions will synchronously call
@@ -117,7 +122,14 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
- val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
+ val spaceFreedByEviction = {
+ // Once we support off-heap caching, this will need to change:
+ if (memoryMode == MemoryMode.ON_HEAP) {
+ memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
+ } else {
+ 0
+ }
+ }
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
decrementPoolSize(spaceFreedByEviction)
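Condensed, the logic above gives back unused free space first and evicts blocks only for the remainder (and, until off-heap caching lands, only when the pool is on-heap). A simplified sketch, not the full method:

// Simplified shape of shrinkPoolToFreeSpace; the names and the
// evictBlocks callback are stand-ins for this sketch.
def shrinkPoolToFreeSpace(
    spaceToFree: Long,
    memoryFree: Long,              // free bytes currently in this pool
    evictBlocks: Long => Long,     // evicts blocks, returns bytes freed
    memoryModeIsOnHeap: Boolean): Long = {
  val freedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
  val remaining = spaceToFree - freedByReleasingUnusedMemory
  val freedByEviction =
    if (remaining > 0 && memoryModeIsOnHeap) evictBlocks(remaining) else 0L
  freedByReleasingUnusedMemory + freedByEviction
}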
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 6c57c98ea5..fa9c021f70 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -39,27 +39,32 @@ import org.apache.spark.storage.BlockId
* up most of the storage space, in which case the new blocks will be evicted immediately
* according to their respective storage levels.
*
- * @param storageRegionSize Size of the storage region, in bytes.
+ * @param onHeapStorageRegionSize Size of the storage region, in bytes.
* This region is not statically reserved; execution can borrow from
* it if necessary. Cached blocks can be evicted only if actual
* storage memory usage exceeds this region.
*/
private[spark] class UnifiedMemoryManager private[memory] (
conf: SparkConf,
- val maxMemory: Long,
- storageRegionSize: Long,
+ val maxHeapMemory: Long,
+ onHeapStorageRegionSize: Long,
numCores: Int)
extends MemoryManager(
conf,
numCores,
- storageRegionSize,
- maxMemory - storageRegionSize) {
+ onHeapStorageRegionSize,
+ maxHeapMemory - onHeapStorageRegionSize) {
- // We always maintain this invariant:
- assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
+ private def assertInvariants(): Unit = {
+ assert(onHeapExecutionMemoryPool.poolSize + onHeapStorageMemoryPool.poolSize == maxHeapMemory)
+ assert(
+ offHeapExecutionMemoryPool.poolSize + offHeapStorageMemoryPool.poolSize == maxOffHeapMemory)
+ }
+
+ assertInvariants()
- override def maxStorageMemory: Long = synchronized {
- maxMemory - onHeapExecutionMemoryPool.memoryUsed
+ override def maxOnHeapStorageMemory: Long = synchronized {
+ maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}
/**
@@ -75,83 +80,104 @@ private[spark] class UnifiedMemoryManager private[memory] (
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
- assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
+ assertInvariants()
assert(numBytes >= 0)
- memoryMode match {
- case MemoryMode.ON_HEAP =>
-
- /**
- * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
- *
- * When acquiring memory for a task, the execution pool may need to make multiple
- * attempts. Each attempt must be able to evict storage in case another task jumps in
- * and caches a large block between the attempts. This is called once per attempt.
- */
- def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
- if (extraMemoryNeeded > 0) {
- // There is not enough free memory in the execution pool, so try to reclaim memory from
- // storage. We can reclaim any free memory from the storage pool. If the storage pool
- // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
- // the memory that storage has borrowed from execution.
- val memoryReclaimableFromStorage =
- math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
- if (memoryReclaimableFromStorage > 0) {
- // Only reclaim as much space as is necessary and available:
- val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
- math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
- onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
- }
- }
- }
+ val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
+ case MemoryMode.ON_HEAP => (
+ onHeapExecutionMemoryPool,
+ onHeapStorageMemoryPool,
+ onHeapStorageRegionSize,
+ maxHeapMemory)
+ case MemoryMode.OFF_HEAP => (
+ offHeapExecutionMemoryPool,
+ offHeapStorageMemoryPool,
+ offHeapStorageMemory,
+ maxOffHeapMemory)
+ }
- /**
- * The size the execution pool would have after evicting storage memory.
- *
- * The execution memory pool divides this quantity among the active tasks evenly to cap
- * the execution memory allocation for each task. It is important to keep this greater
- * than the execution pool size, which doesn't take into account potential memory that
- * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
- *
- * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
- * in execution memory allocation across tasks, Otherwise, a task may occupy more than
- * its fair share of execution memory, mistakenly thinking that other tasks can acquire
- * the portion of storage memory that cannot be evicted.
- */
- def computeMaxExecutionPoolSize(): Long = {
- maxMemory - math.min(storageMemoryUsed, storageRegionSize)
+ /**
+ * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
+ *
+ * When acquiring memory for a task, the execution pool may need to make multiple
+ * attempts. Each attempt must be able to evict storage in case another task jumps in
+ * and caches a large block between the attempts. This is called once per attempt.
+ */
+ def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
+ if (extraMemoryNeeded > 0) {
+ // There is not enough free memory in the execution pool, so try to reclaim memory from
+ // storage. We can reclaim any free memory from the storage pool. If the storage pool
+ // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
+ // the memory that storage has borrowed from execution.
+ val memoryReclaimableFromStorage = math.max(
+ storagePool.memoryFree,
+ storagePool.poolSize - storageRegionSize)
+ if (memoryReclaimableFromStorage > 0) {
+ // Only reclaim as much space as is necessary and available:
+ val spaceReclaimed = storagePool.shrinkPoolToFreeSpace(
+ math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
+ executionPool.incrementPoolSize(spaceReclaimed)
}
+ }
+ }
- onHeapExecutionMemoryPool.acquireMemory(
- numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
-
- case MemoryMode.OFF_HEAP =>
- // For now, we only support on-heap caching of data, so we do not need to interact with
- // the storage pool when allocating off-heap memory. This will change in the future, though.
- offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+ /**
+ * The size the execution pool would have after evicting storage memory.
+ *
+ * The execution memory pool divides this quantity among the active tasks evenly to cap
+ * the execution memory allocation for each task. It is important to keep this greater
+ * than the execution pool size, which doesn't take into account potential memory that
+ * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
+ *
+ * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
+ * in execution memory allocation across tasks. Otherwise, a task may occupy more than
+ * its fair share of execution memory, mistakenly thinking that other tasks can acquire
+ * the portion of storage memory that cannot be evicted.
+ */
+ def computeMaxExecutionPoolSize(): Long = {
+ maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
}
+
+ executionPool.acquireMemory(
+ numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
}
- override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
- assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
+ override def acquireStorageMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ memoryMode: MemoryMode): Boolean = synchronized {
+ assertInvariants()
assert(numBytes >= 0)
- if (numBytes > maxStorageMemory) {
+ val (executionPool, storagePool, maxMemory) = memoryMode match {
+ case MemoryMode.ON_HEAP => (
+ onHeapExecutionMemoryPool,
+ onHeapStorageMemoryPool,
+ maxOnHeapStorageMemory)
+ case MemoryMode.OFF_HEAP => (
+ offHeapExecutionMemoryPool,
+ offHeapStorageMemoryPool,
+ maxOffHeapMemory)
+ }
+ if (numBytes > maxMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
- s"memory limit ($maxStorageMemory bytes)")
+ s"memory limit ($maxMemory bytes)")
return false
}
- if (numBytes > storageMemoryPool.memoryFree) {
+ if (numBytes > storagePool.memoryFree) {
// There is not enough free memory in the storage pool, so try to borrow free memory from
// the execution pool.
- val memoryBorrowedFromExecution = Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes)
- onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
- storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
+ val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
+ executionPool.decrementPoolSize(memoryBorrowedFromExecution)
+ storagePool.incrementPoolSize(memoryBorrowedFromExecution)
}
- storageMemoryPool.acquireMemory(blockId, numBytes)
+ storagePool.acquireMemory(blockId, numBytes)
}
- override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
- acquireStorageMemory(blockId, numBytes)
+ override def acquireUnrollMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ memoryMode: MemoryMode): Boolean = synchronized {
+ acquireStorageMemory(blockId, numBytes, memoryMode)
}
}
@@ -167,8 +193,8 @@ object UnifiedMemoryManager {
val maxMemory = getMaxMemory(conf)
new UnifiedMemoryManager(
conf,
- maxMemory = maxMemory,
- storageRegionSize =
+ maxHeapMemory = maxMemory,
+ onHeapStorageRegionSize =
(maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
numCores = numCores)
}
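Once the pools are selected, the borrowing step in `acquireStorageMemory` above is identical for both modes; a sketch of just that step, with a stand-in pool type:

// Minimal stand-in exposing the operations the borrowing step needs.
class Pool(var poolSize: Long, var memoryUsed: Long) {
  def memoryFree: Long = poolSize - memoryUsed
  def decrementPoolSize(n: Long): Unit = { poolSize -= n }
  def incrementPoolSize(n: Long): Unit = { poolSize += n }
}

// If storage lacks free space, borrow only what execution has free
// (storage can never force execution to shrink), as the patch does:
def borrowForStorage(numBytes: Long, executionPool: Pool, storagePool: Pool): Unit = {
  if (numBytes > storagePool.memoryFree) {
    val borrowed = math.min(executionPool.memoryFree, numBytes)
    executionPool.decrementPoolSize(borrowed)
    storagePool.incrementPoolSize(borrowed)
  }
}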
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 30d2e23efd..0c7763f236 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal
import org.apache.spark._
import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
import org.apache.spark.internal.Logging
-import org.apache.spark.memory.MemoryManager
+import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.network._
import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
@@ -94,7 +94,7 @@ private[spark] class BlockManager(
// However, since we use this only for reporting and logging, what we actually want here is
// the absolute maximum value that `maxStorageMemory` can ever possibly reach. We may need
// to revisit whether reporting this value as the "max" is intuitive to the user.
- private val maxMemory = memoryManager.maxStorageMemory
+ private val maxMemory = memoryManager.maxOnHeapStorageMemory
// Port used by the external shuffle service. In Yarn mode, this may be already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 38e9534251..7d23295e25 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -21,6 +21,7 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.memory.MemoryMode
import org.apache.spark.util.Utils
/**
@@ -65,6 +66,11 @@ class StorageLevel private(
require(replication == 1, "Off-heap storage level does not support multiple replication")
}
+ private[spark] def memoryMode: MemoryMode = {
+ if (useOffHeap) MemoryMode.OFF_HEAP
+ else MemoryMode.ON_HEAP
+ }
+
override def clone(): StorageLevel = {
new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
}
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 1a78c9c010..3ca41f32c1 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -29,7 +29,7 @@ import com.google.common.io.ByteStreams
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.internal.Logging
-import org.apache.spark.memory.MemoryManager
+import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.serializer.{SerializationStream, SerializerManager}
import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
@@ -93,7 +93,7 @@ private[spark] class MemoryStore(
conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
/** Total amount of memory available for storage, in bytes. */
- private def maxMemory: Long = memoryManager.maxStorageMemory
+ private def maxMemory: Long = memoryManager.maxOnHeapStorageMemory
if (maxMemory < unrollMemoryThreshold) {
logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " +
@@ -133,7 +133,7 @@ private[spark] class MemoryStore(
size: Long,
_bytes: () => ChunkedByteBuffer): Boolean = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
- if (memoryManager.acquireStorageMemory(blockId, size)) {
+ if (memoryManager.acquireStorageMemory(blockId, size, MemoryMode.ON_HEAP)) {
// We acquired enough memory for the block, so go ahead and put it
val bytes = _bytes()
assert(bytes.size == size)
@@ -229,7 +229,7 @@ private[spark] class MemoryStore(
// Synchronize so that transfer is atomic
memoryManager.synchronized {
releaseUnrollMemoryForThisTask(amount)
- val success = memoryManager.acquireStorageMemory(blockId, amount)
+ val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
assert(success, "transferring unroll memory to storage memory failed")
}
}
@@ -237,7 +237,8 @@ private[spark] class MemoryStore(
val enoughStorageMemory = {
if (unrollMemoryUsedByThisBlock <= size) {
val acquiredExtra =
- memoryManager.acquireStorageMemory(blockId, size - unrollMemoryUsedByThisBlock)
+ memoryManager.acquireStorageMemory(
+ blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
if (acquiredExtra) {
transferUnrollToStorage(unrollMemoryUsedByThisBlock)
}
@@ -353,7 +354,7 @@ private[spark] class MemoryStore(
// Synchronize so that transfer is atomic
memoryManager.synchronized {
releaseUnrollMemoryForThisTask(unrollMemoryUsedByThisBlock)
- val success = memoryManager.acquireStorageMemory(blockId, entry.size)
+ val success = memoryManager.acquireStorageMemory(blockId, entry.size, MemoryMode.ON_HEAP)
assert(success, "transferring unroll memory to storage memory failed")
}
entries.synchronized {
@@ -406,7 +407,7 @@ private[spark] class MemoryStore(
entries.remove(blockId)
}
if (entry != null) {
- memoryManager.releaseStorageMemory(entry.size)
+ memoryManager.releaseStorageMemory(entry.size, MemoryMode.ON_HEAP)
logInfo(s"Block $blockId of size ${entry.size} dropped " +
s"from memory (free ${maxMemory - blocksMemoryUsed})")
true
@@ -531,7 +532,7 @@ private[spark] class MemoryStore(
*/
def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = {
memoryManager.synchronized {
- val success = memoryManager.acquireUnrollMemory(blockId, memory)
+ val success = memoryManager.acquireUnrollMemory(blockId, memory, MemoryMode.ON_HEAP)
if (success) {
val taskAttemptId = currentTaskAttemptId()
unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
@@ -554,7 +555,7 @@ private[spark] class MemoryStore(
if (unrollMemoryMap(taskAttemptId) == 0) {
unrollMemoryMap.remove(taskAttemptId)
}
- memoryManager.releaseUnrollMemory(memoryToRelease)
+ memoryManager.releaseUnrollMemory(memoryToRelease, MemoryMode.ON_HEAP)
}
}
}
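One detail worth noting in the `MemoryStore` changes: the unroll-to-storage transfer holds the memory-manager lock across both calls. Restated as a sketch (assuming `memoryManager`, `blockId`, and `releaseUnrollMemoryForThisTask` are in scope, as in the real class):

// Atomic with respect to other memory requests: no other task can
// claim the bytes between the unroll release and the storage acquire.
def transferUnrollToStorage(amount: Long): Unit = {
  memoryManager.synchronized {
    releaseUnrollMemoryForThisTask(amount)
    val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
    assert(success, "transferring unroll memory to storage memory failed")
  }
}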
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index 686e948b5d..aaca653c58 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -100,7 +100,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
evictBlocksToFreeSpaceCalled.set(numBytesToFree)
if (numBytesToFree <= mm.storageMemoryUsed) {
// We can evict enough blocks to fulfill the request for space
- mm.releaseStorageMemory(numBytesToFree)
+ mm.releaseStorageMemory(numBytesToFree, MemoryMode.ON_HEAP)
evictedBlocks.append(
(null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L)))
numBytesToFree
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 741d4fdf78..4e31fb5589 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -35,7 +35,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
val mm = new StaticMemoryManager(
conf,
maxOnHeapExecutionMemory = maxExecutionMem,
- maxStorageMemory = maxStorageMem,
+ maxOnHeapStorageMemory = maxStorageMem,
numCores = 1)
val ms = makeMemoryStore(mm)
(mm, ms)
@@ -50,7 +50,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
.set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
.set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString),
maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
- maxStorageMemory = 0,
+ maxOnHeapStorageMemory = 0,
numCores = 1)
}
@@ -58,22 +58,23 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
val maxExecutionMem = 1000L
val taskAttemptId = 0L
val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue)
+ val memoryMode = MemoryMode.ON_HEAP
assert(mm.executionMemoryUsed === 0L)
- assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) === 10L)
+ assert(mm.acquireExecutionMemory(10L, taskAttemptId, memoryMode) === 10L)
assert(mm.executionMemoryUsed === 10L)
- assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
+ assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L)
// Acquire up to the max
- assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 890L)
+ assert(mm.acquireExecutionMemory(1000L, taskAttemptId, memoryMode) === 890L)
assert(mm.executionMemoryUsed === maxExecutionMem)
- assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 0L)
+ assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 0L)
assert(mm.executionMemoryUsed === maxExecutionMem)
- mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP)
+ mm.releaseExecutionMemory(800L, taskAttemptId, memoryMode)
assert(mm.executionMemoryUsed === 200L)
// Acquire after release
- assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 1L)
+ assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 1L)
assert(mm.executionMemoryUsed === 201L)
// Release beyond what was acquired
- mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, MemoryMode.ON_HEAP)
+ mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, memoryMode)
assert(mm.executionMemoryUsed === 0L)
}
@@ -81,23 +82,24 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
val maxStorageMem = 1000L
val dummyBlock = TestBlockId("you can see the world you brought to live")
val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
+ val memoryMode = MemoryMode.ON_HEAP
assert(mm.storageMemoryUsed === 0L)
- assert(mm.acquireStorageMemory(dummyBlock, 10L))
+ assert(mm.acquireStorageMemory(dummyBlock, 10L, memoryMode))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 10L)
- assert(mm.acquireStorageMemory(dummyBlock, 100L))
+ assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 110L)
// Acquire more than the max, not granted
- assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L))
+ assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, memoryMode))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 110L)
// Acquire up to the max, requests after this are still granted due to LRU eviction
- assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem))
+ assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, memoryMode))
assertEvictBlocksToFreeSpaceCalled(ms, 110L)
assert(mm.storageMemoryUsed === 1000L)
- assert(mm.acquireStorageMemory(dummyBlock, 1L))
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
assertEvictBlocksToFreeSpaceCalled(ms, 1L)
assert(evictedBlocks.nonEmpty)
evictedBlocks.clear()
@@ -105,19 +107,19 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
// 1000 bytes. This is different from real behavior, where the 1-byte block would have evicted
// the 1000-byte block entirely. This is set up differently so we can write finer-grained tests.
assert(mm.storageMemoryUsed === 1000L)
- mm.releaseStorageMemory(800L)
+ mm.releaseStorageMemory(800L, memoryMode)
assert(mm.storageMemoryUsed === 200L)
// Acquire after release
- assert(mm.acquireStorageMemory(dummyBlock, 1L))
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 201L)
mm.releaseAllStorageMemory()
assert(mm.storageMemoryUsed === 0L)
- assert(mm.acquireStorageMemory(dummyBlock, 1L))
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 1L)
// Release beyond what was acquired
- mm.releaseStorageMemory(100L)
+ mm.releaseStorageMemory(100L, memoryMode)
assert(mm.storageMemoryUsed === 0L)
}
@@ -127,20 +129,21 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
val taskAttemptId = 0L
val dummyBlock = TestBlockId("ain't nobody love like you do")
val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem)
+ val memoryMode = MemoryMode.ON_HEAP
// Only execution memory should increase
- assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
+ assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L)
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 100L)
- assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
+ assert(mm.acquireExecutionMemory(1000L, taskAttemptId, memoryMode) === 100L)
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 200L)
// Only storage memory should increase
- assert(mm.acquireStorageMemory(dummyBlock, 50L))
+ assert(mm.acquireStorageMemory(dummyBlock, 50L, memoryMode))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 50L)
assert(mm.executionMemoryUsed === 200L)
// Only execution memory should be released
- mm.releaseExecutionMemory(133L, taskAttemptId, MemoryMode.ON_HEAP)
+ mm.releaseExecutionMemory(133L, taskAttemptId, memoryMode)
assert(mm.storageMemoryUsed === 50L)
assert(mm.executionMemoryUsed === 67L)
// Only storage memory should be released
@@ -153,21 +156,22 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
val maxStorageMem = 1000L
val dummyBlock = TestBlockId("lonely water")
val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
- assert(mm.acquireUnrollMemory(dummyBlock, 100L))
+ val memoryMode = MemoryMode.ON_HEAP
+ assert(mm.acquireUnrollMemory(dummyBlock, 100L, memoryMode))
when(ms.currentUnrollMemory).thenReturn(100L)
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 100L)
- mm.releaseUnrollMemory(40L)
+ mm.releaseUnrollMemory(40L, memoryMode)
assert(mm.storageMemoryUsed === 60L)
when(ms.currentUnrollMemory).thenReturn(60L)
- assert(mm.acquireStorageMemory(dummyBlock, 800L))
+ assert(mm.acquireStorageMemory(dummyBlock, 800L, memoryMode))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 860L)
// `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes.
// As of this point, cache memory is 800 bytes and current unroll memory is 60 bytes.
// Requesting 240 more bytes of unroll memory will leave our total unroll memory at
// 300 bytes, still under the 400-byte limit. Therefore, all 240 bytes are granted.
- assert(mm.acquireUnrollMemory(dummyBlock, 240L))
+ assert(mm.acquireUnrollMemory(dummyBlock, 240L, memoryMode))
assertEvictBlocksToFreeSpaceCalled(ms, 100L) // 860 + 240 - 1000
when(ms.currentUnrollMemory).thenReturn(300L) // 60 + 240
assert(mm.storageMemoryUsed === 1000L)
@@ -175,11 +179,11 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
// We already have 300 bytes of unroll memory, so requesting 150 more will leave us
// above the 400-byte limit. Since there is not enough free memory, this request will
// fail even after evicting as much as we can (400 - 300 = 100 bytes).
- assert(!mm.acquireUnrollMemory(dummyBlock, 150L))
+ assert(!mm.acquireUnrollMemory(dummyBlock, 150L, memoryMode))
assertEvictBlocksToFreeSpaceCalled(ms, 100L)
assert(mm.storageMemoryUsed === 900L)
// Release beyond what was acquired
- mm.releaseUnrollMemory(maxStorageMem)
+ mm.releaseUnrollMemory(maxStorageMem, memoryMode)
assert(mm.storageMemoryUsed === 0L)
}
diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
index 6dad3f4ae7..6a4f409e8e 100644
--- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
+++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
@@ -39,16 +39,22 @@ class TestMemoryManager(conf: SparkConf)
grant
}
}
- override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = true
- override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = true
- override def releaseStorageMemory(numBytes: Long): Unit = {}
+ override def acquireStorageMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ memoryMode: MemoryMode): Boolean = true
+ override def acquireUnrollMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ memoryMode: MemoryMode): Boolean = true
+ override def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = {}
override private[memory] def releaseExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Unit = {
available += numBytes
}
- override def maxStorageMemory: Long = Long.MaxValue
+ override def maxOnHeapStorageMemory: Long = Long.MaxValue
private var oomOnce = false
private var available = Long.MaxValue
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 9001a26652..14255818c7 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -52,47 +52,49 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val maxMemory = 1000L
val taskAttemptId = 0L
val (mm, _) = makeThings(maxMemory)
+ val memoryMode = MemoryMode.ON_HEAP
assert(mm.executionMemoryUsed === 0L)
- assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) === 10L)
+ assert(mm.acquireExecutionMemory(10L, taskAttemptId, memoryMode) === 10L)
assert(mm.executionMemoryUsed === 10L)
- assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
+ assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L)
// Acquire up to the max
- assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 890L)
+ assert(mm.acquireExecutionMemory(1000L, taskAttemptId, memoryMode) === 890L)
assert(mm.executionMemoryUsed === maxMemory)
- assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 0L)
+ assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 0L)
assert(mm.executionMemoryUsed === maxMemory)
- mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP)
+ mm.releaseExecutionMemory(800L, taskAttemptId, memoryMode)
assert(mm.executionMemoryUsed === 200L)
// Acquire after release
- assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 1L)
+ assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 1L)
assert(mm.executionMemoryUsed === 201L)
// Release beyond what was acquired
- mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP)
+ mm.releaseExecutionMemory(maxMemory, taskAttemptId, memoryMode)
assert(mm.executionMemoryUsed === 0L)
}
test("basic storage memory") {
val maxMemory = 1000L
val (mm, ms) = makeThings(maxMemory)
+ val memoryMode = MemoryMode.ON_HEAP
assert(mm.storageMemoryUsed === 0L)
- assert(mm.acquireStorageMemory(dummyBlock, 10L))
+ assert(mm.acquireStorageMemory(dummyBlock, 10L, memoryMode))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 10L)
- assert(mm.acquireStorageMemory(dummyBlock, 100L))
+ assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 110L)
// Acquire more than the max, not granted
- assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L))
+ assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, memoryMode))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 110L)
// Acquire up to the max, requests after this are still granted due to LRU eviction
- assert(mm.acquireStorageMemory(dummyBlock, maxMemory))
+ assert(mm.acquireStorageMemory(dummyBlock, maxMemory, memoryMode))
assertEvictBlocksToFreeSpaceCalled(ms, 110L)
assert(mm.storageMemoryUsed === 1000L)
assert(evictedBlocks.nonEmpty)
evictedBlocks.clear()
- assert(mm.acquireStorageMemory(dummyBlock, 1L))
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
assertEvictBlocksToFreeSpaceCalled(ms, 1L)
assert(evictedBlocks.nonEmpty)
evictedBlocks.clear()
@@ -100,19 +102,19 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
// 1000 bytes. This is different from real behavior, where the 1-byte block would have evicted
// the 1000-byte block entirely. This is set up differently so we can write finer-grained tests.
assert(mm.storageMemoryUsed === 1000L)
- mm.releaseStorageMemory(800L)
+ mm.releaseStorageMemory(800L, memoryMode)
assert(mm.storageMemoryUsed === 200L)
// Acquire after release
- assert(mm.acquireStorageMemory(dummyBlock, 1L))
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 201L)
mm.releaseAllStorageMemory()
assert(mm.storageMemoryUsed === 0L)
- assert(mm.acquireStorageMemory(dummyBlock, 1L))
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 1L)
// Release beyond what was acquired
- mm.releaseStorageMemory(100L)
+ mm.releaseStorageMemory(100L, memoryMode)
assert(mm.storageMemoryUsed === 0L)
}
@@ -120,18 +122,19 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val maxMemory = 1000L
val taskAttemptId = 0L
val (mm, ms) = makeThings(maxMemory)
+ val memoryMode = MemoryMode.ON_HEAP
// Acquire enough storage memory to exceed the storage region
- assert(mm.acquireStorageMemory(dummyBlock, 750L))
+ assert(mm.acquireStorageMemory(dummyBlock, 750L, memoryMode))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.executionMemoryUsed === 0L)
assert(mm.storageMemoryUsed === 750L)
// Execution needs to request 250 bytes to evict storage memory
- assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
+ assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L)
assert(mm.executionMemoryUsed === 100L)
assert(mm.storageMemoryUsed === 750L)
assertEvictBlocksToFreeSpaceNotCalled(ms)
// Execution wants 200 bytes but only 150 are free, so storage is evicted
- assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
+ assert(mm.acquireExecutionMemory(200L, taskAttemptId, memoryMode) === 200L)
assert(mm.executionMemoryUsed === 300L)
assert(mm.storageMemoryUsed === 700L)
assertEvictBlocksToFreeSpaceCalled(ms, 50L)
@@ -141,13 +144,13 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
require(mm.executionMemoryUsed === 300L)
require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released")
// Acquire some storage memory again, but this time keep it within the storage region
- assert(mm.acquireStorageMemory(dummyBlock, 400L))
+ assert(mm.acquireStorageMemory(dummyBlock, 400L, memoryMode))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 400L)
assert(mm.executionMemoryUsed === 300L)
// Execution cannot evict storage because the latter is within the storage fraction,
// so grant only what's remaining without evicting anything, i.e. 1000 - 300 - 400 = 300
- assert(mm.acquireExecutionMemory(400L, taskAttemptId, MemoryMode.ON_HEAP) === 300L)
+ assert(mm.acquireExecutionMemory(400L, taskAttemptId, memoryMode) === 300L)
assert(mm.executionMemoryUsed === 600L)
assert(mm.storageMemoryUsed === 400L)
assertEvictBlocksToFreeSpaceNotCalled(ms)
@@ -157,8 +160,9 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val maxMemory = 1000L
val taskAttemptId = 0L
val (mm, ms) = makeThings(maxMemory)
+ val memoryMode = MemoryMode.ON_HEAP
// Acquire enough storage memory to exceed the storage region size
- assert(mm.acquireStorageMemory(dummyBlock, 700L))
+ assert(mm.acquireStorageMemory(dummyBlock, 700L, memoryMode))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.executionMemoryUsed === 0L)
assert(mm.storageMemoryUsed === 700L)
@@ -166,7 +170,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
// mistakenly think that the 300 bytes of free space was still available even after
// using it to expand the execution pool. Consequently, no storage memory was released
// and the following call granted only 300 bytes to execution.
- assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L)
+ assert(mm.acquireExecutionMemory(500L, taskAttemptId, memoryMode) === 500L)
assertEvictBlocksToFreeSpaceCalled(ms, 200L)
assert(mm.storageMemoryUsed === 500L)
assert(mm.executionMemoryUsed === 500L)
@@ -177,34 +181,35 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val maxMemory = 1000L
val taskAttemptId = 0L
val (mm, ms) = makeThings(maxMemory)
+ val memoryMode = MemoryMode.ON_HEAP
// Acquire enough execution memory to exceed the execution region
- assert(mm.acquireExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) === 800L)
+ assert(mm.acquireExecutionMemory(800L, taskAttemptId, memoryMode) === 800L)
assert(mm.executionMemoryUsed === 800L)
assert(mm.storageMemoryUsed === 0L)
assertEvictBlocksToFreeSpaceNotCalled(ms)
// Storage should not be able to evict execution
- assert(mm.acquireStorageMemory(dummyBlock, 100L))
+ assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
assert(mm.executionMemoryUsed === 800L)
assert(mm.storageMemoryUsed === 100L)
assertEvictBlocksToFreeSpaceNotCalled(ms)
- assert(!mm.acquireStorageMemory(dummyBlock, 250L))
+ assert(!mm.acquireStorageMemory(dummyBlock, 250L, memoryMode))
assert(mm.executionMemoryUsed === 800L)
assert(mm.storageMemoryUsed === 100L)
// Do not attempt to evict blocks, since evicting will not free enough memory:
assertEvictBlocksToFreeSpaceNotCalled(ms)
- mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP)
- mm.releaseStorageMemory(maxMemory)
+ mm.releaseExecutionMemory(maxMemory, taskAttemptId, memoryMode)
+ mm.releaseStorageMemory(maxMemory, memoryMode)
// Acquire some execution memory again, but this time keep it within the execution region
- assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
+ assert(mm.acquireExecutionMemory(200L, taskAttemptId, memoryMode) === 200L)
assert(mm.executionMemoryUsed === 200L)
assert(mm.storageMemoryUsed === 0L)
assertEvictBlocksToFreeSpaceNotCalled(ms)
// Storage should still not be able to evict execution
- assert(mm.acquireStorageMemory(dummyBlock, 750L))
+ assert(mm.acquireStorageMemory(dummyBlock, 750L, memoryMode))
assert(mm.executionMemoryUsed === 200L)
assert(mm.storageMemoryUsed === 750L)
assertEvictBlocksToFreeSpaceNotCalled(ms) // since there were 800 bytes free
- assert(!mm.acquireStorageMemory(dummyBlock, 850L))
+ assert(!mm.acquireStorageMemory(dummyBlock, 850L, memoryMode))
assert(mm.executionMemoryUsed === 200L)
assert(mm.storageMemoryUsed === 750L)
// Do not attempt to evict blocks, since evicting will not free enough memory:
@@ -221,7 +226,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
.set("spark.testing.reservedMemory", reservedMemory.toString)
val mm = UnifiedMemoryManager(conf, numCores = 1)
val expectedMaxMemory = ((systemMemory - reservedMemory) * memoryFraction).toLong
- assert(mm.maxMemory === expectedMaxMemory)
+ assert(mm.maxHeapMemory === expectedMaxMemory)
// Try using a system memory that's too small
val conf2 = conf.clone().set("spark.testing.memory", (reservedMemory / 2).toString)
@@ -256,18 +261,19 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
.set("spark.testing.memory", "1000")
val mm = UnifiedMemoryManager(conf, numCores = 2)
val ms = makeMemoryStore(mm)
- assert(mm.maxMemory === 1000)
+ val memoryMode = MemoryMode.ON_HEAP
+ assert(mm.maxHeapMemory === 1000)
// Have two tasks each acquire some execution memory so that the memory pool registers that
// there are two active tasks:
- assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
- assert(mm.acquireExecutionMemory(100L, 1, MemoryMode.ON_HEAP) === 100L)
+ assert(mm.acquireExecutionMemory(100L, 0, memoryMode) === 100L)
+ assert(mm.acquireExecutionMemory(100L, 1, memoryMode) === 100L)
// Fill up all of the remaining memory with storage.
- assert(mm.acquireStorageMemory(dummyBlock, 800L))
+ assert(mm.acquireStorageMemory(dummyBlock, 800L, memoryMode))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 800)
assert(mm.executionMemoryUsed === 200)
// A task should still be able to allocate 100 bytes execution memory by evicting blocks
- assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
+ assert(mm.acquireExecutionMemory(100L, 0, memoryMode) === 100L)
assertEvictBlocksToFreeSpaceCalled(ms, 100L)
assert(mm.executionMemoryUsed === 300)
assert(mm.storageMemoryUsed === 700)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 7a4cb39b14..6fc32cb30a 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -34,7 +34,7 @@ import org.scalatest.concurrent.Timeouts._
import org.apache.spark._
import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.memory.StaticMemoryManager
+import org.apache.spark.memory.{MemoryMode, StaticMemoryManager}
import org.apache.spark.network.{BlockDataManager, BlockTransferService}
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.netty.NettyBlockTransferService
@@ -821,7 +821,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val memoryManager = new StaticMemoryManager(
conf,
maxOnHeapExecutionMemory = Long.MaxValue,
- maxStorageMemory = 1200,
+ maxOnHeapStorageMemory = 1200,
numCores = 1)
val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,