-rw-r--r--  core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala      | 57
-rw-r--r--  core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala     | 57
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala                  |  6
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala | 25
4 files changed, 114 insertions(+), 31 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
index 9023e1ac01..dbb0ad8d5c 100644
--- a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
@@ -70,11 +70,28 @@ private[memory] class ExecutionMemoryPool(
* active tasks) before it is forced to spill. This can happen if the number of tasks
* increases but an older task already had a lot of memory.
*
+ * @param numBytes number of bytes to acquire
+ * @param taskAttemptId the task attempt acquiring memory
+ * @param maybeGrowPool a callback that potentially grows the size of this pool. It takes in
+ * one parameter (Long) that represents the desired amount of memory by
+ * which this pool should be expanded.
+ * @param computeMaxPoolSize a callback that returns the maximum allowable size of this pool
+ * at this given moment. This is not a field because the max pool
+ * size is variable in certain cases. For instance, in unified
+ * memory management, the execution pool can be expanded by evicting
+ * cached blocks, thereby shrinking the storage pool.
+ *
* @return the number of bytes granted to the task.
*/
- def acquireMemory(numBytes: Long, taskAttemptId: Long): Long = lock.synchronized {
+ private[memory] def acquireMemory(
+ numBytes: Long,
+ taskAttemptId: Long,
+ maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
+ computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
+ // TODO: clean up this clunky method signature
+
// Add this task to the taskMemory map just so we can keep an accurate count of the number
// of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
if (!memoryForTask.contains(taskAttemptId)) {
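With the default arguments, the new signature reduces to the old single-pool behavior: the grow callback does nothing (the bare `Unit` in the default evaluates to the Unit companion object, which is discarded) and the cap is the pool's current size. A minimal standalone sketch of those defaults, with hypothetical values not taken from the patch:

    // Sketch of the default callbacks; poolSize is a hypothetical value.
    val poolSize = 1000L
    val maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => () // no-op
    val computeMaxPoolSize: () => Long = () => poolSize // cap never moves
    maybeGrowPool(512L)                   // no effect on the pool
    assert(computeMaxPoolSize() == 1000L) // cap equals the static pool size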
@@ -91,25 +108,31 @@ private[memory] class ExecutionMemoryPool(
val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask(taskAttemptId)
- // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks;
- // don't let it be negative
- val maxToGrant =
- math.min(numBytes, math.max(0, (poolSize / numActiveTasks) - curMem))
+ // In every iteration of this loop, we should first try to reclaim any borrowed execution
+ // space from storage. This is necessary because of the potential race condition where new
+ // storage blocks may steal the free execution memory that this task was waiting for.
+ maybeGrowPool(numBytes - memoryFree)
+
+ // Maximum size the pool would have after potentially growing the pool.
+ // This is used to compute the upper bound of how much memory each task can occupy. This
+ // must take into account potential free memory as well as the amount this pool currently
+ // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
+ // we did not take into account space that could have been freed by evicting cached blocks.
+ val maxPoolSize = computeMaxPoolSize()
+ val maxMemoryPerTask = maxPoolSize / numActiveTasks
+ val minMemoryPerTask = poolSize / (2 * numActiveTasks)
+
+ // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
+ val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
// Only give it as much memory as is free, which might be none if it reached 1 / numTasks
val toGrant = math.min(maxToGrant, memoryFree)
- if (curMem < poolSize / (2 * numActiveTasks)) {
- // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
- // if we can't give it this much now, wait for other tasks to free up memory
- // (this happens if older tasks allocated lots of memory before N grew)
- if (memoryFree >= math.min(maxToGrant, poolSize / (2 * numActiveTasks) - curMem)) {
- memoryForTask(taskAttemptId) += toGrant
- return toGrant
- } else {
- logInfo(
- s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
- lock.wait()
- }
+ // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
+ // if we can't give it this much now, wait for other tasks to free up memory
+ // (this happens if older tasks allocated lots of memory before N grew)
+ if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
+ logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
+ lock.wait()
} else {
memoryForTask(taskAttemptId) += toGrant
return toGrant
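The rewritten loop keeps each task's share between 1/(2N) of the current pool size and 1/N of the maximum pool size. A standalone sketch of that arithmetic, assuming a 1000-byte pool, two active tasks, and nothing reclaimable from storage (all values hypothetical):

    val poolSize = 1000L
    val maxPoolSize = 1000L  // nothing evictable, so the pool cannot grow
    val numActiveTasks = 2
    val curMem = 100L        // what this task already holds
    val numBytes = 600L      // what this task is requesting

    val maxMemoryPerTask = maxPoolSize / numActiveTasks    // 500: the 1/N cap
    val minMemoryPerTask = poolSize / (2 * numActiveTasks) // 250: the 1/2N floor
    // The grant is clamped so the task never exceeds its 1/N cap:
    val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
    assert(maxToGrant == 400L) // 600 requested, but only 500 - 100 fits under the cap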
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 0b9f6a9dc0..829f054dba 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -81,22 +81,51 @@ private[spark] class UnifiedMemoryManager private[memory] (
assert(numBytes >= 0)
memoryMode match {
case MemoryMode.ON_HEAP =>
- if (numBytes > onHeapExecutionMemoryPool.memoryFree) {
- val extraMemoryNeeded = numBytes - onHeapExecutionMemoryPool.memoryFree
- // There is not enough free memory in the execution pool, so try to reclaim memory from
- // storage. We can reclaim any free memory from the storage pool. If the storage pool
- // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
- // the memory that storage has borrowed from execution.
- val memoryReclaimableFromStorage =
- math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
- if (memoryReclaimableFromStorage > 0) {
- // Only reclaim as much space as is necessary and available:
- val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
- math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
- onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+
+ /**
+ * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
+ *
+ * When acquiring memory for a task, the execution pool may need to make multiple
+ * attempts. Each attempt must be able to evict storage in case another task jumps in
+ * and caches a large block between the attempts. This is called once per attempt.
+ */
+ def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
+ if (extraMemoryNeeded > 0) {
+ // There is not enough free memory in the execution pool, so try to reclaim memory from
+ // storage. We can reclaim any free memory from the storage pool. If the storage pool
+ // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
+ // the memory that storage has borrowed from execution.
+ val memoryReclaimableFromStorage =
+ math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
+ if (memoryReclaimableFromStorage > 0) {
+ // Only reclaim as much space as is necessary and available:
+ val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
+ math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
+ onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+ }
}
}
- onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+
+ /**
+ * The size the execution pool would have after evicting storage memory.
+ *
+ * The execution memory pool divides this quantity among the active tasks evenly to cap
+ * the execution memory allocation for each task. It is important to keep this greater
+ * than the execution pool size, which doesn't take into account potential memory that
+ * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
+ *
+ * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
+ * in execution memory allocation across tasks. Otherwise, a task may occupy more than
+ * its fair share of execution memory, mistakenly thinking that other tasks can acquire
+ * the portion of storage memory that cannot be evicted.
+ */
+ def computeMaxExecutionPoolSize(): Long = {
+ maxMemory - math.min(storageMemoryUsed, storageRegionSize)
+ }
+
+ onHeapExecutionMemoryPool.acquireMemory(
+ numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
+
case MemoryMode.OFF_HEAP =>
// For now, we only support on-heap caching of data, so we do not need to interact with
// the storage pool when allocating off-heap memory. This will change in the future, though.
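To see why the cap subtracts math.min(storageMemoryUsed, storageRegionSize), consider the configuration the test below uses: spark.memory.storageFraction = 0, so storageRegionSize is 0 and every cached byte is evictable. A sketch with those (hypothetical) numbers:

    val maxMemory = 1000L
    val storageRegionSize = 0L   // spark.memory.storageFraction = 0
    val storageMemoryUsed = 800L // cached blocks, all evictable here

    // Evictable storage does not lower the cap; only the unevictable
    // portion (the part inside the storage region) does:
    val maxExecutionPoolSize =
      maxMemory - math.min(storageMemoryUsed, storageRegionSize)
    assert(maxExecutionPoolSize == 1000L) // each of 2 tasks may grow to 500 bytes

With storageRegionSize = 300L instead, the cap would be 1000 - 300 = 700, since the 300 bytes of blocks inside the storage region cannot be evicted by execution.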
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index d4bc3a5c90..9f27eed626 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -92,6 +92,12 @@ private[spark] abstract class Task[T](
Utils.tryLogNonFatalError {
// Release memory used by this thread for unrolling blocks
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
+ // Notify any tasks waiting for execution memory to be freed to wake up and try to
+ // acquire memory again. This prevents the scenario where a task sleeps forever
+ // because there are no other tasks left to notify it. Since this is safe to do but may
+ // not be strictly necessary, we should revisit whether we can remove this in the future.
+ val memoryManager = SparkEnv.get.memoryManager
+ memoryManager.synchronized { memoryManager.notifyAll() }
}
} finally {
TaskContext.unset()
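The notifyAll here pairs with the lock.wait() added to ExecutionMemoryPool.acquireMemory above. A self-contained sketch of that monitor handshake (hypothetical names, not Spark code; the real acquire loop also re-checks its condition after waking, since waits can return spuriously):

    object WaitNotifySketch {
      private val lock = new Object
      def main(args: Array[String]): Unit = {
        val waiter = new Thread(new Runnable {
          override def run(): Unit = lock.synchronized {
            println("TID 0 waiting for at least 1/2N of the pool to be free")
            lock.wait() // monitor released while parked, reacquired on wake-up
            println("TID 0 woken; it retries the acquire loop")
          }
        })
        waiter.start()
        Thread.sleep(100) // crude: let the waiter park first
        lock.synchronized { lock.notifyAll() } // what Task.run now does on completion
        waiter.join()
      }
    }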
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index e21a028b7f..6cc48597d3 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -230,4 +230,29 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(exception.getMessage.contains("larger heap size"))
}
+ test("execution can evict cached blocks when there are multiple active tasks (SPARK-12155)") {
+ val conf = new SparkConf()
+ .set("spark.memory.fraction", "1")
+ .set("spark.memory.storageFraction", "0")
+ .set("spark.testing.memory", "1000")
+ val mm = UnifiedMemoryManager(conf, numCores = 2)
+ val ms = makeMemoryStore(mm)
+ assert(mm.maxMemory === 1000)
+ // Have two tasks each acquire some execution memory so that the memory pool registers that
+ // there are two active tasks:
+ assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
+ assert(mm.acquireExecutionMemory(100L, 1, MemoryMode.ON_HEAP) === 100L)
+ // Fill up all of the remaining memory with storage.
+ assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks))
+ assertEvictBlocksToFreeSpaceNotCalled(ms)
+ assert(mm.storageMemoryUsed === 800)
+ assert(mm.executionMemoryUsed === 200)
+ // A task should still be able to allocate 100 bytes of execution memory by evicting blocks
+ assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
+ assertEvictBlocksToFreeSpaceCalled(ms, 100L)
+ assert(mm.executionMemoryUsed === 300)
+ assert(mm.storageMemoryUsed === 700)
+ assert(evictedBlocks.nonEmpty)
+ }
+
}
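Stepping through the reclaim arithmetic this test exercises: with storageFraction = 0, the 800-byte storage pool was borrowed entirely from execution, so all of it is reclaimable. A sketch of the quantities as maybeGrowExecutionPool would see them (names abbreviated, values taken from the test):

    val storagePoolFree = 0L     // the 800-byte storage pool is full
    val storagePoolSize = 800L   // all of it borrowed from execution
    val storageRegionSize = 0L   // spark.memory.storageFraction = 0
    val extraMemoryNeeded = 100L // execution pool fully used; task 0 wants 100 more

    val memoryReclaimableFromStorage =
      math.max(storagePoolFree, storagePoolSize - storageRegionSize) // 800
    val spaceReclaimed =
      math.min(extraMemoryNeeded, memoryReclaimableFromStorage)      // 100
    assert(spaceReclaimed == 100L) // 100 bytes of blocks evicted: storage 800 -> 700,
                                   // execution grows 200 -> 300, matching the asserts above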