author     Andrew Or <andrew@databricks.com>          2016-05-11 12:58:57 -0700
committer  Davies Liu <davies.liu@gmail.com>          2016-05-11 12:58:57 -0700
commit     bb88ad4e0e870c88d474c71939a19541522a3023 (patch)
tree       23d1307530e5b20ba9ed26f2de42aaf4629ae8cc /core
parent     81c68eceba3a857ba7349c6892dc336c3ebd11dc (diff)
[SPARK-15260] Atomically resize memory pools
## What changes were proposed in this pull request?

When we acquire execution memory, we do a lot of things between shrinking the storage memory pool and enlarging the execution memory pool. In particular, we call `memoryStore.evictBlocksToFreeSpace`, which may do a lot of I/O and can throw exceptions. If an exception is thrown, the pool sizes on that executor will be left in a bad state. This patch minimizes the work done between the two calls to make the resizing more atomic.

## How was this patch tested?

Jenkins.

Author: Andrew Or <andrew@databricks.com>

Closes #13039 from andrewor14/safer-pool.
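For readers who want the shape of the fix outside the diff, here is a minimal, self-contained Scala sketch of the ordering the patch establishes. All names here (`AtomicResizeSketch`, `Pool`, `borrowFromStorage`, `evict`) are hypothetical stand-ins rather than Spark's actual classes; the point is only that eviction, which can do I/O and throw, now runs before either pool size changes, so a failure leaves both pools consistent.

```scala
// Illustrative sketch only: these names mirror the idea of the patch,
// not Spark's actual implementation.
object AtomicResizeSketch {

  class Pool(var poolSize: Long, var memoryUsed: Long) {
    def memoryFree: Long = poolSize - memoryUsed
    def decrementPoolSize(delta: Long): Unit = { poolSize -= delta }
    def incrementPoolSize(delta: Long): Unit = { poolSize += delta }
  }

  /** Frees up to `spaceToFree` bytes of storage memory and returns how much the
    * caller may now remove from the storage pool. Eviction (which can throw)
    * happens here, before either pool is resized. */
  def freeSpaceToShrinkPool(storage: Pool, spaceToFree: Long)(evict: Long => Long): Long = {
    val freedWithoutEviction = math.min(spaceToFree, storage.memoryFree)
    val remaining = spaceToFree - freedWithoutEviction
    val freedByEviction = if (remaining > 0) evict(remaining) else 0L
    freedWithoutEviction + freedByEviction
  }

  /** Borrows memory from storage for execution. The two pool resizes happen
    * back to back, after any exception-prone work has already completed. */
  def borrowFromStorage(storage: Pool, execution: Pool, needed: Long)(evict: Long => Long): Unit = {
    val reclaimed = freeSpaceToShrinkPool(storage, needed)(evict)
    storage.decrementPoolSize(reclaimed)
    execution.incrementPoolSize(reclaimed)
  }
}
```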
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala         | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala      |  5
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala        | 15
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala | 23
4 files changed, 46 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index 0b552cabfc..4c6b639015 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -116,13 +116,13 @@ private[memory] class StorageMemoryPool(
}
/**
- * Try to shrink the size of this storage memory pool by `spaceToFree` bytes. Return the number
- * of bytes removed from the pool's capacity.
+ * Free space to shrink the size of this storage memory pool by `spaceToFree` bytes.
+ * Note: this method doesn't actually reduce the pool size but relies on the caller to do so.
+ *
+ * @return number of bytes to be removed from the pool's capacity.
*/
- def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
- // First, shrink the pool by reclaiming free memory:
+ def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
- decrementPoolSize(spaceFreedByReleasingUnusedMemory)
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
@@ -130,7 +130,6 @@ private[memory] class StorageMemoryPool(
memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
- decrementPoolSize(spaceFreedByEviction)
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
} else {
spaceFreedByReleasingUnusedMemory
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 82023b533d..ae747c1d16 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -113,9 +113,10 @@ private[spark] class UnifiedMemoryManager private[memory] (
storagePool.poolSize - storageRegionSize)
if (memoryReclaimableFromStorage > 0) {
// Only reclaim as much space as is necessary and available:
- val spaceReclaimed = storagePool.shrinkPoolToFreeSpace(
+ val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
- executionPool.incrementPoolSize(spaceReclaimed)
+ storagePool.decrementPoolSize(spaceToReclaim)
+ executionPool.incrementPoolSize(spaceToReclaim)
}
}
}
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index a1286523a2..2c4928ab90 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -79,6 +79,21 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
}
/**
+ * Make a mocked [[MemoryStore]] whose [[MemoryStore.evictBlocksToFreeSpace]] method is
+ * stubbed to always throw [[RuntimeException]].
+ */
+ protected def makeBadMemoryStore(mm: MemoryManager): MemoryStore = {
+ val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
+ when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())).thenAnswer(new Answer[Long] {
+ override def answer(invocation: InvocationOnMock): Long = {
+ throw new RuntimeException("bad memory store!")
+ }
+ })
+ mm.setMemoryStore(ms)
+ ms
+ }
+
+ /**
* Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory.
*
* This is a significant simplification of the real method, which actually drops existing
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 14255818c7..c821054412 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -280,4 +280,27 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(evictedBlocks.nonEmpty)
}
+ test("SPARK-15260: atomically resize memory pools") {
+ val conf = new SparkConf()
+ .set("spark.memory.fraction", "1")
+ .set("spark.memory.storageFraction", "0")
+ .set("spark.testing.memory", "1000")
+ val mm = UnifiedMemoryManager(conf, numCores = 2)
+ makeBadMemoryStore(mm)
+ val memoryMode = MemoryMode.ON_HEAP
+ // Acquire 1000 then release 600 bytes of storage memory, leaving the
+ // storage memory pool at 1000 bytes but only 400 bytes of which are used.
+ assert(mm.acquireStorageMemory(dummyBlock, 1000L, memoryMode))
+ mm.releaseStorageMemory(600L, memoryMode)
+ // Before the fix for SPARK-15260, we would first shrink the storage pool by the amount of
+ // unused storage memory (600 bytes), try to evict blocks, then enlarge the execution pool
+ // by the same amount. If the eviction threw an exception, then we would shrink one pool
+ // without enlarging the other, resulting in an assertion failure.
+ intercept[RuntimeException] {
+ mm.acquireExecutionMemory(1000L, 0, memoryMode)
+ }
+ val assertInvariants = PrivateMethod[Unit]('assertInvariants)
+ mm.invokePrivate[Unit](assertInvariants())
+ }
+
}
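The new test ends by invoking the memory manager's private `assertInvariants` method. As a rough, hedged illustration (the parameter names below are hypothetical, not Spark's exact fields), the invariant being protected is that the storage and execution pool sizes always sum to the configured maximum:

```scala
// Illustrative only: a standalone version of the kind of invariant the test
// re-checks after the failed acquireExecutionMemory call.
def checkPoolInvariant(storagePoolSize: Long, executionPoolSize: Long, maxMemory: Long): Unit = {
  // Had the storage pool been shrunk without the execution pool being grown
  // (the pre-SPARK-15260 failure mode), this sum would fall below maxMemory.
  assert(storagePoolSize + executionPoolSize == maxMemory,
    s"storage ($storagePoolSize) + execution ($executionPoolSize) != max ($maxMemory)")
}
```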