author    hushan[胡珊] <hushan@xiaomi.com>    2015-03-02 16:53:54 -0800
committer Andrew Or <andrew@databricks.com>    2015-03-02 16:53:54 -0800
commit    e3a88d1104ebdb858f0509f56d7bb536037e5f63
tree      849c18c12fc41e7284432c6cc89bee5e16061807
parent    258d154c9f1afdd52dce19f03d81683ee34effac
[SPARK-4777][CORE] Some block memory after unrollSafely is not counted into used memory (memoryStore.entries or unrollMemory)
Some memory is not counted in the memory used by memoryStore or unrollMemory. After thread A finishes unrollSafely, it releases its 40MB of unrollMemory (which other threads may then claim) and waits to acquire accountingLock so it can tryToPut blockA (30MB). Until thread A acquires accountingLock, blockA's size is counted neither in unrollMemory nor in memoryStore.currentMemory, so freeMemory overstates the space that is actually available. The fix is to record the released memory as pending and release it in tryToPut, before ensureFreeSpace is called.

Author: hushan[胡珊] <hushan@xiaomi.com>

Closes #3629 from suyanNone/unroll-memory and squashes the following commits:

809cc41 [hushan[胡珊]] Refine
407b2c9 [hushan[胡珊]] Refine according to comments
39960d0 [hushan[胡珊]] Refine comments
0fd0213 [hushan[胡珊]] Add comments
0fc2bec [hushan[胡珊]] Release pending unroll memory after putting block in memoryStore
3a3f2c8 [hushan[胡珊]] Refine BlockManagerSuite unroll test
3323c45 [hushan[胡珊]] Refine getOrElse
f664317 [hushan[胡珊]] Make sure not to add pending in every releaseUnrollMemory call
08b32ba [hushan[胡珊]] Pending unroll memory for this block until tryToPut
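To make the race concrete, the following is a minimal Scala sketch (the figures come from the description above; the object and fields are illustrative, not the real MemoryStore) of how the accounting gap lets free space be overstated:

// Minimal sketch of the accounting gap: a hypothetical 100 MB store with
// 50 MB already cached and a 40 MB unroll reservation held by thread A.
object AccountingGapSketch {
  val maxMemory = 100L * 1024 * 1024
  var currentMemory = 50L * 1024 * 1024
  var currentUnrollMemory = 40L * 1024 * 1024

  def main(args: Array[String]): Unit = {
    // Thread A finished unrolling blockA (30 MB) and, before this patch,
    // released its whole 40 MB reservation prior to acquiring accountingLock.
    currentUnrollMemory -= 40L * 1024 * 1024

    // blockA is now counted neither in currentMemory nor in unroll memory,
    // so a concurrent tryToPut sees 50 MB free even though only 20 MB is
    // safe once blockA's 30 MB lands in the store.
    val freeMemory = maxMemory - currentMemory
    println(s"apparent free = ${(freeMemory - currentUnrollMemory) / (1024 * 1024)} MB")
  }
}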
 core/src/main/scala/org/apache/spark/storage/MemoryStore.scala       | 52
 core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala |  2
 2 files changed, 47 insertions(+), 7 deletions(-)
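The diff below keeps that in-flight block visible by parking its bytes in a per-thread pending map. As an illustrative, self-contained sketch (the class and method names are assumptions that mirror the patch, not the actual MemoryStore code): a successful unroll moves its reservation into the pending map under accountingLock, tryToPut adds its own pending bytes back when computing free space, and the pending entry is dropped once the block is stored.

import scala.collection.mutable

class PendingUnrollSketch(maxMemory: Long) {
  private val accountingLock = new Object
  private val unrollMemoryMap        = mutable.HashMap[Long, Long]() // threadId -> bytes still unrolling
  private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]() // threadId -> bytes awaiting put
  private var currentMemory = 0L                                     // bytes held by cached blocks

  private def threadId: Long = Thread.currentThread().getId

  // On a successful unroll, move the reservation from "unrolling" to "pending"
  // atomically, so the bytes stay counted until tryToPut stores the block.
  def finishUnroll(amount: Long): Unit = accountingLock.synchronized {
    unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) - amount
    pendingUnrollMemoryMap(threadId) = pendingUnrollMemoryMap.getOrElse(threadId, 0L) + amount
  }

  // Unroll memory now includes the pending portion, as in the patched
  // currentUnrollMemory below.
  def currentUnrollMemory: Long = accountingLock.synchronized {
    unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
  }

  // Inside tryToPut: this thread's own pending bytes are about to become the
  // block itself, so add them back when computing the space actually available.
  def actualFreeMemory: Long = accountingLock.synchronized {
    (maxMemory - currentMemory) - currentUnrollMemory +
      pendingUnrollMemoryMap.getOrElse(threadId, 0L)
  }

  // Once the block has been stored (or dropped), release the pending reservation.
  def finishPut(blockSize: Long): Unit = accountingLock.synchronized {
    currentMemory += blockSize
    pendingUnrollMemoryMap.remove(threadId)
  }
}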
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 7f4b6e8bd3..1be860aea6 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -46,6 +46,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// A mapping from thread ID to amount of memory used for unrolling a block (in bytes)
// All accesses of this map are assumed to have manually synchronized on `accountingLock`
private val unrollMemoryMap = mutable.HashMap[Long, Long]()
+ // Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
+ // Pending unroll memory refers to the intermediate memory occupied by a thread
+ // after the unroll but before the actual putting of the block in the cache.
+ // This chunk of memory is expected to be released *as soon as* we finish
+ // caching the corresponding block as opposed to until after the task finishes.
+ // This is only used if a block is successfully unrolled in its entirety in
+ // memory (SPARK-4777).
+ private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]()
/**
* The amount of space ensured for unrolling values in memory, shared across all cores.
@@ -283,12 +291,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
} finally {
- // If we return an array, the values returned do not depend on the underlying vector and
- // we can immediately free up space for other threads. Otherwise, if we return an iterator,
- // we release the memory claimed by this thread later on when the task finishes.
+ // If we return an array, the values returned will later be cached in `tryToPut`.
+ // In this case, we should release the memory after we cache the block there.
+ // Otherwise, if we return an iterator, we release the memory reserved here
+ // later when the task finishes.
if (keepUnrolling) {
- val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
- releaseUnrollMemoryForThisThread(amountToRelease)
+ accountingLock.synchronized {
+ val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
+ releaseUnrollMemoryForThisThread(amountToRelease)
+ reservePendingUnrollMemoryForThisThread(amountToRelease)
+ }
}
}
}
@@ -353,6 +365,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
}
+ // Release the unroll memory used because we no longer need the underlying Array
+ releasePendingUnrollMemoryForThisThread()
}
ResultWithDroppedBlocks(putSuccess, droppedBlocks)
}
@@ -381,7 +395,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
// Take into account the amount of memory currently occupied by unrolling blocks
- val actualFreeMemory = freeMemory - currentUnrollMemory
+ // and subtract the pending unroll memory for this block held by the current thread.
+ val threadId = Thread.currentThread().getId
+ val actualFreeMemory = freeMemory - currentUnrollMemory +
+ pendingUnrollMemoryMap.getOrElse(threadId, 0L)
if (actualFreeMemory < space) {
val rddToAdd = getRddId(blockIdToAdd)
@@ -469,10 +486,31 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
/**
+ * Reserve the unroll memory held by this thread for a successfully unrolled block
+ * until the block is actually put into the memory store.
+ */
+ def reservePendingUnrollMemoryForThisThread(memory: Long): Unit = {
+ val threadId = Thread.currentThread().getId
+ accountingLock.synchronized {
+ pendingUnrollMemoryMap(threadId) = pendingUnrollMemoryMap.getOrElse(threadId, 0L) + memory
+ }
+ }
+
+ /**
+ * Release the pending unroll memory held by this thread for a successfully unrolled block.
+ */
+ def releasePendingUnrollMemoryForThisThread(): Unit = {
+ val threadId = Thread.currentThread().getId
+ accountingLock.synchronized {
+ pendingUnrollMemoryMap.remove(threadId)
+ }
+ }
+
+ /**
* Return the amount of memory currently occupied for unrolling blocks across all threads.
*/
def currentUnrollMemory: Long = accountingLock.synchronized {
- unrollMemoryMap.values.sum
+ unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
}
/**
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index ffe6f03914..3fdbe99b5d 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1064,6 +1064,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+ memoryStore.releasePendingUnrollMemoryForThisThread()
// Unroll with not enough space. This should succeed after kicking out someBlock1.
store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
@@ -1074,6 +1075,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
assert(droppedBlocks.size === 1)
assert(droppedBlocks.head._1 === TestBlockId("someBlock1"))
droppedBlocks.clear()
+ memoryStore.releasePendingUnrollMemoryForThisThread()
// Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
// 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.