diff options
author | Andrew Or <andrew@databricks.com> | 2016-05-11 12:58:57 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-05-11 12:58:57 -0700 |
commit | bb88ad4e0e870c88d474c71939a19541522a3023 (patch) | |
tree | 23d1307530e5b20ba9ed26f2de42aaf4629ae8cc /core/src | |
parent | 81c68eceba3a857ba7349c6892dc336c3ebd11dc (diff) | |
download | spark-bb88ad4e0e870c88d474c71939a19541522a3023.tar.gz spark-bb88ad4e0e870c88d474c71939a19541522a3023.tar.bz2 spark-bb88ad4e0e870c88d474c71939a19541522a3023.zip |
[SPARK-15260] Atomically resize memory pools
## What changes were proposed in this pull request?
When we acquire execution memory, we do a lot of things between shrinking the storage memory pool and enlarging the execution memory pool. In particular, we call `memoryStore.evictBlocksToFreeSpace`, which may do a lot of I/O and can throw exceptions. If an exception is thrown, the pool sizes on that executor will be in a bad state.
This patch minimizes the things we do between the two calls to make the resizing more atomic.
## How was this patch tested?
Jenkins.
Author: Andrew Or <andrew@databricks.com>
Closes #13039 from andrewor14/safer-pool.
Diffstat (limited to 'core/src')
4 files changed, 46 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala index 0b552cabfc..4c6b639015 100644 --- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala @@ -116,13 +116,13 @@ private[memory] class StorageMemoryPool( } /** - * Try to shrink the size of this storage memory pool by `spaceToFree` bytes. Return the number - * of bytes removed from the pool's capacity. + * Free space to shrink the size of this storage memory pool by `spaceToFree` bytes. + * Note: this method doesn't actually reduce the pool size but relies on the caller to do so. + * + * @return number of bytes to be removed from the pool's capacity. */ - def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized { - // First, shrink the pool by reclaiming free memory: + def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized { val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree) - decrementPoolSize(spaceFreedByReleasingUnusedMemory) val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory if (remainingSpaceToFree > 0) { // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks: @@ -130,7 +130,6 @@ private[memory] class StorageMemoryPool( memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode) // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do // not need to decrement _memoryUsed here. However, we do need to decrement the pool size. - decrementPoolSize(spaceFreedByEviction) spaceFreedByReleasingUnusedMemory + spaceFreedByEviction } else { spaceFreedByReleasingUnusedMemory diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index 82023b533d..ae747c1d16 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -113,9 +113,10 @@ private[spark] class UnifiedMemoryManager private[memory] ( storagePool.poolSize - storageRegionSize) if (memoryReclaimableFromStorage > 0) { // Only reclaim as much space as is necessary and available: - val spaceReclaimed = storagePool.shrinkPoolToFreeSpace( + val spaceToReclaim = storagePool.freeSpaceToShrinkPool( math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) - executionPool.incrementPoolSize(spaceReclaimed) + storagePool.decrementPoolSize(spaceToReclaim) + executionPool.incrementPoolSize(spaceToReclaim) } } } diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index a1286523a2..2c4928ab90 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -79,6 +79,21 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft } /** + * Make a mocked [[MemoryStore]] whose [[MemoryStore.evictBlocksToFreeSpace]] method is + * stubbed to always throw [[RuntimeException]]. + */ + protected def makeBadMemoryStore(mm: MemoryManager): MemoryStore = { + val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS) + when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())).thenAnswer(new Answer[Long] { + override def answer(invocation: InvocationOnMock): Long = { + throw new RuntimeException("bad memory store!") + } + }) + mm.setMemoryStore(ms) + ms + } + + /** * Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory. * * This is a significant simplification of the real method, which actually drops existing diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 14255818c7..c821054412 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -280,4 +280,27 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(evictedBlocks.nonEmpty) } + test("SPARK-15260: atomically resize memory pools") { + val conf = new SparkConf() + .set("spark.memory.fraction", "1") + .set("spark.memory.storageFraction", "0") + .set("spark.testing.memory", "1000") + val mm = UnifiedMemoryManager(conf, numCores = 2) + makeBadMemoryStore(mm) + val memoryMode = MemoryMode.ON_HEAP + // Acquire 1000 then release 600 bytes of storage memory, leaving the + // storage memory pool at 1000 bytes but only 400 bytes of which are used. + assert(mm.acquireStorageMemory(dummyBlock, 1000L, memoryMode)) + mm.releaseStorageMemory(600L, memoryMode) + // Before the fix for SPARK-15260, we would first shrink the storage pool by the amount of + // unused storage memory (600 bytes), try to evict blocks, then enlarge the execution pool + // by the same amount. If the eviction threw an exception, then we would shrink one pool + // without enlarging the other, resulting in an assertion failure. + intercept[RuntimeException] { + mm.acquireExecutionMemory(1000L, 0, memoryMode) + } + val assertInvariants = PrivateMethod[Unit]('assertInvariants) + mm.invokePrivate[Unit](assertInvariants()) + } + } |