aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/storage/MemoryStore.scala14
1 files changed, 9 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 76aaa782b9..024b660ce6 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -255,8 +255,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
var memoryThreshold = initialMemoryThreshold
// Memory to request as a multiple of current vector size
val memoryGrowthFactor = 1.5
- // Previous unroll memory held by this task, for releasing later (only at the very end)
- val previousMemoryReserved = currentUnrollMemoryForThisTask
+ // Keep track of pending unroll memory reserved by this method.
+ var pendingMemoryReserved = 0L
// Underlying vector for unrolling the block
var vector = new SizeTrackingVector[Any]
@@ -266,6 +266,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
if (!keepUnrolling) {
logWarning(s"Failed to reserve initial memory threshold of " +
s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
+ } else {
+ pendingMemoryReserved += initialMemoryThreshold
}
// Unroll this block safely, checking whether we have exceeded our threshold periodically
@@ -278,6 +280,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
if (currentSize >= memoryThreshold) {
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
+ if (keepUnrolling) {
+ pendingMemoryReserved += amountToRequest
+ }
// New threshold is currentSize * memoryGrowthFactor
memoryThreshold += amountToRequest
}
@@ -304,10 +309,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
// release the unroll memory yet. Instead, we transfer it to pending unroll memory
// so `tryToPut` can further transfer it to normal storage memory later.
// TODO: we can probably express this without pending unroll memory (SPARK-10907)
- val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved
- unrollMemoryMap(taskAttemptId) -= amountToTransferToPending
+ unrollMemoryMap(taskAttemptId) -= pendingMemoryReserved
pendingUnrollMemoryMap(taskAttemptId) =
- pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending
+ pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + pendingMemoryReserved
}
} else {
// Otherwise, if we return an iterator, we can only release the unroll memory when