aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/storage/MemoryStore.scala52
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala2
2 files changed, 47 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 7f4b6e8bd3..1be860aea6 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -46,6 +46,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// A mapping from thread ID to amount of memory used for unrolling a block (in bytes)
// All accesses of this map are assumed to have manually synchronized on `accountingLock`
private val unrollMemoryMap = mutable.HashMap[Long, Long]()
+ // Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
+ // Pending unroll memory refers to the intermediate memory occupied by a thread
+ // after the unroll but before the actual putting of the block in the cache.
+ // This chunk of memory is expected to be released *as soon as* we finish
+ // caching the corresponding block, rather than being held until the task finishes.
+ // This is only used if a block is successfully unrolled in its entirety in
+ // memory (SPARK-4777).
+ private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]()
/**
* The amount of space ensured for unrolling values in memory, shared across all cores.
@@ -283,12 +291,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
} finally {
- // If we return an array, the values returned do not depend on the underlying vector and
- // we can immediately free up space for other threads. Otherwise, if we return an iterator,
- // we release the memory claimed by this thread later on when the task finishes.
+ // If we return an array, the values returned will later be cached in `tryToPut`.
+ // In this case, we should release the memory after we cache the block there.
+ // Otherwise, if we return an iterator, we release the memory reserved here
+ // later when the task finishes.
if (keepUnrolling) {
- val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
- releaseUnrollMemoryForThisThread(amountToRelease)
+ accountingLock.synchronized {
+ val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
+ releaseUnrollMemoryForThisThread(amountToRelease)
+ reservePendingUnrollMemoryForThisThread(amountToRelease)
+ }
}
}
}
@@ -353,6 +365,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
}
+ // Release the pending unroll memory because we no longer need the underlying Array
+ releasePendingUnrollMemoryForThisThread()
}
ResultWithDroppedBlocks(putSuccess, droppedBlocks)
}
@@ -381,7 +395,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
// Take into account the amount of memory currently occupied by unrolling blocks
- val actualFreeMemory = freeMemory - currentUnrollMemory
+ // but not the pending unroll memory the current thread holds for that block.
+ val threadId = Thread.currentThread().getId
+ val actualFreeMemory = freeMemory - currentUnrollMemory +
+ pendingUnrollMemoryMap.getOrElse(threadId, 0L)
if (actualFreeMemory < space) {
val rddToAdd = getRddId(blockIdToAdd)
@@ -469,10 +486,31 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
/**
+ * Reserve `memory` bytes of pending unroll memory for this thread, to be held after a
+ * successful unroll until the block is actually put into a memory entry.
+ */
+ def reservePendingUnrollMemoryForThisThread(memory: Long): Unit = {
+ val threadId = Thread.currentThread().getId
+ accountingLock.synchronized {
+ pendingUnrollMemoryMap(threadId) = pendingUnrollMemoryMap.getOrElse(threadId, 0L) + memory
+ }
+ }
+
+ /**
+ * Release all pending unroll memory held by this thread for its successfully unrolled block.
+ */
+ def releasePendingUnrollMemoryForThisThread(): Unit = {
+ val threadId = Thread.currentThread().getId
+ accountingLock.synchronized {
+ pendingUnrollMemoryMap.remove(threadId)
+ }
+ }
+
+ /**
* Return the amount of memory currently occupied for unrolling blocks across all threads.
*/
def currentUnrollMemory: Long = accountingLock.synchronized {
- unrollMemoryMap.values.sum
+ unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
}
/**
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index ffe6f03914..3fdbe99b5d 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1064,6 +1064,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+ memoryStore.releasePendingUnrollMemoryForThisThread()
// Unroll with not enough space. This should succeed after kicking out someBlock1.
store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
@@ -1074,6 +1075,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
assert(droppedBlocks.size === 1)
assert(droppedBlocks.head._1 === TestBlockId("someBlock1"))
droppedBlocks.clear()
+ memoryStore.releasePendingUnrollMemoryForThisThread()
// Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
// 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.