aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author: Josh Rosen <joshrosen@databricks.com> 2016-03-14 14:26:39 -0700
committer: Andrew Or <andrew@databricks.com> 2016-03-14 14:26:39 -0700
commit: 38529d8f2350feb1f143aab0be336050c0f887f2 (patch)
tree: e5d24f4b426432b7b9cdc7085685597c7b5921b8 /core
parent: a48296f4fe513b63041f1a26231cfe152b69657f (diff)
download: spark-38529d8f2350feb1f143aab0be336050c0f887f2.tar.gz
spark-38529d8f2350feb1f143aab0be336050c0f887f2.tar.bz2
spark-38529d8f2350feb1f143aab0be336050c0f887f2.zip
[SPARK-10907][SPARK-6157] Remove pendingUnrollMemory from MemoryStore
This patch refactors the MemoryStore to remove the concept of `pendingUnrollMemory`. It also fixes SPARK-6157: "Unrolling with MEMORY_AND_DISK should always release memory". Key changes: - Inline `MemoryStore.tryToPut` at its three call sites in the `MemoryStore`. - Inline `MemoryStore.unrollSafely` at its only call site (in `MemoryStore.putIterator`). - Inline `MemoryManager.acquireStorageMemory` at its call sites. - Simplify the code as a result of this inlining (some parameters have fixed values after inlining, so lots of branches can be removed). - Remove the `pendingUnrollMemory` map by returning the amount of unrollMemory allocated when returning an iterator after a failed `putIterator` call. - Change `putIterator` to return an instance of `PartiallyUnrolledIterator`, a special iterator subclass which will automatically free the unroll memory of its partially-unrolled elements when the iterator is consumed. To handle cases where the iterator is not consumed (e.g. when a MEMORY_ONLY put fails), `PartiallyUnrolledIterator` exposes a `close()` method which may be called to discard the unrolled values and free their memory. Author: Josh Rosen <joshrosen@databricks.com> Closes #11613 from JoshRosen/cleanup-unroll-memory.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/BlockManager.scala 24
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala 361
-rw-r--r-- core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 77
3 files changed, 224 insertions, 238 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b503410a73..d21df4b95b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -669,8 +669,15 @@ private[spark] class BlockManager(
level: StorageLevel,
tellMaster: Boolean = true): Boolean = {
require(values != null, "Values is null")
- // If doPut() didn't hand work back to us, then block already existed or was successfully stored
- doPutIterator(blockId, () => values, level, tellMaster).isEmpty
+ doPutIterator(blockId, () => values, level, tellMaster) match {
+ case None =>
+ true
+ case Some(iter) =>
+ // Caller doesn't care about the iterator values, so we can close the iterator here
+ // to free resources earlier
+ iter.close()
+ false
+ }
}
/**
@@ -745,7 +752,14 @@ private[spark] class BlockManager(
// We will drop it to disk later if the memory store can't hold it.
val putSucceeded = if (level.deserialized) {
val values = dataDeserialize(blockId, bytes.duplicate())
- memoryStore.putIterator(blockId, values, level).isRight
+ memoryStore.putIterator(blockId, values, level) match {
+ case Right(_) => true
+ case Left(iter) =>
+ // If putting deserialized values in memory failed, we will put the bytes directly to
+ // disk, so we don't need this iterator and can close it to free resources earlier.
+ iter.close()
+ false
+ }
} else {
memoryStore.putBytes(blockId, size, () => bytes)
}
@@ -857,10 +871,10 @@ private[spark] class BlockManager(
iterator: () => Iterator[Any],
level: StorageLevel,
tellMaster: Boolean = true,
- keepReadLock: Boolean = false): Option[Iterator[Any]] = {
+ keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator] = {
doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo =>
val startTimeMs = System.currentTimeMillis
- var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
+ var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator] = None
// Size of the block in bytes
var size = 0L
if (level.useMemory) {
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index a80b2357ff..02d44dc732 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{Logging, SparkConf, TaskContext}
import org.apache.spark.memory.MemoryManager
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel}
-import org.apache.spark.util.{SizeEstimator, Utils}
+import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector
private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
@@ -49,14 +49,6 @@ private[spark] class MemoryStore(
// A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
// All accesses of this map are assumed to have manually synchronized on `memoryManager`
private val unrollMemoryMap = mutable.HashMap[Long, Long]()
- // Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
- // Pending unroll memory refers to the intermediate memory occupied by a task
- // after the unroll but before the actual putting of the block in the cache.
- // This chunk of memory is expected to be released *as soon as* we finish
- // caching the corresponding block as opposed to until after the task finishes.
- // This is only used if a block is successfully unrolled in its entirety in
- // memory (SPARK-4777).
- private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]()
// Initial memory to request before unrolling any block
private val unrollMemoryThreshold: Long =
@@ -100,48 +92,151 @@ private[spark] class MemoryStore(
*/
def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Boolean = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
- // Work on a duplicate - since the original input might be used elsewhere.
- lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
- val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false)
- if (putSuccess) {
+ if (memoryManager.acquireStorageMemory(blockId, size)) {
+ // We acquired enough memory for the block, so go ahead and put it
+ // Work on a duplicate - since the original input might be used elsewhere.
+ val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
assert(bytes.limit == size)
+ val entry = new MemoryEntry(bytes, size, deserialized = false)
+ entries.synchronized {
+ entries.put(blockId, entry)
+ }
+ logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
+ blockId, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
+ true
+ } else {
+ false
}
- putSuccess
}
/**
* Attempt to put the given block in memory store.
*
- * @return the estimated size of the stored data if the put() succeeded, or an iterator
- * in case the put() failed (the returned iterator lets callers fall back to the disk
- * store if desired).
+ * It's possible that the iterator is too large to materialize and store in memory. To avoid
+ * OOM exceptions, this method will gradually unroll the iterator while periodically checking
+ * whether there is enough free memory. If the block is successfully materialized, then the
+ * temporary unroll memory used during the materialization is "transferred" to storage memory,
+ * so we won't acquire more memory than is actually needed to store the block.
+ *
+ * @return in case of success, the estimated size of the stored data. In case of
+ * failure, return an iterator containing the values of the block. The returned iterator
+ * will be backed by the combination of the partially-unrolled block and the remaining
+ * elements of the original input iterator. The caller must either fully consume this
+ * iterator or call `close()` on it in order to free the storage memory consumed by the
+ * partially-unrolled block.
*/
private[storage] def putIterator(
blockId: BlockId,
values: Iterator[Any],
- level: StorageLevel): Either[Iterator[Any], Long] = {
+ level: StorageLevel): Either[PartiallyUnrolledIterator, Long] = {
+
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
- val unrolledValues = unrollSafely(blockId, values)
- unrolledValues match {
- case Left(arrayValues) =>
- // Values are fully unrolled in memory, so store them as an array
- if (level.deserialized) {
- val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
- if (tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)) {
- Right(sizeEstimate)
- } else {
- Left(arrayValues.toIterator)
+
+ // Number of elements unrolled so far
+ var elementsUnrolled = 0
+ // Whether there is still enough memory for us to continue unrolling this block
+ var keepUnrolling = true
+ // Initial per-task memory to request for unrolling blocks (bytes).
+ val initialMemoryThreshold = unrollMemoryThreshold
+ // How often to check whether we need to request more memory
+ val memoryCheckPeriod = 16
+ // Memory currently reserved by this task for this particular unrolling operation
+ var memoryThreshold = initialMemoryThreshold
+ // Memory to request as a multiple of current vector size
+ val memoryGrowthFactor = 1.5
+ // Keep track of unroll memory used by this particular block / putIterator() operation
+ var unrollMemoryUsedByThisBlock = 0L
+ // Underlying vector for unrolling the block
+ var vector = new SizeTrackingVector[Any]
+
+ // Request enough memory to begin unrolling
+ keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
+
+ if (!keepUnrolling) {
+ logWarning(s"Failed to reserve initial memory threshold of " +
+ s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
+ } else {
+ unrollMemoryUsedByThisBlock += initialMemoryThreshold
+ }
+
+ // Unroll this block safely, checking whether we have exceeded our threshold periodically
+ while (values.hasNext && keepUnrolling) {
+ vector += values.next()
+ if (elementsUnrolled % memoryCheckPeriod == 0) {
+ // If our vector's size has exceeded the threshold, request more memory
+ val currentSize = vector.estimateSize()
+ if (currentSize >= memoryThreshold) {
+ val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
+ keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
+ if (keepUnrolling) {
+ unrollMemoryUsedByThisBlock += amountToRequest
}
- } else {
- val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
- if (tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)) {
- Right(bytes.limit())
- } else {
- Left(arrayValues.toIterator)
+ // New threshold is currentSize * memoryGrowthFactor
+ memoryThreshold += amountToRequest
+ }
+ }
+ elementsUnrolled += 1
+ }
+
+ if (keepUnrolling) {
+ // We successfully unrolled the entirety of this block
+ val arrayValues = vector.toArray
+ vector = null
+ val entry = if (level.deserialized) {
+ new MemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues), deserialized = true)
+ } else {
+ val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
+ new MemoryEntry(bytes, bytes.limit, deserialized = false)
+ }
+ val size = entry.size
+ def transferUnrollToStorage(amount: Long): Unit = {
+ // Synchronize so that transfer is atomic
+ memoryManager.synchronized {
+ releaseUnrollMemoryForThisTask(amount)
+ val success = memoryManager.acquireStorageMemory(blockId, amount)
+ assert(success, "transferring unroll memory to storage memory failed")
+ }
+ }
+ // Acquire storage memory if necessary to store this block in memory.
+ val enoughStorageMemory = {
+ if (unrollMemoryUsedByThisBlock <= size) {
+ val acquiredExtra =
+ memoryManager.acquireStorageMemory(blockId, size - unrollMemoryUsedByThisBlock)
+ if (acquiredExtra) {
+ transferUnrollToStorage(unrollMemoryUsedByThisBlock)
}
+ acquiredExtra
+ } else { // unrollMemoryUsedByThisBlock > size
+ // If this task attempt already owns more unroll memory than is necessary to store the
+ // block, then release the extra memory that will not be used.
+ val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
+ releaseUnrollMemoryForThisTask(excessUnrollMemory)
+ transferUnrollToStorage(size)
+ true
}
- case Right(iteratorValues) =>
- Left(iteratorValues)
+ }
+ if (enoughStorageMemory) {
+ entries.synchronized {
+ entries.put(blockId, entry)
+ }
+ val bytesOrValues = if (level.deserialized) "values" else "bytes"
+ logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
+ blockId, bytesOrValues, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
+ Right(size)
+ } else {
+ assert(currentUnrollMemoryForThisTask >= currentUnrollMemoryForThisTask,
+ "released too much unroll memory")
+ Left(new PartiallyUnrolledIterator(
+ this,
+ unrollMemoryUsedByThisBlock,
+ unrolled = arrayValues.toIterator,
+ rest = Iterator.empty))
+ }
+ } else {
+ // We ran out of space while unrolling the values for this block
+ logUnrollFailureMessage(blockId, vector.estimateSize())
+ Left(new PartiallyUnrolledIterator(
+ this, unrollMemoryUsedByThisBlock, unrolled = vector.iterator, rest = values))
}
}
@@ -188,103 +283,11 @@ private[spark] class MemoryStore(
entries.clear()
}
unrollMemoryMap.clear()
- pendingUnrollMemoryMap.clear()
memoryManager.releaseAllStorageMemory()
logInfo("MemoryStore cleared")
}
/**
- * Unroll the given block in memory safely.
- *
- * The safety of this operation refers to avoiding potential OOM exceptions caused by
- * unrolling the entirety of the block in memory at once. This is achieved by periodically
- * checking whether the memory restrictions for unrolling blocks are still satisfied,
- * stopping immediately if not. This check is a safeguard against the scenario in which
- * there is not enough free memory to accommodate the entirety of a single block.
- *
- * This method returns either an array with the contents of the entire block or an iterator
- * containing the values of the block (if the array would have exceeded available memory).
- */
- def unrollSafely(blockId: BlockId, values: Iterator[Any]): Either[Array[Any], Iterator[Any]] = {
-
- // Number of elements unrolled so far
- var elementsUnrolled = 0
- // Whether there is still enough memory for us to continue unrolling this block
- var keepUnrolling = true
- // Initial per-task memory to request for unrolling blocks (bytes). Exposed for testing.
- val initialMemoryThreshold = unrollMemoryThreshold
- // How often to check whether we need to request more memory
- val memoryCheckPeriod = 16
- // Memory currently reserved by this task for this particular unrolling operation
- var memoryThreshold = initialMemoryThreshold
- // Memory to request as a multiple of current vector size
- val memoryGrowthFactor = 1.5
- // Keep track of pending unroll memory reserved by this method.
- var pendingMemoryReserved = 0L
- // Underlying vector for unrolling the block
- var vector = new SizeTrackingVector[Any]
-
- // Request enough memory to begin unrolling
- keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
-
- if (!keepUnrolling) {
- logWarning(s"Failed to reserve initial memory threshold of " +
- s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
- } else {
- pendingMemoryReserved += initialMemoryThreshold
- }
-
- // Unroll this block safely, checking whether we have exceeded our threshold periodically
- try {
- while (values.hasNext && keepUnrolling) {
- vector += values.next()
- if (elementsUnrolled % memoryCheckPeriod == 0) {
- // If our vector's size has exceeded the threshold, request more memory
- val currentSize = vector.estimateSize()
- if (currentSize >= memoryThreshold) {
- val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
- keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
- if (keepUnrolling) {
- pendingMemoryReserved += amountToRequest
- }
- // New threshold is currentSize * memoryGrowthFactor
- memoryThreshold += amountToRequest
- }
- }
- elementsUnrolled += 1
- }
-
- if (keepUnrolling) {
- // We successfully unrolled the entirety of this block
- Left(vector.toArray)
- } else {
- // We ran out of space while unrolling the values for this block
- logUnrollFailureMessage(blockId, vector.estimateSize())
- Right(vector.iterator ++ values)
- }
-
- } finally {
- // If we return an array, the values returned here will be cached in `tryToPut` later.
- // In this case, we should release the memory only after we cache the block there.
- if (keepUnrolling) {
- val taskAttemptId = currentTaskAttemptId()
- memoryManager.synchronized {
- // Since we continue to hold onto the array until we actually cache it, we cannot
- // release the unroll memory yet. Instead, we transfer it to pending unroll memory
- // so `tryToPut` can further transfer it to normal storage memory later.
- // TODO: we can probably express this without pending unroll memory (SPARK-10907)
- unrollMemoryMap(taskAttemptId) -= pendingMemoryReserved
- pendingUnrollMemoryMap(taskAttemptId) =
- pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + pendingMemoryReserved
- }
- } else {
- // Otherwise, if we return an iterator, we can only release the unroll memory when
- // the task finishes since we don't know when the iterator will be consumed.
- }
- }
- }
-
- /**
* Return the RDD ID that a given block ID is from, or None if it is not an RDD block.
*/
private def getRddId(blockId: BlockId): Option[Int] = {
@@ -292,48 +295,6 @@ private[spark] class MemoryStore(
}
/**
- * Try to put in a set of values, if we can free up enough space. The value should either be
- * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
- * must also be passed by the caller.
- *
- * @return whether put was successful.
- */
- private def tryToPut(
- blockId: BlockId,
- value: () => Any,
- size: Long,
- deserialized: Boolean): Boolean = {
- val acquiredEnoughStorageMemory = {
- // Synchronize on memoryManager so that the pending unroll memory isn't stolen by another
- // task.
- memoryManager.synchronized {
- // Note: if we have previously unrolled this block successfully, then pending unroll
- // memory should be non-zero. This is the amount that we already reserved during the
- // unrolling process. In this case, we can just reuse this space to cache our block.
- // The synchronization on `memoryManager` here guarantees that the release and acquire
- // happen atomically. This relies on the assumption that all memory acquisitions are
- // synchronized on the same lock.
- releasePendingUnrollMemoryForThisTask()
- memoryManager.acquireStorageMemory(blockId, size)
- }
- }
-
- if (acquiredEnoughStorageMemory) {
- // We acquired enough memory for the block, so go ahead and put it
- val entry = new MemoryEntry(value(), size, deserialized)
- entries.synchronized {
- entries.put(blockId, entry)
- }
- val valuesOrBytes = if (deserialized) "values" else "bytes"
- logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
- blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
- true
- } else {
- false
- }
- }
-
- /**
* Try to evict blocks to free up a given amount of space to store a particular block.
* Can fail if either the block is bigger than our memory or it would require replacing
* another block from the same RDD (which leads to a wasteful cyclic replacement pattern for
@@ -457,29 +418,10 @@ private[spark] class MemoryStore(
}
/**
- * Release pending unroll memory of current unroll successful block used by this task
- */
- def releasePendingUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = {
- val taskAttemptId = currentTaskAttemptId()
- memoryManager.synchronized {
- if (pendingUnrollMemoryMap.contains(taskAttemptId)) {
- val memoryToRelease = math.min(memory, pendingUnrollMemoryMap(taskAttemptId))
- if (memoryToRelease > 0) {
- pendingUnrollMemoryMap(taskAttemptId) -= memoryToRelease
- if (pendingUnrollMemoryMap(taskAttemptId) == 0) {
- pendingUnrollMemoryMap.remove(taskAttemptId)
- }
- memoryManager.releaseUnrollMemory(memoryToRelease)
- }
- }
- }
- }
-
- /**
* Return the amount of memory currently occupied for unrolling blocks across all tasks.
*/
def currentUnrollMemory: Long = memoryManager.synchronized {
- unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
+ unrollMemoryMap.values.sum
}
/**
@@ -520,3 +462,42 @@ private[spark] class MemoryStore(
logMemoryUsage()
}
}
+
+/**
+ * The result of a failed [[MemoryStore.putIterator()]] call.
+ *
+ * @param memoryStore the memoryStore, used for freeing memory.
+ * @param unrollMemory the amount of unroll memory used by the values in `unrolled`.
+ * @param unrolled an iterator for the partially-unrolled values.
+ * @param rest the rest of the original iterator passed to [[MemoryStore.putIterator()]].
+ */
+private[storage] class PartiallyUnrolledIterator(
+ memoryStore: MemoryStore,
+ unrollMemory: Long,
+ unrolled: Iterator[Any],
+ rest: Iterator[Any])
+ extends Iterator[Any] {
+
+ private[this] var unrolledIteratorIsConsumed: Boolean = false
+ private[this] var iter: Iterator[Any] = {
+ val completionIterator = CompletionIterator[Any, Iterator[Any]](unrolled, {
+ unrolledIteratorIsConsumed = true
+ memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
+ })
+ completionIterator ++ rest
+ }
+
+ override def hasNext: Boolean = iter.hasNext
+ override def next(): Any = iter.next()
+
+ /**
+ * Called to dispose of this iterator and free its memory.
+ */
+ def close(): Unit = {
+ if (!unrolledIteratorIsConsumed) {
+ memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
+ unrolledIteratorIsConsumed = true
+ }
+ iter = null
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index dc4be14677..2e0c0596a7 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1065,28 +1065,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
}
- /**
- * Verify the result of MemoryStore#unrollSafely is as expected.
- */
- private def verifyUnroll(
- expected: Iterator[Any],
- result: Either[Array[Any], Iterator[Any]],
- shouldBeArray: Boolean): Unit = {
- val actual: Iterator[Any] = result match {
- case Left(arr: Array[Any]) =>
- assert(shouldBeArray, "expected iterator from unroll!")
- arr.iterator
- case Right(it: Iterator[Any]) =>
- assert(!shouldBeArray, "expected array from unroll!")
- it
- case _ =>
- fail("unroll returned neither an iterator nor an array...")
- }
- expected.zip(actual).foreach { case (e, a) =>
- assert(e === a, "unroll did not return original values!")
- }
- }
-
test("safely unroll blocks") {
store = makeBlockManager(12000)
val smallList = List.fill(40)(new Array[Byte](100))
@@ -1094,30 +1072,41 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val memoryStore = store.memoryStore
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- // Unroll with all the space in the world. This should succeed and return an array.
- var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
- verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
+ // Unroll with all the space in the world. This should succeed.
+ var putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ assert(putResult.isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- memoryStore.releasePendingUnrollMemoryForThisTask()
+ smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) =>
+ assert(e === a, "getValues() did not return original values!")
+ }
+ assert(memoryStore.remove("unroll"))
// Unroll with not enough space. This should succeed after kicking out someBlock1.
- store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
- store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
- unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
- verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
+ assert(store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY))
+ assert(store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY))
+ putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ assert(putResult.isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
assert(memoryStore.contains("someBlock2"))
assert(!memoryStore.contains("someBlock1"))
- memoryStore.releasePendingUnrollMemoryForThisTask()
+ smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) =>
+ assert(e === a, "getValues() did not return original values!")
+ }
+ assert(memoryStore.remove("unroll"))
// Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
// 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
// In the mean time, however, we kicked out someBlock2 before giving up.
- store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
- unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator)
- verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
+ assert(store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY))
+ putResult = memoryStore.putIterator("unroll", bigList.iterator, StorageLevel.MEMORY_ONLY)
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
assert(!memoryStore.contains("someBlock2"))
+ assert(putResult.isLeft)
+ bigList.iterator.zip(putResult.left.get).foreach { case (e, a) =>
+ assert(e === a, "putIterator() did not return original values!")
+ }
+ // The unroll memory was freed once the iterator returned by putIterator() was fully traversed.
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
}
test("safely unroll blocks through putIterator") {
@@ -1208,6 +1197,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(memoryStore.contains("b3"))
assert(!memoryStore.contains("b4"))
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
+ result4.left.get.close()
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0) // close released the unroll memory
}
test("multiple unrolls by the same thread") {
@@ -1218,29 +1209,29 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- // All unroll memory used is released because unrollSafely returned an array
- memoryStore.putIterator("b1", smallIterator, memOnly)
+ // All unroll memory used is released because putIterator did not return an iterator
+ assert(memoryStore.putIterator("b1", smallIterator, memOnly).isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- memoryStore.putIterator("b2", smallIterator, memOnly)
+ assert(memoryStore.putIterator("b2", smallIterator, memOnly).isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- // Unroll memory is not released because unrollSafely returned an iterator
+ // Unroll memory is not released because putIterator returned an iterator
// that still depends on the underlying vector used in the process
- memoryStore.putIterator("b3", smallIterator, memOnly)
+ assert(memoryStore.putIterator("b3", smallIterator, memOnly).isLeft)
val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB3 > 0)
// The unroll memory owned by this thread builds on top of its value after the previous unrolls
- memoryStore.putIterator("b4", smallIterator, memOnly)
+ assert(memoryStore.putIterator("b4", smallIterator, memOnly).isLeft)
val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
// ... but only to a certain extent (until we run out of free space to grant new unroll memory)
- memoryStore.putIterator("b5", smallIterator, memOnly)
+ assert(memoryStore.putIterator("b5", smallIterator, memOnly).isLeft)
val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask
- memoryStore.putIterator("b6", smallIterator, memOnly)
+ assert(memoryStore.putIterator("b6", smallIterator, memOnly).isLeft)
val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask
- memoryStore.putIterator("b7", smallIterator, memOnly)
+ assert(memoryStore.putIterator("b7", smallIterator, memOnly).isLeft)
val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)