aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-12-10 15:30:08 -0800
committerAndrew Or <andrew@databricks.com>2015-12-10 15:30:08 -0800
commit5030923ea8bb94ac8fa8e432de9fc7089aa93986 (patch)
treec8862be47d526f2791d3e042fcffcd871093aebb
parent23a9e62bad9669e9ff5dc4bd714f58d12f9be0b5 (diff)
downloadspark-5030923ea8bb94ac8fa8e432de9fc7089aa93986.tar.gz
spark-5030923ea8bb94ac8fa8e432de9fc7089aa93986.tar.bz2
spark-5030923ea8bb94ac8fa8e432de9fc7089aa93986.zip
[SPARK-12155][SPARK-12253] Fix executor OOM in unified memory management
**Problem.** In unified memory management, acquiring execution memory may lead to eviction of storage memory. However, the space freed from evicting cached blocks is distributed among all active tasks. Thus, an incorrect upper bound on the execution memory per task can cause the acquisition to fail, leading to OOM's and premature spills. **Example.** Suppose total memory is 1000B, cached blocks occupy 900B, `spark.memory.storageFraction` is 0.4, and there are two active tasks. In this case, the cap on task execution memory is 100B / 2 = 50B. If task A tries to acquire 200B, it will evict 100B of storage but can only acquire 50B because of the incorrect cap. For another example, see this [regression test](https://github.com/andrewor14/spark/blob/fix-oom/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala#L233) that I stole from JoshRosen. **Solution.** Fix the cap on task execution memory. It should take into account the space that could have been freed by storage in addition to the current amount of memory available to execution. In the example above, the correct cap should have been 600B / 2 = 300B. This patch also guards against the race condition (SPARK-12253): (1) Existing tasks collectively occupy all execution memory (2) New task comes in and blocks while existing tasks spill (3) After tasks finish spilling, another task jumps in and puts in a large block, stealing the freed memory (4) New task still cannot acquire memory and goes back to sleep Author: Andrew Or <andrew@databricks.com> Closes #10240 from andrewor14/fix-oom.
-rw-r--r--core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala57
-rw-r--r--core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala57
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Task.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala25
4 files changed, 114 insertions, 31 deletions
diff --git a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
index 9023e1ac01..dbb0ad8d5c 100644
--- a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
@@ -70,11 +70,28 @@ private[memory] class ExecutionMemoryPool(
* active tasks) before it is forced to spill. This can happen if the number of tasks increase
* but an older task had a lot of memory already.
*
+ * @param numBytes number of bytes to acquire
+ * @param taskAttemptId the task attempt acquiring memory
+ * @param maybeGrowPool a callback that potentially grows the size of this pool. It takes in
+ * one parameter (Long) that represents the desired amount of memory by
+ * which this pool should be expanded.
+ * @param computeMaxPoolSize a callback that returns the maximum allowable size of this pool
+ * at this given moment. This is not a field because the max pool
+ * size is variable in certain cases. For instance, in unified
+ * memory management, the execution pool can be expanded by evicting
+ * cached blocks, thereby shrinking the storage pool.
+ *
* @return the number of bytes granted to the task.
*/
- def acquireMemory(numBytes: Long, taskAttemptId: Long): Long = lock.synchronized {
+ private[memory] def acquireMemory(
+ numBytes: Long,
+ taskAttemptId: Long,
+ maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
+ computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
+ // TODO: clean up this clunky method signature
+
// Add this task to the taskMemory map just so we can keep an accurate count of the number
// of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
if (!memoryForTask.contains(taskAttemptId)) {
@@ -91,25 +108,31 @@ private[memory] class ExecutionMemoryPool(
val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask(taskAttemptId)
- // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks;
- // don't let it be negative
- val maxToGrant =
- math.min(numBytes, math.max(0, (poolSize / numActiveTasks) - curMem))
+ // In every iteration of this loop, we should first try to reclaim any borrowed execution
+ // space from storage. This is necessary because of the potential race condition where new
+ // storage blocks may steal the free execution memory that this task was waiting for.
+ maybeGrowPool(numBytes - memoryFree)
+
+ // Maximum size the pool would have after potentially growing the pool.
+ // This is used to compute the upper bound of how much memory each task can occupy. This
+ // must take into account potential free memory as well as the amount this pool currently
+ // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
+ // we did not take into account space that could have been freed by evicting cached blocks.
+ val maxPoolSize = computeMaxPoolSize()
+ val maxMemoryPerTask = maxPoolSize / numActiveTasks
+ val minMemoryPerTask = poolSize / (2 * numActiveTasks)
+
+ // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
+ val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
// Only give it as much memory as is free, which might be none if it reached 1 / numTasks
val toGrant = math.min(maxToGrant, memoryFree)
- if (curMem < poolSize / (2 * numActiveTasks)) {
- // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
- // if we can't give it this much now, wait for other tasks to free up memory
- // (this happens if older tasks allocated lots of memory before N grew)
- if (memoryFree >= math.min(maxToGrant, poolSize / (2 * numActiveTasks) - curMem)) {
- memoryForTask(taskAttemptId) += toGrant
- return toGrant
- } else {
- logInfo(
- s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
- lock.wait()
- }
+ // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
+ // if we can't give it this much now, wait for other tasks to free up memory
+ // (this happens if older tasks allocated lots of memory before N grew)
+ if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
+ logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
+ lock.wait()
} else {
memoryForTask(taskAttemptId) += toGrant
return toGrant
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 0b9f6a9dc0..829f054dba 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -81,22 +81,51 @@ private[spark] class UnifiedMemoryManager private[memory] (
assert(numBytes >= 0)
memoryMode match {
case MemoryMode.ON_HEAP =>
- if (numBytes > onHeapExecutionMemoryPool.memoryFree) {
- val extraMemoryNeeded = numBytes - onHeapExecutionMemoryPool.memoryFree
- // There is not enough free memory in the execution pool, so try to reclaim memory from
- // storage. We can reclaim any free memory from the storage pool. If the storage pool
- // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
- // the memory that storage has borrowed from execution.
- val memoryReclaimableFromStorage =
- math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
- if (memoryReclaimableFromStorage > 0) {
- // Only reclaim as much space as is necessary and available:
- val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
- math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
- onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+
+ /**
+ * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
+ *
+ * When acquiring memory for a task, the execution pool may need to make multiple
+ * attempts. Each attempt must be able to evict storage in case another task jumps in
+ * and caches a large block between the attempts. This is called once per attempt.
+ */
+ def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
+ if (extraMemoryNeeded > 0) {
+ // There is not enough free memory in the execution pool, so try to reclaim memory from
+ // storage. We can reclaim any free memory from the storage pool. If the storage pool
+ // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
+ // the memory that storage has borrowed from execution.
+ val memoryReclaimableFromStorage =
+ math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
+ if (memoryReclaimableFromStorage > 0) {
+ // Only reclaim as much space as is necessary and available:
+ val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
+ math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
+ onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+ }
}
}
- onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+
+ /**
+ * The size the execution pool would have after evicting storage memory.
+ *
+ * The execution memory pool divides this quantity among the active tasks evenly to cap
+ * the execution memory allocation for each task. It is important to keep this greater
+ * than the execution pool size, which doesn't take into account potential memory that
+ * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
+ *
+ * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
+ * in execution memory allocation across tasks, Otherwise, a task may occupy more than
+ * its fair share of execution memory, mistakenly thinking that other tasks can acquire
+ * the portion of storage memory that cannot be evicted.
+ */
+ def computeMaxExecutionPoolSize(): Long = {
+ maxMemory - math.min(storageMemoryUsed, storageRegionSize)
+ }
+
+ onHeapExecutionMemoryPool.acquireMemory(
+ numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
+
case MemoryMode.OFF_HEAP =>
// For now, we only support on-heap caching of data, so we do not need to interact with
// the storage pool when allocating off-heap memory. This will change in the future, though.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index d4bc3a5c90..9f27eed626 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -92,6 +92,12 @@ private[spark] abstract class Task[T](
Utils.tryLogNonFatalError {
// Release memory used by this thread for unrolling blocks
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
+ // Notify any tasks waiting for execution memory to be freed to wake up and try to
+ // acquire memory again. This makes impossible the scenario where a task sleeps forever
+ // because there are no other tasks left to notify it. Since this is safe to do but may
+ // not be strictly necessary, we should revisit whether we can remove this in the future.
+ val memoryManager = SparkEnv.get.memoryManager
+ memoryManager.synchronized { memoryManager.notifyAll() }
}
} finally {
TaskContext.unset()
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index e21a028b7f..6cc48597d3 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -230,4 +230,29 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(exception.getMessage.contains("larger heap size"))
}
+ test("execution can evict cached blocks when there are multiple active tasks (SPARK-12155)") {
+ val conf = new SparkConf()
+ .set("spark.memory.fraction", "1")
+ .set("spark.memory.storageFraction", "0")
+ .set("spark.testing.memory", "1000")
+ val mm = UnifiedMemoryManager(conf, numCores = 2)
+ val ms = makeMemoryStore(mm)
+ assert(mm.maxMemory === 1000)
+ // Have two tasks each acquire some execution memory so that the memory pool registers that
+ // there are two active tasks:
+ assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
+ assert(mm.acquireExecutionMemory(100L, 1, MemoryMode.ON_HEAP) === 100L)
+ // Fill up all of the remaining memory with storage.
+ assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks))
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
+ assert(mm.storageMemoryUsed === 800)
+ assert(mm.executionMemoryUsed === 200)
+ // A task should still be able to allocate 100 bytes execution memory by evicting blocks
+ assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
+ assertEvictBlocksToFreeSpaceCalled(ms, 100L)
+ assert(mm.executionMemoryUsed === 300)
+ assert(mm.storageMemoryUsed === 700)
+ assert(evictedBlocks.nonEmpty)
+ }
+
}