aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Budde <budde@amazon.com>2016-02-02 19:35:33 -0800
committerAndrew Or <andrew@databricks.com>2016-02-02 19:35:33 -0800
commitff71261b651a7b289ea2312abd6075da8b838ed9 (patch)
tree8807de2ed8a8dc00b345d852bff46e880b02aef2
parent21112e8a14c042ccef4312079672108a1082a95e (diff)
downloadspark-ff71261b651a7b289ea2312abd6075da8b838ed9.tar.gz
spark-ff71261b651a7b289ea2312abd6075da8b838ed9.tar.bz2
spark-ff71261b651a7b289ea2312abd6075da8b838ed9.zip
[SPARK-13122] Fix race condition in MemoryStore.unrollSafely()
https://issues.apache.org/jira/browse/SPARK-13122 A race condition can occur in MemoryStore's unrollSafely() method if two threads that return the same value for currentTaskAttemptId() execute this method concurrently. This change makes the operation of reading the initial amount of unroll memory used, performing the unroll, and updating the associated memory maps atomic in order to avoid this race condition. Initial proposed fix wraps all of unrollSafely() in a memoryManager.synchronized { } block. A cleaner approach might be introduce a mechanism that synchronizes based on task attempt ID. An alternative option might be to track unroll/pending unroll memory based on block ID rather than task attempt ID. Author: Adam Budde <budde@amazon.com> Closes #11012 from budde/master.
-rw-r--r--core/src/main/scala/org/apache/spark/storage/MemoryStore.scala14
1 files changed, 9 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 76aaa782b9..024b660ce6 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -255,8 +255,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
var memoryThreshold = initialMemoryThreshold
// Memory to request as a multiple of current vector size
val memoryGrowthFactor = 1.5
- // Previous unroll memory held by this task, for releasing later (only at the very end)
- val previousMemoryReserved = currentUnrollMemoryForThisTask
+ // Keep track of pending unroll memory reserved by this method.
+ var pendingMemoryReserved = 0L
// Underlying vector for unrolling the block
var vector = new SizeTrackingVector[Any]
@@ -266,6 +266,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
if (!keepUnrolling) {
logWarning(s"Failed to reserve initial memory threshold of " +
s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
+ } else {
+ pendingMemoryReserved += initialMemoryThreshold
}
// Unroll this block safely, checking whether we have exceeded our threshold periodically
@@ -278,6 +280,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
if (currentSize >= memoryThreshold) {
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
+ if (keepUnrolling) {
+ pendingMemoryReserved += amountToRequest
+ }
// New threshold is currentSize * memoryGrowthFactor
memoryThreshold += amountToRequest
}
@@ -304,10 +309,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
// release the unroll memory yet. Instead, we transfer it to pending unroll memory
// so `tryToPut` can further transfer it to normal storage memory later.
// TODO: we can probably express this without pending unroll memory (SPARK-10907)
- val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved
- unrollMemoryMap(taskAttemptId) -= amountToTransferToPending
+ unrollMemoryMap(taskAttemptId) -= pendingMemoryReserved
pendingUnrollMemoryMap(taskAttemptId) =
- pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending
+ pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + pendingMemoryReserved
}
} else {
// Otherwise, if we return an iterator, we can only release the unroll memory when