diff options
author | Josh Rosen <joshrosen@databricks.com> | 2016-03-26 11:03:25 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-03-26 11:03:25 -0700 |
commit | 20c0bcd972cfbb2f2aa92948f9ee337724a70361 (patch) | |
tree | 96ba2aa2989a1243062b0ae91a62b86e15acd365 /core/src/main/scala/org/apache | |
parent | bd94ea4c80f4fc18f4000346d7c6717539846efb (diff) | |
download | spark-20c0bcd972cfbb2f2aa92948f9ee337724a70361.tar.gz spark-20c0bcd972cfbb2f2aa92948f9ee337724a70361.tar.bz2 spark-20c0bcd972cfbb2f2aa92948f9ee337724a70361.zip |
[SPARK-14135] Add off-heap storage memory bookkeeping support to MemoryManager
This patch extends Spark's `UnifiedMemoryManager` to add bookkeeping support for off-heap storage memory, an requirement for enabling off-heap caching (which will be done by #11805). The `MemoryManager`'s `storageMemoryPool` has been split into separate on- and off-heap pools and the storage and unroll memory allocation methods have been updated to accept a `memoryMode` parameter to specify whether allocations should be performed on- or off-heap.
In order to reduce the testing surface, the `StaticMemoryManager` does not support off-heap caching (we plan to eventually remove the `StaticMemoryManager`, so this isn't a significant limitation).
Author: Josh Rosen <joshrosen@databricks.com>
Closes #11942 from JoshRosen/off-heap-storage-memory-bookkeeping.
Diffstat (limited to 'core/src/main/scala/org/apache')
8 files changed, 192 insertions, 114 deletions
diff --git a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala index 319718edb5..f8167074c6 100644 --- a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala @@ -37,13 +37,18 @@ import org.apache.spark.internal.Logging * tasks was performed by the ShuffleMemoryManager. * * @param lock a [[MemoryManager]] instance to synchronize on - * @param poolName a human-readable name for this pool, for use in log messages + * @param memoryMode the type of memory tracked by this pool (on- or off-heap) */ private[memory] class ExecutionMemoryPool( lock: Object, - poolName: String + memoryMode: MemoryMode ) extends MemoryPool(lock) with Logging { + private[this] val poolName: String = memoryMode match { + case MemoryMode.ON_HEAP => "on-heap execution" + case MemoryMode.OFF_HEAP => "off-heap execution" + } + /** * Map from taskAttemptId -> memory consumption in bytes */ diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index 5e8abeecea..10656bc8c8 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -36,42 +36,52 @@ import org.apache.spark.unsafe.memory.MemoryAllocator private[spark] abstract class MemoryManager( conf: SparkConf, numCores: Int, - storageMemory: Long, + onHeapStorageMemory: Long, onHeapExecutionMemory: Long) extends Logging { // -- Methods related to memory allocation policies and bookkeeping ------------------------------ @GuardedBy("this") - protected val storageMemoryPool = new StorageMemoryPool(this) + protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP) @GuardedBy("this") - protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "on-heap execution") + protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP) @GuardedBy("this") - protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "off-heap execution") + protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP) + @GuardedBy("this") + protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP) - storageMemoryPool.incrementPoolSize(storageMemory) + onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory) onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory) - offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeap.size", 0)) + + protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0) + protected[this] val offHeapStorageMemory = + (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong + + offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory) + offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory) /** * Total available memory for storage, in bytes. This amount can vary over time, depending on * the MemoryManager implementation. * In this model, this is equivalent to the amount of memory not occupied by execution. */ - def maxStorageMemory: Long + def maxOnHeapStorageMemory: Long /** * Set the [[MemoryStore]] used by this manager to evict cached blocks. * This must be set after construction due to initialization ordering constraints. */ final def setMemoryStore(store: MemoryStore): Unit = synchronized { - storageMemoryPool.setMemoryStore(store) + onHeapStorageMemoryPool.setMemoryStore(store) + offHeapStorageMemoryPool.setMemoryStore(store) } /** * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary. + * * @return whether all N bytes were successfully granted. */ - def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean + def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean /** * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary. @@ -82,7 +92,7 @@ private[spark] abstract class MemoryManager( * * @return whether all N bytes were successfully granted. */ - def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean + def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean /** * Try to acquire up to `numBytes` of execution memory for the current task and return the @@ -126,22 +136,26 @@ private[spark] abstract class MemoryManager( /** * Release N bytes of storage memory. */ - def releaseStorageMemory(numBytes: Long): Unit = synchronized { - storageMemoryPool.releaseMemory(numBytes) + def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized { + memoryMode match { + case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes) + case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes) + } } /** * Release all storage memory acquired. */ final def releaseAllStorageMemory(): Unit = synchronized { - storageMemoryPool.releaseAllMemory() + onHeapStorageMemoryPool.releaseAllMemory() + offHeapStorageMemoryPool.releaseAllMemory() } /** * Release N bytes of unroll memory. */ - final def releaseUnrollMemory(numBytes: Long): Unit = synchronized { - releaseStorageMemory(numBytes) + final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized { + releaseStorageMemory(numBytes, memoryMode) } /** @@ -155,7 +169,7 @@ private[spark] abstract class MemoryManager( * Storage memory currently in use, in bytes. */ final def storageMemoryUsed: Long = synchronized { - storageMemoryPool.memoryUsed + onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed } /** diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala index f9f8f820bc..cbd0fa9ec2 100644 --- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala @@ -30,12 +30,12 @@ import org.apache.spark.storage.BlockId private[spark] class StaticMemoryManager( conf: SparkConf, maxOnHeapExecutionMemory: Long, - override val maxStorageMemory: Long, + override val maxOnHeapStorageMemory: Long, numCores: Int) extends MemoryManager( conf, numCores, - maxStorageMemory, + maxOnHeapStorageMemory, maxOnHeapExecutionMemory) { def this(conf: SparkConf, numCores: Int) { @@ -46,25 +46,39 @@ private[spark] class StaticMemoryManager( numCores) } + // The StaticMemoryManager does not support off-heap storage memory: + offHeapExecutionMemoryPool.incrementPoolSize(offHeapStorageMemoryPool.poolSize) + offHeapStorageMemoryPool.decrementPoolSize(offHeapStorageMemoryPool.poolSize) + // Max number of bytes worth of blocks to evict when unrolling private val maxUnrollMemory: Long = { - (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong + (maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong } - override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized { - if (numBytes > maxStorageMemory) { + override def acquireStorageMemory( + blockId: BlockId, + numBytes: Long, + memoryMode: MemoryMode): Boolean = synchronized { + require(memoryMode != MemoryMode.OFF_HEAP, + "StaticMemoryManager does not support off-heap storage memory") + if (numBytes > maxOnHeapStorageMemory) { // Fail fast if the block simply won't fit logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " + - s"memory limit ($maxStorageMemory bytes)") + s"memory limit ($maxOnHeapStorageMemory bytes)") false } else { - storageMemoryPool.acquireMemory(blockId, numBytes) + onHeapStorageMemoryPool.acquireMemory(blockId, numBytes) } } - override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized { - val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory - val freeMemory = storageMemoryPool.memoryFree + override def acquireUnrollMemory( + blockId: BlockId, + numBytes: Long, + memoryMode: MemoryMode): Boolean = synchronized { + require(memoryMode != MemoryMode.OFF_HEAP, + "StaticMemoryManager does not support off-heap unroll memory") + val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory + val freeMemory = onHeapStorageMemoryPool.memoryFree // When unrolling, we will use all of the existing free memory, and, if necessary, // some extra space freed from evicting cached blocks. We must place a cap on the // amount of memory to be evicted by unrolling, however, otherwise unrolling one @@ -72,7 +86,7 @@ private[spark] class StaticMemoryManager( val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory) // Keep it within the range 0 <= X <= maxNumBytesToFree val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory)) - storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree) + onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree) } private[memory] diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala index 6fcf26e3ec..a67e8da26b 100644 --- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala @@ -28,8 +28,12 @@ import org.apache.spark.storage.memory.MemoryStore * (caching). * * @param lock a [[MemoryManager]] instance to synchronize on + * @param memoryMode the type of memory tracked by this pool (on- or off-heap) */ -private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging { +private[memory] class StorageMemoryPool( + lock: Object, + memoryMode: MemoryMode + ) extends MemoryPool(lock) with Logging { @GuardedBy("lock") private[this] var _memoryUsed: Long = 0L @@ -79,7 +83,8 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w assert(numBytesToAcquire >= 0) assert(numBytesToFree >= 0) assert(memoryUsed <= poolSize) - if (numBytesToFree > 0) { + // Once we support off-heap caching, this will need to change: + if (numBytesToFree > 0 && memoryMode == MemoryMode.ON_HEAP) { memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree) } // NOTE: If the memory store evicts blocks, then those evictions will synchronously call @@ -117,7 +122,14 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory if (remainingSpaceToFree > 0) { // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks: - val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree) + val spaceFreedByEviction = { + // Once we support off-heap caching, this will need to change: + if (memoryMode == MemoryMode.ON_HEAP) { + memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree) + } else { + 0 + } + } // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do // not need to decrement _memoryUsed here. However, we do need to decrement the pool size. decrementPoolSize(spaceFreedByEviction) diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index 6c57c98ea5..fa9c021f70 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -39,27 +39,32 @@ import org.apache.spark.storage.BlockId * up most of the storage space, in which case the new blocks will be evicted immediately * according to their respective storage levels. * - * @param storageRegionSize Size of the storage region, in bytes. + * @param onHeapStorageRegionSize Size of the storage region, in bytes. * This region is not statically reserved; execution can borrow from * it if necessary. Cached blocks can be evicted only if actual * storage memory usage exceeds this region. */ private[spark] class UnifiedMemoryManager private[memory] ( conf: SparkConf, - val maxMemory: Long, - storageRegionSize: Long, + val maxHeapMemory: Long, + onHeapStorageRegionSize: Long, numCores: Int) extends MemoryManager( conf, numCores, - storageRegionSize, - maxMemory - storageRegionSize) { + onHeapStorageRegionSize, + maxHeapMemory - onHeapStorageRegionSize) { - // We always maintain this invariant: - assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + private def assertInvariants(): Unit = { + assert(onHeapExecutionMemoryPool.poolSize + onHeapStorageMemoryPool.poolSize == maxHeapMemory) + assert( + offHeapExecutionMemoryPool.poolSize + offHeapStorageMemoryPool.poolSize == maxOffHeapMemory) + } + + assertInvariants() - override def maxStorageMemory: Long = synchronized { - maxMemory - onHeapExecutionMemoryPool.memoryUsed + override def maxOnHeapStorageMemory: Long = synchronized { + maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed } /** @@ -75,83 +80,104 @@ private[spark] class UnifiedMemoryManager private[memory] ( numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long = synchronized { - assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + assertInvariants() assert(numBytes >= 0) - memoryMode match { - case MemoryMode.ON_HEAP => - - /** - * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool. - * - * When acquiring memory for a task, the execution pool may need to make multiple - * attempts. Each attempt must be able to evict storage in case another task jumps in - * and caches a large block between the attempts. This is called once per attempt. - */ - def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = { - if (extraMemoryNeeded > 0) { - // There is not enough free memory in the execution pool, so try to reclaim memory from - // storage. We can reclaim any free memory from the storage pool. If the storage pool - // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim - // the memory that storage has borrowed from execution. - val memoryReclaimableFromStorage = - math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize) - if (memoryReclaimableFromStorage > 0) { - // Only reclaim as much space as is necessary and available: - val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace( - math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) - onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed) - } - } - } + val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match { + case MemoryMode.ON_HEAP => ( + onHeapExecutionMemoryPool, + onHeapStorageMemoryPool, + onHeapStorageRegionSize, + maxHeapMemory) + case MemoryMode.OFF_HEAP => ( + offHeapExecutionMemoryPool, + offHeapStorageMemoryPool, + offHeapStorageMemory, + maxOffHeapMemory) + } - /** - * The size the execution pool would have after evicting storage memory. - * - * The execution memory pool divides this quantity among the active tasks evenly to cap - * the execution memory allocation for each task. It is important to keep this greater - * than the execution pool size, which doesn't take into account potential memory that - * could be freed by evicting storage. Otherwise we may hit SPARK-12155. - * - * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness - * in execution memory allocation across tasks, Otherwise, a task may occupy more than - * its fair share of execution memory, mistakenly thinking that other tasks can acquire - * the portion of storage memory that cannot be evicted. - */ - def computeMaxExecutionPoolSize(): Long = { - maxMemory - math.min(storageMemoryUsed, storageRegionSize) + /** + * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool. + * + * When acquiring memory for a task, the execution pool may need to make multiple + * attempts. Each attempt must be able to evict storage in case another task jumps in + * and caches a large block between the attempts. This is called once per attempt. + */ + def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = { + if (extraMemoryNeeded > 0) { + // There is not enough free memory in the execution pool, so try to reclaim memory from + // storage. We can reclaim any free memory from the storage pool. If the storage pool + // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim + // the memory that storage has borrowed from execution. + val memoryReclaimableFromStorage = math.max( + storagePool.memoryFree, + storagePool.poolSize - storageRegionSize) + if (memoryReclaimableFromStorage > 0) { + // Only reclaim as much space as is necessary and available: + val spaceReclaimed = storagePool.shrinkPoolToFreeSpace( + math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) + executionPool.incrementPoolSize(spaceReclaimed) } + } + } - onHeapExecutionMemoryPool.acquireMemory( - numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize) - - case MemoryMode.OFF_HEAP => - // For now, we only support on-heap caching of data, so we do not need to interact with - // the storage pool when allocating off-heap memory. This will change in the future, though. - offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId) + /** + * The size the execution pool would have after evicting storage memory. + * + * The execution memory pool divides this quantity among the active tasks evenly to cap + * the execution memory allocation for each task. It is important to keep this greater + * than the execution pool size, which doesn't take into account potential memory that + * could be freed by evicting storage. Otherwise we may hit SPARK-12155. + * + * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness + * in execution memory allocation across tasks, Otherwise, a task may occupy more than + * its fair share of execution memory, mistakenly thinking that other tasks can acquire + * the portion of storage memory that cannot be evicted. + */ + def computeMaxExecutionPoolSize(): Long = { + maxMemory - math.min(storagePool.memoryUsed, storageRegionSize) } + + executionPool.acquireMemory( + numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize) } - override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized { - assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + override def acquireStorageMemory( + blockId: BlockId, + numBytes: Long, + memoryMode: MemoryMode): Boolean = synchronized { + assertInvariants() assert(numBytes >= 0) - if (numBytes > maxStorageMemory) { + val (executionPool, storagePool, maxMemory) = memoryMode match { + case MemoryMode.ON_HEAP => ( + onHeapExecutionMemoryPool, + onHeapStorageMemoryPool, + maxOnHeapStorageMemory) + case MemoryMode.OFF_HEAP => ( + offHeapExecutionMemoryPool, + offHeapStorageMemoryPool, + maxOffHeapMemory) + } + if (numBytes > maxMemory) { // Fail fast if the block simply won't fit logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " + - s"memory limit ($maxStorageMemory bytes)") + s"memory limit ($maxMemory bytes)") return false } - if (numBytes > storageMemoryPool.memoryFree) { + if (numBytes > storagePool.memoryFree) { // There is not enough free memory in the storage pool, so try to borrow free memory from // the execution pool. - val memoryBorrowedFromExecution = Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes) - onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution) - storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution) + val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes) + executionPool.decrementPoolSize(memoryBorrowedFromExecution) + storagePool.incrementPoolSize(memoryBorrowedFromExecution) } - storageMemoryPool.acquireMemory(blockId, numBytes) + storagePool.acquireMemory(blockId, numBytes) } - override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized { - acquireStorageMemory(blockId, numBytes) + override def acquireUnrollMemory( + blockId: BlockId, + numBytes: Long, + memoryMode: MemoryMode): Boolean = synchronized { + acquireStorageMemory(blockId, numBytes, memoryMode) } } @@ -167,8 +193,8 @@ object UnifiedMemoryManager { val maxMemory = getMaxMemory(conf) new UnifiedMemoryManager( conf, - maxMemory = maxMemory, - storageRegionSize = + maxHeapMemory = maxMemory, + onHeapStorageRegionSize = (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong, numCores = numCores) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 30d2e23efd..0c7763f236 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -29,7 +29,7 @@ import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics} import org.apache.spark.internal.Logging -import org.apache.spark.memory.MemoryManager +import org.apache.spark.memory.{MemoryManager, MemoryMode} import org.apache.spark.network._ import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer} import org.apache.spark.network.netty.SparkTransportConf @@ -94,7 +94,7 @@ private[spark] class BlockManager( // However, since we use this only for reporting and logging, what we actually want here is // the absolute maximum value that `maxStorageMemory` can ever possibly reach. We may need // to revisit whether reporting this value as the "max" is intuitive to the user. - private val maxMemory = memoryManager.maxStorageMemory + private val maxMemory = memoryManager.maxOnHeapStorageMemory // Port used by the external shuffle service. In Yarn mode, this may be already be // set through the Hadoop configuration as the server is launched in the Yarn NM. diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 38e9534251..7d23295e25 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -21,6 +21,7 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput} import java.util.concurrent.ConcurrentHashMap import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.memory.MemoryMode import org.apache.spark.util.Utils /** @@ -65,6 +66,11 @@ class StorageLevel private( require(replication == 1, "Off-heap storage level does not support multiple replication") } + private[spark] def memoryMode: MemoryMode = { + if (useOffHeap) MemoryMode.OFF_HEAP + else MemoryMode.ON_HEAP + } + override def clone(): StorageLevel = { new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication) } diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 1a78c9c010..3ca41f32c1 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -29,7 +29,7 @@ import com.google.common.io.ByteStreams import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.internal.Logging -import org.apache.spark.memory.MemoryManager +import org.apache.spark.memory.{MemoryManager, MemoryMode} import org.apache.spark.serializer.{SerializationStream, SerializerManager} import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel} import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils} @@ -93,7 +93,7 @@ private[spark] class MemoryStore( conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024) /** Total amount of memory available for storage, in bytes. */ - private def maxMemory: Long = memoryManager.maxStorageMemory + private def maxMemory: Long = memoryManager.maxOnHeapStorageMemory if (maxMemory < unrollMemoryThreshold) { logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " + @@ -133,7 +133,7 @@ private[spark] class MemoryStore( size: Long, _bytes: () => ChunkedByteBuffer): Boolean = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") - if (memoryManager.acquireStorageMemory(blockId, size)) { + if (memoryManager.acquireStorageMemory(blockId, size, MemoryMode.ON_HEAP)) { // We acquired enough memory for the block, so go ahead and put it val bytes = _bytes() assert(bytes.size == size) @@ -229,7 +229,7 @@ private[spark] class MemoryStore( // Synchronize so that transfer is atomic memoryManager.synchronized { releaseUnrollMemoryForThisTask(amount) - val success = memoryManager.acquireStorageMemory(blockId, amount) + val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP) assert(success, "transferring unroll memory to storage memory failed") } } @@ -237,7 +237,8 @@ private[spark] class MemoryStore( val enoughStorageMemory = { if (unrollMemoryUsedByThisBlock <= size) { val acquiredExtra = - memoryManager.acquireStorageMemory(blockId, size - unrollMemoryUsedByThisBlock) + memoryManager.acquireStorageMemory( + blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP) if (acquiredExtra) { transferUnrollToStorage(unrollMemoryUsedByThisBlock) } @@ -353,7 +354,7 @@ private[spark] class MemoryStore( // Synchronize so that transfer is atomic memoryManager.synchronized { releaseUnrollMemoryForThisTask(unrollMemoryUsedByThisBlock) - val success = memoryManager.acquireStorageMemory(blockId, entry.size) + val success = memoryManager.acquireStorageMemory(blockId, entry.size, MemoryMode.ON_HEAP) assert(success, "transferring unroll memory to storage memory failed") } entries.synchronized { @@ -406,7 +407,7 @@ private[spark] class MemoryStore( entries.remove(blockId) } if (entry != null) { - memoryManager.releaseStorageMemory(entry.size) + memoryManager.releaseStorageMemory(entry.size, MemoryMode.ON_HEAP) logInfo(s"Block $blockId of size ${entry.size} dropped " + s"from memory (free ${maxMemory - blocksMemoryUsed})") true @@ -531,7 +532,7 @@ private[spark] class MemoryStore( */ def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = { memoryManager.synchronized { - val success = memoryManager.acquireUnrollMemory(blockId, memory) + val success = memoryManager.acquireUnrollMemory(blockId, memory, MemoryMode.ON_HEAP) if (success) { val taskAttemptId = currentTaskAttemptId() unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory @@ -554,7 +555,7 @@ private[spark] class MemoryStore( if (unrollMemoryMap(taskAttemptId) == 0) { unrollMemoryMap.remove(taskAttemptId) } - memoryManager.releaseUnrollMemory(memoryToRelease) + memoryManager.releaseUnrollMemory(memoryToRelease, MemoryMode.ON_HEAP) } } } |