path: root/core
author    Josh Rosen <joshrosen@databricks.com>  2015-12-09 11:39:59 -0800
committer Andrew Or <andrew@databricks.com>      2015-12-09 11:39:59 -0800
commit    aec5ea000ebb8921f42f006b694ef26f5df67d83 (patch)
tree      f2d25a92e6a11d1970027cbd5f8f08ad408dfbe9 /core
parent    442a7715a590ba2ea2446c73b1f914a16ae0ed4b (diff)
[SPARK-12165][SPARK-12189] Fix bugs in eviction of storage memory by execution
This patch fixes a bug in the eviction of storage memory by execution.

## The bug:

In general, execution should be able to evict storage memory when the total storage memory usage is greater than `maxMemory * spark.memory.storageFraction`. Due to a bug, however, Spark might wind up evicting no storage memory in certain cases where the storage memory usage was between `maxMemory * spark.memory.storageFraction` and `maxMemory`. For example, here is a regression test which illustrates the bug:

```scala
val maxMemory = 1000L
val taskAttemptId = 0L
val (mm, ms) = makeThings(maxMemory)
// Since we used the default storage fraction (0.5), we should be able to allocate 500 bytes
// of storage memory which are immune to eviction by execution memory pressure.

// Acquire enough storage memory to exceed the storage region size
assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.executionMemoryUsed === 0L)
assert(mm.storageMemoryUsed === 750L)
// At this point, storage is using 250 more bytes of memory than it is guaranteed, so execution
// should be able to reclaim up to 250 bytes of storage memory.
// Therefore, execution should now be able to acquire up to 500 bytes of memory:
assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L) // <--- fails by only returning 250L
assert(mm.storageMemoryUsed === 500L)
assert(mm.executionMemoryUsed === 500L)
assertEvictBlocksToFreeSpaceCalled(ms, 250L)
```

The problem relates to the control flow / interaction between `StorageMemoryPool.shrinkPoolToReclaimSpace()` and `MemoryStore.ensureFreeSpace()`. While trying to allocate the 500 bytes of execution memory, the `UnifiedMemoryManager` discovers that it will need to reclaim 250 bytes of memory from storage, so it calls `StorageMemoryPool.shrinkPoolToReclaimSpace(250L)`. This method, in turn, calls `MemoryStore.ensureFreeSpace(250L)`. However, `ensureFreeSpace()` first checks whether the requested space is less than `maxStorageMemory - storageMemoryUsed`, which will be true whenever there is any free execution memory, because `MemoryStore.maxStorageMemory = (maxMemory - onHeapExecutionMemoryPool.memoryUsed)` when the `UnifiedMemoryManager` is used.

The control flow here is somewhat confusing (it grew messy and confusing over time as a result of the merging and refactoring of several components). In the pre-Spark 1.6 code, `ensureFreeSpace` was called directly by the `MemoryStore` itself, whereas in 1.6 it's involved in a confusing circular control flow where `MemoryStore` calls `MemoryManager.acquireStorageMemory`, which then calls back into `MemoryStore.ensureFreeSpace`, which, in turn, calls `MemoryManager.freeStorageMemory`.

## The solution:

The solution implemented in this patch is to remove the confusing circular control flow between `MemoryManager` and `MemoryStore`, making the storage memory acquisition process much more linear and straightforward. The key changes:

- Remove a layer of inheritance which made the memory manager code harder to understand (53841174760a24a0df3eb1562af1f33dbe340eb9).
- Move some bounds checks earlier in the call chain (13ba7ada77f87ef1ec362aec35c89a924e6987cb).
- Refactor `ensureFreeSpace()` so that the part which evicts blocks can be called independently from the part which checks whether there is enough free space to avoid eviction (7c68ca09cb1b12f157400866983f753ac863380e).
- Realize that this lets us remove a layer of overloads from `ensureFreeSpace` (eec4f6c87423d5e482b710e098486b3bbc4daf06).
- Realize that `ensureFreeSpace()` can simply be replaced with an `evictBlocksToFreeSpace()` method which is called [after we've already figured out](https://github.com/apache/spark/blob/2dc842aea82c8895125d46a00aa43dfb0d121de9/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala#L88) how much memory needs to be reclaimed via eviction (2dc842aea82c8895125d46a00aa43dfb0d121de9).

Along the way, I fixed some problems with the mocks in `MemoryManagerSuite`: the old mocks would [unconditionally](https://github.com/apache/spark/blob/80a824d36eec9d9a9f092ee1741453851218ec73/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala#L84) report that a block had been evicted even if there was enough space in the storage pool such that eviction would be avoided.

I also fixed a problem where `StorageMemoryPool._memoryUsed` might become negative due to freed memory being double-counted when execution evicts storage. The problem was that `StorageMemoryPool.shrinkPoolToFreeSpace` would [decrement `_memoryUsed`](https://github.com/apache/spark/commit/7c68ca09cb1b12f157400866983f753ac863380e#diff-935c68a9803be144ed7bafdd2f756a0fL133) even though `StorageMemoryPool.freeMemory` had already decremented it as each evicted block was freed. See SPARK-12189 for details.

Author: Josh Rosen <joshrosen@databricks.com>
Author: Andrew Or <andrew@databricks.com>

Closes #10170 from JoshRosen/SPARK-12165.
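To make the failure mode concrete, here is a hand-worked trace of the pre-fix accounting, using the same numbers as the regression test above (maxMemory = 1000, storageFraction = 0.5, 750 bytes cached, execution asking for 500). The names below are simplified stand-ins for the pre-fix code paths, not the actual Spark classes:

```scala
// Sketch of the buggy bookkeeping; plain vals, no Spark classes involved.
val maxMemory = 1000L
val storageRegionSize = (maxMemory * 0.5).toLong    // 500, from spark.memory.storageFraction
val storageUsed = 750L                               // bytes held by cached blocks
val storagePoolSize = 750L                           // pool grew by borrowing 250 from execution
val onHeapExecutionUsed = 0L

// UnifiedMemoryManager correctly decides it may reclaim 250 bytes from storage:
val reclaimable = math.max(0L, storagePoolSize - storageRegionSize)  // 250

// But the old MemoryStore.ensureFreeSpace(250) measured free space against
// maxMemory minus *execution memory used*, not against the storage pool size:
val maxStorageMemory = maxMemory - onHeapExecutionUsed   // 1000, not 750
val freeMemory = maxStorageMemory - storageUsed          // 250
val needsEviction = freeMemory < reclaimable             // false -> nothing evicted

// shrinkPoolToReclaimSpace therefore frees 0 bytes, and execution is granted
// only the 250 bytes that were already free instead of the requested 500.
println(s"needs eviction: $needsEviction")
```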
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/memory/MemoryManager.scala              |  11
-rw-r--r-- core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala        |  37
-rw-r--r-- core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala          |  37
-rw-r--r-- core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala       |   8
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/MemoryStore.scala               |  76
-rw-r--r-- core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala         | 137
-rw-r--r-- core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala   |  52
-rw-r--r-- core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala  |  76
8 files changed, 230 insertions(+), 204 deletions(-)
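Before the diff itself, the following is a minimal sketch of the post-fix acquisition flow described in the commit message: the pool computes the exact shortfall first and only then asks the memory store to evict that many bytes. `StoragePoolSketch` is a simplified stand-in (no locking, task metrics, or block selection), not the actual Spark implementation:

```scala
// Simplified storage pool: eviction is decided up front, for the exact shortfall.
class StoragePoolSketch(var poolSize: Long) {
  private var _memoryUsed: Long = 0L
  def memoryUsed: Long = _memoryUsed
  def memoryFree: Long = poolSize - _memoryUsed

  // Stand-in for MemoryStore.evictBlocksToFreeSpace: release up to `space` bytes
  // (here we just decrement the counter instead of dropping real blocks).
  private def evictBlocksToFreeSpace(space: Long): Long = {
    val freed = math.min(space, _memoryUsed)
    _memoryUsed -= freed
    freed
  }

  // Mirrors the shape of the new StorageMemoryPool.acquireMemory overload.
  def acquireMemory(numBytes: Long): Boolean = {
    val numBytesToFree = math.max(0L, numBytes - memoryFree)  // shortfall computed first
    if (numBytesToFree > 0) {
      evictBlocksToFreeSpace(numBytesToFree)
    }
    val enoughMemory = numBytes <= memoryFree
    if (enoughMemory) {
      _memoryUsed += numBytes
    }
    enoughMemory
  }
}

// Example: a 100-byte pool holding 80 bytes grants a 50-byte request by evicting 30.
val pool = new StoragePoolSketch(100L)
pool.acquireMemory(80L)
assert(pool.acquireMemory(50L) && pool.memoryUsed == 100L)
```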
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index ceb8ea434e..ae9e1ac0e2 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -77,9 +77,7 @@ private[spark] abstract class MemoryManager(
def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
- storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
- }
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
/**
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
@@ -109,12 +107,7 @@ private[spark] abstract class MemoryManager(
def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
- memoryMode: MemoryMode): Long = synchronized {
- memoryMode match {
- case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
- case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
- }
- }
+ memoryMode: MemoryMode): Long
/**
* Release numBytes of execution memory belonging to the given task.
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 12a0943068..3554b558f2 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -49,19 +49,50 @@ private[spark] class StaticMemoryManager(
}
// Max number of bytes worth of blocks to evict when unrolling
- private val maxMemoryToEvictForUnroll: Long = {
+ private val maxUnrollMemory: Long = {
(maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}
+ override def acquireStorageMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+ if (numBytes > maxStorageMemory) {
+ // Fail fast if the block simply won't fit
+ logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
+ s"memory limit ($maxStorageMemory bytes)")
+ false
+ } else {
+ storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
+ }
+ }
+
override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
- val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory)
- val numBytesToFree = math.min(numBytes, maxNumBytesToFree)
+ val freeMemory = storageMemoryPool.memoryFree
+ // When unrolling, we will use all of the existing free memory, and, if necessary,
+ // some extra space freed from evicting cached blocks. We must place a cap on the
+ // amount of memory to be evicted by unrolling, however, otherwise unrolling one
+ // big block can blow away the entire cache.
+ val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
+ // Keep it within the range 0 <= X <= maxNumBytesToFree
+ val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
}
+
+ private[memory]
+ override def acquireExecutionMemory(
+ numBytes: Long,
+ taskAttemptId: Long,
+ memoryMode: MemoryMode): Long = synchronized {
+ memoryMode match {
+ case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+ case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+ }
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index fc4f0357e9..70af83b5ee 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -65,7 +65,8 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
- acquireMemory(blockId, numBytes, numBytes, evictedBlocks)
+ val numBytesToFree = math.max(0, numBytes - memoryFree)
+ acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
}
/**
@@ -73,7 +74,7 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
*
* @param blockId the ID of the block we are acquiring storage memory for
* @param numBytesToAcquire the size of this block
- * @param numBytesToFree the size of space to be freed through evicting blocks
+ * @param numBytesToFree the amount of space to be freed through evicting blocks
* @return whether all N bytes were successfully granted.
*/
def acquireMemory(
@@ -84,16 +85,18 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)
- memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
- // Register evicted blocks, if any, with the active task metrics
- Option(TaskContext.get()).foreach { tc =>
- val metrics = tc.taskMetrics()
- val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
- metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
+ if (numBytesToFree > 0) {
+ memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, evictedBlocks)
+ // Register evicted blocks, if any, with the active task metrics
+ Option(TaskContext.get()).foreach { tc =>
+ val metrics = tc.taskMetrics()
+ val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
+ metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
+ }
}
// NOTE: If the memory store evicts blocks, then those evictions will synchronously call
- // back into this StorageMemoryPool in order to free. Therefore, these variables should have
- // been updated.
+ // back into this StorageMemoryPool in order to free memory. Therefore, these variables
+ // should have been updated.
val enoughMemory = numBytesToAcquire <= memoryFree
if (enoughMemory) {
_memoryUsed += numBytesToAcquire
@@ -121,18 +124,20 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
*/
def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
// First, shrink the pool by reclaiming free memory:
- val spaceFreedByReleasingUnusedMemory = Math.min(spaceToFree, memoryFree)
+ val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
decrementPoolSize(spaceFreedByReleasingUnusedMemory)
- if (spaceFreedByReleasingUnusedMemory == spaceToFree) {
- spaceFreedByReleasingUnusedMemory
- } else {
+ val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
+ if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
- memoryStore.ensureFreeSpace(spaceToFree - spaceFreedByReleasingUnusedMemory, evictedBlocks)
+ memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, evictedBlocks)
val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
- _memoryUsed -= spaceFreedByEviction
+ // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
+ // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
decrementPoolSize(spaceFreedByEviction)
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
+ } else {
+ spaceFreedByReleasingUnusedMemory
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 0f1ea9ab39..0b9f6a9dc0 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -100,7 +100,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
case MemoryMode.OFF_HEAP =>
// For now, we only support on-heap caching of data, so we do not need to interact with
// the storage pool when allocating off-heap memory. This will change in the future, though.
- super.acquireExecutionMemory(numBytes, taskAttemptId, memoryMode)
+ offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
}
}
@@ -110,6 +110,12 @@ private[spark] class UnifiedMemoryManager private[memory] (
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
assert(numBytes >= 0)
+ if (numBytes > maxStorageMemory) {
+ // Fail fast if the block simply won't fit
+ logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
+ s"memory limit ($maxStorageMemory bytes)")
+ return false
+ }
if (numBytes > storageMemoryPool.memoryFree) {
// There is not enough free memory in the storage pool, so try to borrow free memory from
// the execution pool.
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 4dbac388e0..bdab8c2332 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -406,85 +406,41 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
/**
- * Try to free up a given amount of space by evicting existing blocks.
- *
- * @param space the amount of memory to free, in bytes
- * @param droppedBlocks a holder for blocks evicted in the process
- * @return whether the requested free space is freed.
- */
- private[spark] def ensureFreeSpace(
- space: Long,
- droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
- ensureFreeSpace(None, space, droppedBlocks)
- }
-
- /**
- * Try to free up a given amount of space to store a block by evicting existing ones.
- *
- * @param space the amount of memory to free, in bytes
- * @param droppedBlocks a holder for blocks evicted in the process
- * @return whether the requested free space is freed.
- */
- private[spark] def ensureFreeSpace(
- blockId: BlockId,
- space: Long,
- droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
- ensureFreeSpace(Some(blockId), space, droppedBlocks)
- }
-
- /**
- * Try to free up a given amount of space to store a particular block, but can fail if
- * either the block is bigger than our memory or it would require replacing another block
- * from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
- * don't fit into memory that we want to avoid).
- *
- * @param blockId the ID of the block we are freeing space for, if any
- * @param space the size of this block
- * @param droppedBlocks a holder for blocks evicted in the process
- * @return whether the requested free space is freed.
- */
- private def ensureFreeSpace(
+ * Try to evict blocks to free up a given amount of space to store a particular block.
+ * Can fail if either the block is bigger than our memory or it would require replacing
+ * another block from the same RDD (which leads to a wasteful cyclic replacement pattern for
+ * RDDs that don't fit into memory that we want to avoid).
+ *
+ * @param blockId the ID of the block we are freeing space for, if any
+ * @param space the size of this block
+ * @param droppedBlocks a holder for blocks evicted in the process
+ * @return whether the requested free space is freed.
+ */
+ private[spark] def evictBlocksToFreeSpace(
blockId: Option[BlockId],
space: Long,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+ assert(space > 0)
memoryManager.synchronized {
- val freeMemory = maxMemory - memoryUsed
+ var freedMemory = 0L
val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
- var selectedMemory = 0L
-
- logInfo(s"Ensuring $space bytes of free space " +
- blockId.map { id => s"for block $id" }.getOrElse("") +
- s"(free: $freeMemory, max: $maxMemory)")
-
- // Fail fast if the block simply won't fit
- if (space > maxMemory) {
- logInfo("Will not " + blockId.map { id => s"store $id" }.getOrElse("free memory") +
- s" as the required space ($space bytes) exceeds our memory limit ($maxMemory bytes)")
- return false
- }
-
- // No need to evict anything if there is already enough free space
- if (freeMemory >= space) {
- return true
- }
-
// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
// can lead to exceptions.
entries.synchronized {
val iterator = entries.entrySet().iterator()
- while (freeMemory + selectedMemory < space && iterator.hasNext) {
+ while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
selectedBlocks += blockId
- selectedMemory += pair.getValue.size
+ freedMemory += pair.getValue.size
}
}
}
- if (freeMemory + selectedMemory >= space) {
+ if (freedMemory >= space) {
logInfo(s"${selectedBlocks.size} blocks selected for dropping")
for (blockId <- selectedBlocks) {
val entry = entries.synchronized { entries.get(blockId) }
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index f55d435fa3..555b640cb4 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -24,9 +24,10 @@ import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import org.mockito.Matchers.{any, anyLong}
-import org.mockito.Mockito.{mock, when}
+import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
+import org.scalatest.BeforeAndAfterEach
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkFunSuite
@@ -36,105 +37,105 @@ import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, StorageLevel
/**
* Helper trait for sharing code among [[MemoryManager]] tests.
*/
-private[memory] trait MemoryManagerSuite extends SparkFunSuite {
+private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
- import MemoryManagerSuite.DEFAULT_ENSURE_FREE_SPACE_CALLED
+ protected val evictedBlocks = new mutable.ArrayBuffer[(BlockId, BlockStatus)]
+
+ import MemoryManagerSuite.DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED
// Note: Mockito's verify mechanism does not provide a way to reset method call counts
// without also resetting stubbed methods. Since our test code relies on the latter,
- // we need to use our own variable to track invocations of `ensureFreeSpace`.
+ // we need to use our own variable to track invocations of `evictBlocksToFreeSpace`.
/**
- * The amount of free space requested in the last call to [[MemoryStore.ensureFreeSpace]]
+ * The amount of space requested in the last call to [[MemoryStore.evictBlocksToFreeSpace]].
*
- * This set whenever [[MemoryStore.ensureFreeSpace]] is called, and cleared when the test
- * code makes explicit assertions on this variable through [[assertEnsureFreeSpaceCalled]].
+ * This is set whenever [[MemoryStore.evictBlocksToFreeSpace]] is called, and cleared when the test
+ * code makes explicit assertions on this variable through
+ * [[assertEvictBlocksToFreeSpaceCalled]].
*/
- private val ensureFreeSpaceCalled = new AtomicLong(DEFAULT_ENSURE_FREE_SPACE_CALLED)
+ private val evictBlocksToFreeSpaceCalled = new AtomicLong(0)
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ evictedBlocks.clear()
+ evictBlocksToFreeSpaceCalled.set(DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED)
+ }
/**
- * Make a mocked [[MemoryStore]] whose [[MemoryStore.ensureFreeSpace]] method is stubbed.
+ * Make a mocked [[MemoryStore]] whose [[MemoryStore.evictBlocksToFreeSpace]] method is stubbed.
*
- * This allows our test code to release storage memory when [[MemoryStore.ensureFreeSpace]]
- * is called without relying on [[org.apache.spark.storage.BlockManager]] and all of its
- * dependencies.
+ * This allows our test code to release storage memory when these methods are called
+ * without relying on [[org.apache.spark.storage.BlockManager]] and all of its dependencies.
*/
protected def makeMemoryStore(mm: MemoryManager): MemoryStore = {
- val ms = mock(classOf[MemoryStore])
- when(ms.ensureFreeSpace(anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm, 0))
- when(ms.ensureFreeSpace(any(), anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm, 1))
+ val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
+ when(ms.evictBlocksToFreeSpace(any(), anyLong(), any()))
+ .thenAnswer(evictBlocksToFreeSpaceAnswer(mm))
mm.setMemoryStore(ms)
ms
}
/**
- * Make an [[Answer]] that stubs [[MemoryStore.ensureFreeSpace]] with the right arguments.
- */
- private def ensureFreeSpaceAnswer(mm: MemoryManager, numBytesPos: Int): Answer[Boolean] = {
+ * Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory.
+ *
+ * This is a significant simplification of the real method, which actually drops existing
+ * blocks based on the size of each block. Instead, here we simply release as many bytes
+ * as needed to ensure the requested amount of free space. This allows us to set up the
+ * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in
+ * many other dependencies.
+ *
+ * Every call to this method will set a global variable, [[evictBlocksToFreeSpaceCalled]], that
+ * records the number of bytes this is called with. This variable is expected to be cleared
+ * by the test code later through [[assertEvictBlocksToFreeSpaceCalled]].
+ */
+ private def evictBlocksToFreeSpaceAnswer(mm: MemoryManager): Answer[Boolean] = {
new Answer[Boolean] {
override def answer(invocation: InvocationOnMock): Boolean = {
val args = invocation.getArguments
- require(args.size > numBytesPos, s"bad test: expected >$numBytesPos arguments " +
- s"in ensureFreeSpace, found ${args.size}")
- require(args(numBytesPos).isInstanceOf[Long], s"bad test: expected ensureFreeSpace " +
- s"argument at index $numBytesPos to be a Long: ${args.mkString(", ")}")
- val numBytes = args(numBytesPos).asInstanceOf[Long]
- val success = mockEnsureFreeSpace(mm, numBytes)
- if (success) {
+ val numBytesToFree = args(1).asInstanceOf[Long]
+ assert(numBytesToFree > 0)
+ require(evictBlocksToFreeSpaceCalled.get() === DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED,
+ "bad test: evictBlocksToFreeSpace() variable was not reset")
+ evictBlocksToFreeSpaceCalled.set(numBytesToFree)
+ if (numBytesToFree <= mm.storageMemoryUsed) {
+ // We can evict enough blocks to fulfill the request for space
+ mm.releaseStorageMemory(numBytesToFree)
args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append(
- (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytes, 0L, 0L)))
+ (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L, 0L)))
+ // We need to add this call so that the suite-level `evictedBlocks` is updated when
+ // execution evicts storage; in that case, args.last will not be equal to evictedBlocks
+ // because it will be a temporary buffer created inside of the MemoryManager rather than
+ // being passed in by the test code.
+ if (!(evictedBlocks eq args.last)) {
+ evictedBlocks.append(
+ (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L, 0L)))
+ }
+ true
+ } else {
+ // No blocks were evicted because eviction would not free enough space.
+ false
}
- success
- }
- }
- }
-
- /**
- * Simulate the part of [[MemoryStore.ensureFreeSpace]] that releases storage memory.
- *
- * This is a significant simplification of the real method, which actually drops existing
- * blocks based on the size of each block. Instead, here we simply release as many bytes
- * as needed to ensure the requested amount of free space. This allows us to set up the
- * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in
- * many other dependencies.
- *
- * Every call to this method will set a global variable, [[ensureFreeSpaceCalled]], that
- * records the number of bytes this is called with. This variable is expected to be cleared
- * by the test code later through [[assertEnsureFreeSpaceCalled]].
- */
- private def mockEnsureFreeSpace(mm: MemoryManager, numBytes: Long): Boolean = mm.synchronized {
- require(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
- "bad test: ensure free space variable was not reset")
- // Record the number of bytes we freed this call
- ensureFreeSpaceCalled.set(numBytes)
- if (numBytes <= mm.maxStorageMemory) {
- def freeMemory = mm.maxStorageMemory - mm.storageMemoryUsed
- val spaceToRelease = numBytes - freeMemory
- if (spaceToRelease > 0) {
- mm.releaseStorageMemory(spaceToRelease)
}
- freeMemory >= numBytes
- } else {
- // We attempted to free more bytes than our max allowable memory
- false
}
}
/**
- * Assert that [[MemoryStore.ensureFreeSpace]] is called with the given parameters.
+ * Assert that [[MemoryStore.evictBlocksToFreeSpace]] is called with the given parameters.
*/
- protected def assertEnsureFreeSpaceCalled(ms: MemoryStore, numBytes: Long): Unit = {
- assert(ensureFreeSpaceCalled.get() === numBytes,
- s"expected ensure free space to be called with $numBytes")
- ensureFreeSpaceCalled.set(DEFAULT_ENSURE_FREE_SPACE_CALLED)
+ protected def assertEvictBlocksToFreeSpaceCalled(ms: MemoryStore, numBytes: Long): Unit = {
+ assert(evictBlocksToFreeSpaceCalled.get() === numBytes,
+ s"expected evictBlocksToFreeSpace() to be called with $numBytes")
+ evictBlocksToFreeSpaceCalled.set(DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED)
}
/**
- * Assert that [[MemoryStore.ensureFreeSpace]] is NOT called.
+ * Assert that [[MemoryStore.evictBlocksToFreeSpace]] is NOT called.
*/
- protected def assertEnsureFreeSpaceNotCalled[T](ms: MemoryStore): Unit = {
- assert(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
- "ensure free space should not have been called!")
+ protected def assertEvictBlocksToFreeSpaceNotCalled[T](ms: MemoryStore): Unit = {
+ assert(evictBlocksToFreeSpaceCalled.get() === DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED,
+ "evictBlocksToFreeSpace() should not have been called!")
+ assert(evictedBlocks.isEmpty)
}
/**
@@ -291,5 +292,5 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
}
private object MemoryManagerSuite {
- private val DEFAULT_ENSURE_FREE_SPACE_CALLED = -1L
+ private val DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED = -1L
}
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 54cb28c389..6700b94f0f 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -17,16 +17,13 @@
package org.apache.spark.memory
-import scala.collection.mutable.ArrayBuffer
-
import org.mockito.Mockito.when
import org.apache.spark.SparkConf
-import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId}
+import org.apache.spark.storage.{MemoryStore, TestBlockId}
class StaticMemoryManagerSuite extends MemoryManagerSuite {
private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
- private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
/**
* Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
@@ -85,33 +82,38 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
assert(mm.storageMemoryUsed === 0L)
assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
- // `ensureFreeSpace` should be called with the number of bytes requested
- assertEnsureFreeSpaceCalled(ms, 10L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 10L)
+
assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 100L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 110L)
// Acquire more than the max, not granted
assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, maxStorageMem + 1L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 110L)
// Acquire up to the max, requests after this are still granted due to LRU eviction
assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 1000L)
+ assertEvictBlocksToFreeSpaceCalled(ms, 110L)
assert(mm.storageMemoryUsed === 1000L)
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 1L)
+ assertEvictBlocksToFreeSpaceCalled(ms, 1L)
+ assert(evictedBlocks.nonEmpty)
+ evictedBlocks.clear()
+ // Note: We evicted 1 byte to put another 1-byte block in, so the storage memory used remains at
+ // 1000 bytes. This is different from real behavior, where the 1-byte block would have evicted
+ // the 1000-byte block entirely. This is set up differently so we can write finer-grained tests.
assert(mm.storageMemoryUsed === 1000L)
mm.releaseStorageMemory(800L)
assert(mm.storageMemoryUsed === 200L)
// Acquire after release
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 1L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 201L)
mm.releaseAllStorageMemory()
assert(mm.storageMemoryUsed === 0L)
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 1L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 1L)
// Release beyond what was acquired
mm.releaseStorageMemory(100L)
@@ -133,7 +135,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
assert(mm.executionMemoryUsed === 200L)
// Only storage memory should increase
assert(mm.acquireStorageMemory(dummyBlock, 50L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 50L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 50L)
assert(mm.executionMemoryUsed === 200L)
// Only execution memory should be released
@@ -151,21 +153,25 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
val dummyBlock = TestBlockId("lonely water")
val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
assert(mm.acquireUnrollMemory(dummyBlock, 100L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 100L)
+ when(ms.currentUnrollMemory).thenReturn(100L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 100L)
mm.releaseUnrollMemory(40L)
assert(mm.storageMemoryUsed === 60L)
when(ms.currentUnrollMemory).thenReturn(60L)
- assert(mm.acquireUnrollMemory(dummyBlock, 500L, evictedBlocks))
+ assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks))
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
+ assert(mm.storageMemoryUsed === 860L)
// `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes.
- // Since we already occupy 60 bytes, we will try to ensure only 400 - 60 = 340 bytes.
- assertEnsureFreeSpaceCalled(ms, 340L)
- assert(mm.storageMemoryUsed === 560L)
- when(ms.currentUnrollMemory).thenReturn(560L)
- assert(!mm.acquireUnrollMemory(dummyBlock, 800L, evictedBlocks))
- assert(mm.storageMemoryUsed === 560L)
- // We already have 560 bytes > the max unroll space of 400 bytes, so no bytes are freed
- assertEnsureFreeSpaceCalled(ms, 0L)
+ // Since we already occupy 60 bytes, we will try to evict only 400 - 60 = 340 bytes.
+ assert(mm.acquireUnrollMemory(dummyBlock, 240L, evictedBlocks))
+ assertEvictBlocksToFreeSpaceCalled(ms, 100L)
+ when(ms.currentUnrollMemory).thenReturn(300L) // 60 + 240
+ assert(mm.storageMemoryUsed === 1000L)
+ evictedBlocks.clear()
+ assert(!mm.acquireUnrollMemory(dummyBlock, 150L, evictedBlocks))
+ assertEvictBlocksToFreeSpaceCalled(ms, 100L) // 400 - 300
+ assert(mm.storageMemoryUsed === 900L) // 100 bytes were evicted
// Release beyond what was acquired
mm.releaseUnrollMemory(maxStorageMem)
assert(mm.storageMemoryUsed === 0L)
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index e97c898a44..71221deeb4 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -17,16 +17,13 @@
package org.apache.spark.memory
-import scala.collection.mutable.ArrayBuffer
-
import org.scalatest.PrivateMethodTester
import org.apache.spark.SparkConf
-import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId}
+import org.apache.spark.storage.{MemoryStore, TestBlockId}
class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester {
private val dummyBlock = TestBlockId("--")
- private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
private val storageFraction: Double = 0.5
@@ -78,33 +75,40 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val (mm, ms) = makeThings(maxMemory)
assert(mm.storageMemoryUsed === 0L)
assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
- // `ensureFreeSpace` should be called with the number of bytes requested
- assertEnsureFreeSpaceCalled(ms, 10L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 10L)
+
assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 100L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 110L)
// Acquire more than the max, not granted
assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, maxMemory + 1L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 110L)
// Acquire up to the max, requests after this are still granted due to LRU eviction
assert(mm.acquireStorageMemory(dummyBlock, maxMemory, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 1000L)
+ assertEvictBlocksToFreeSpaceCalled(ms, 110L)
assert(mm.storageMemoryUsed === 1000L)
+ assert(evictedBlocks.nonEmpty)
+ evictedBlocks.clear()
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 1L)
+ assertEvictBlocksToFreeSpaceCalled(ms, 1L)
+ assert(evictedBlocks.nonEmpty)
+ evictedBlocks.clear()
+ // Note: We evicted 1 byte to put another 1-byte block in, so the storage memory used remains at
+ // 1000 bytes. This is different from real behavior, where the 1-byte block would have evicted
+ // the 1000-byte block entirely. This is set up differently so we can write finer-grained tests.
assert(mm.storageMemoryUsed === 1000L)
mm.releaseStorageMemory(800L)
assert(mm.storageMemoryUsed === 200L)
// Acquire after release
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 1L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 201L)
mm.releaseAllStorageMemory()
assert(mm.storageMemoryUsed === 0L)
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 1L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 1L)
// Release beyond what was acquired
mm.releaseStorageMemory(100L)
@@ -117,25 +121,27 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val (mm, ms) = makeThings(maxMemory)
// Acquire enough storage memory to exceed the storage region
assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 750L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.executionMemoryUsed === 0L)
assert(mm.storageMemoryUsed === 750L)
// Execution needs to request 250 bytes to evict storage memory
assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
assert(mm.executionMemoryUsed === 100L)
assert(mm.storageMemoryUsed === 750L)
- assertEnsureFreeSpaceNotCalled(ms)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
// Execution wants 200 bytes but only 150 are free, so storage is evicted
assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
assert(mm.executionMemoryUsed === 300L)
- assertEnsureFreeSpaceCalled(ms, 50L)
- assert(mm.executionMemoryUsed === 300L)
+ assert(mm.storageMemoryUsed === 700L)
+ assertEvictBlocksToFreeSpaceCalled(ms, 50L)
+ assert(evictedBlocks.nonEmpty)
+ evictedBlocks.clear()
mm.releaseAllStorageMemory()
require(mm.executionMemoryUsed === 300L)
require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released")
// Acquire some storage memory again, but this time keep it within the storage region
assert(mm.acquireStorageMemory(dummyBlock, 400L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, 400L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.storageMemoryUsed === 400L)
assert(mm.executionMemoryUsed === 300L)
// Execution cannot evict storage because the latter is within the storage fraction,
@@ -143,7 +149,27 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(mm.acquireExecutionMemory(400L, taskAttemptId, MemoryMode.ON_HEAP) === 300L)
assert(mm.executionMemoryUsed === 600L)
assert(mm.storageMemoryUsed === 400L)
- assertEnsureFreeSpaceNotCalled(ms)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
+ }
+
+ test("execution memory requests smaller than free memory should evict storage (SPARK-12165)") {
+ val maxMemory = 1000L
+ val taskAttemptId = 0L
+ val (mm, ms) = makeThings(maxMemory)
+ // Acquire enough storage memory to exceed the storage region size
+ assert(mm.acquireStorageMemory(dummyBlock, 700L, evictedBlocks))
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
+ assert(mm.executionMemoryUsed === 0L)
+ assert(mm.storageMemoryUsed === 700L)
+ // SPARK-12165: previously, MemoryStore would not evict anything because it would
+ // mistakenly think that the 300 bytes of free space was still available even after
+ // using it to expand the execution pool. Consequently, no storage memory was released
+ // and the following call granted only 300 bytes to execution.
+ assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L)
+ assertEvictBlocksToFreeSpaceCalled(ms, 200L)
+ assert(mm.storageMemoryUsed === 500L)
+ assert(mm.executionMemoryUsed === 500L)
+ assert(evictedBlocks.nonEmpty)
}
test("storage does not evict execution") {
@@ -154,32 +180,34 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(mm.acquireExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) === 800L)
assert(mm.executionMemoryUsed === 800L)
assert(mm.storageMemoryUsed === 0L)
- assertEnsureFreeSpaceNotCalled(ms)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
// Storage should not be able to evict execution
assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
assert(mm.executionMemoryUsed === 800L)
assert(mm.storageMemoryUsed === 100L)
- assertEnsureFreeSpaceCalled(ms, 100L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(!mm.acquireStorageMemory(dummyBlock, 250L, evictedBlocks))
assert(mm.executionMemoryUsed === 800L)
assert(mm.storageMemoryUsed === 100L)
- assertEnsureFreeSpaceCalled(ms, 250L)
+ // Do not attempt to evict blocks, since evicting will not free enough memory:
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP)
mm.releaseStorageMemory(maxMemory)
// Acquire some execution memory again, but this time keep it within the execution region
assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
assert(mm.executionMemoryUsed === 200L)
assert(mm.storageMemoryUsed === 0L)
- assertEnsureFreeSpaceNotCalled(ms)
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
// Storage should still not be able to evict execution
assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
assert(mm.executionMemoryUsed === 200L)
assert(mm.storageMemoryUsed === 750L)
- assertEnsureFreeSpaceCalled(ms, 750L)
+ assertEvictBlocksToFreeSpaceNotCalled(ms) // since there were 800 bytes free
assert(!mm.acquireStorageMemory(dummyBlock, 850L, evictedBlocks))
assert(mm.executionMemoryUsed === 200L)
assert(mm.storageMemoryUsed === 750L)
- assertEnsureFreeSpaceCalled(ms, 850L)
+ // Do not attempt to evict blocks, since evicting will not free enough memory:
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
}
test("small heap") {