From 38529d8f2350feb1f143aab0be336050c0f887f2 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 14 Mar 2016 14:26:39 -0700 Subject: [SPARK-10907][SPARK-6157] Remove pendingUnrollMemory from MemoryStore This patch refactors the MemoryStore to remove the concept of `pendingUnrollMemory`. It also fixes fixes SPARK-6157: "Unrolling with MEMORY_AND_DISK should always release memory". Key changes: - Inline `MemoryStore.tryToPut` at its three call sites in the `MemoryStore`. - Inline `Memory.unrollSafely` at its only call site (in `MemoryStore.putIterator`). - Inline `MemoryManager.acquireStorageMemory` at its call sites. - Simplify the code as a result of this inlining (some parameters have fixed values after inlining, so lots of branches can be removed). - Remove the `pendingUnrollMemory` map by returning the amount of unrollMemory allocated when returning an iterator after a failed `putIterator` call. - Change `putIterator` to return an instance of `PartiallyUnrolledIterator`, a special iterator subclass which will automatically free the unroll memory of its partially-unrolled elements when the iterator is consumed. To handle cases where the iterator is not consumed (e.g. when a MEMORY_ONLY put fails), `PartiallyUnrolledIterator` exposes a `close()` method which may be called to discard the unrolled values and free their memory. Author: Josh Rosen Closes #11613 from JoshRosen/cleanup-unroll-memory. --- .../org/apache/spark/storage/BlockManager.scala | 24 +- .../apache/spark/storage/memory/MemoryStore.scala | 361 ++++++++++----------- .../apache/spark/storage/BlockManagerSuite.scala | 77 ++--- 3 files changed, 224 insertions(+), 238 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index b503410a73..d21df4b95b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -669,8 +669,15 @@ private[spark] class BlockManager( level: StorageLevel, tellMaster: Boolean = true): Boolean = { require(values != null, "Values is null") - // If doPut() didn't hand work back to us, then block already existed or was successfully stored - doPutIterator(blockId, () => values, level, tellMaster).isEmpty + doPutIterator(blockId, () => values, level, tellMaster) match { + case None => + true + case Some(iter) => + // Caller doesn't care about the iterator values, so we can close the iterator here + // to free resources earlier + iter.close() + false + } } /** @@ -745,7 +752,14 @@ private[spark] class BlockManager( // We will drop it to disk later if the memory store can't hold it. val putSucceeded = if (level.deserialized) { val values = dataDeserialize(blockId, bytes.duplicate()) - memoryStore.putIterator(blockId, values, level).isRight + memoryStore.putIterator(blockId, values, level) match { + case Right(_) => true + case Left(iter) => + // If putting deserialized values in memory failed, we will put the bytes directly to + // disk, so we don't need this iterator and can close it to free resources earlier. + iter.close() + false + } } else { memoryStore.putBytes(blockId, size, () => bytes) } @@ -857,10 +871,10 @@ private[spark] class BlockManager( iterator: () => Iterator[Any], level: StorageLevel, tellMaster: Boolean = true, - keepReadLock: Boolean = false): Option[Iterator[Any]] = { + keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator] = { doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo => val startTimeMs = System.currentTimeMillis - var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None + var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator] = None // Size of the block in bytes var size = 0L if (level.useMemory) { diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index a80b2357ff..02d44dc732 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{Logging, SparkConf, TaskContext} import org.apache.spark.memory.MemoryManager import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel} -import org.apache.spark.util.{SizeEstimator, Utils} +import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils} import org.apache.spark.util.collection.SizeTrackingVector private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean) @@ -49,14 +49,6 @@ private[spark] class MemoryStore( // A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes) // All accesses of this map are assumed to have manually synchronized on `memoryManager` private val unrollMemoryMap = mutable.HashMap[Long, Long]() - // Same as `unrollMemoryMap`, but for pending unroll memory as defined below. - // Pending unroll memory refers to the intermediate memory occupied by a task - // after the unroll but before the actual putting of the block in the cache. - // This chunk of memory is expected to be released *as soon as* we finish - // caching the corresponding block as opposed to until after the task finishes. - // This is only used if a block is successfully unrolled in its entirety in - // memory (SPARK-4777). - private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]() // Initial memory to request before unrolling any block private val unrollMemoryThreshold: Long = @@ -100,48 +92,151 @@ private[spark] class MemoryStore( */ def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Boolean = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") - // Work on a duplicate - since the original input might be used elsewhere. - lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer] - val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false) - if (putSuccess) { + if (memoryManager.acquireStorageMemory(blockId, size)) { + // We acquired enough memory for the block, so go ahead and put it + // Work on a duplicate - since the original input might be used elsewhere. + val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer] assert(bytes.limit == size) + val entry = new MemoryEntry(bytes, size, deserialized = false) + entries.synchronized { + entries.put(blockId, entry) + } + logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format( + blockId, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed))) + true + } else { + false } - putSuccess } /** * Attempt to put the given block in memory store. * - * @return the estimated size of the stored data if the put() succeeded, or an iterator - * in case the put() failed (the returned iterator lets callers fall back to the disk - * store if desired). + * It's possible that the iterator is too large to materialize and store in memory. To avoid + * OOM exceptions, this method will gradually unroll the iterator while periodically checking + * whether there is enough free memory. If the block is successfully materialized, then the + * temporary unroll memory used during the materialization is "transferred" to storage memory, + * so we won't acquire more memory than is actually needed to store the block. + * + * @return in case of success, the estimated the estimated size of the stored data. In case of + * failure, return an iterator containing the values of the block. The returned iterator + * will be backed by the combination of the partially-unrolled block and the remaining + * elements of the original input iterator. The caller must either fully consume this + * iterator or call `close()` on it in order to free the storage memory consumed by the + * partially-unrolled block. */ private[storage] def putIterator( blockId: BlockId, values: Iterator[Any], - level: StorageLevel): Either[Iterator[Any], Long] = { + level: StorageLevel): Either[PartiallyUnrolledIterator, Long] = { + require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") - val unrolledValues = unrollSafely(blockId, values) - unrolledValues match { - case Left(arrayValues) => - // Values are fully unrolled in memory, so store them as an array - if (level.deserialized) { - val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef]) - if (tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)) { - Right(sizeEstimate) - } else { - Left(arrayValues.toIterator) + + // Number of elements unrolled so far + var elementsUnrolled = 0 + // Whether there is still enough memory for us to continue unrolling this block + var keepUnrolling = true + // Initial per-task memory to request for unrolling blocks (bytes). + val initialMemoryThreshold = unrollMemoryThreshold + // How often to check whether we need to request more memory + val memoryCheckPeriod = 16 + // Memory currently reserved by this task for this particular unrolling operation + var memoryThreshold = initialMemoryThreshold + // Memory to request as a multiple of current vector size + val memoryGrowthFactor = 1.5 + // Keep track of unroll memory used by this particular block / putIterator() operation + var unrollMemoryUsedByThisBlock = 0L + // Underlying vector for unrolling the block + var vector = new SizeTrackingVector[Any] + + // Request enough memory to begin unrolling + keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold) + + if (!keepUnrolling) { + logWarning(s"Failed to reserve initial memory threshold of " + + s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") + } else { + unrollMemoryUsedByThisBlock += initialMemoryThreshold + } + + // Unroll this block safely, checking whether we have exceeded our threshold periodically + while (values.hasNext && keepUnrolling) { + vector += values.next() + if (elementsUnrolled % memoryCheckPeriod == 0) { + // If our vector's size has exceeded the threshold, request more memory + val currentSize = vector.estimateSize() + if (currentSize >= memoryThreshold) { + val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong + keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest) + if (keepUnrolling) { + unrollMemoryUsedByThisBlock += amountToRequest } - } else { - val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) - if (tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)) { - Right(bytes.limit()) - } else { - Left(arrayValues.toIterator) + // New threshold is currentSize * memoryGrowthFactor + memoryThreshold += amountToRequest + } + } + elementsUnrolled += 1 + } + + if (keepUnrolling) { + // We successfully unrolled the entirety of this block + val arrayValues = vector.toArray + vector = null + val entry = if (level.deserialized) { + new MemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues), deserialized = true) + } else { + val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) + new MemoryEntry(bytes, bytes.limit, deserialized = false) + } + val size = entry.size + def transferUnrollToStorage(amount: Long): Unit = { + // Synchronize so that transfer is atomic + memoryManager.synchronized { + releaseUnrollMemoryForThisTask(amount) + val success = memoryManager.acquireStorageMemory(blockId, amount) + assert(success, "transferring unroll memory to storage memory failed") + } + } + // Acquire storage memory if necessary to store this block in memory. + val enoughStorageMemory = { + if (unrollMemoryUsedByThisBlock <= size) { + val acquiredExtra = + memoryManager.acquireStorageMemory(blockId, size - unrollMemoryUsedByThisBlock) + if (acquiredExtra) { + transferUnrollToStorage(unrollMemoryUsedByThisBlock) } + acquiredExtra + } else { // unrollMemoryUsedByThisBlock > size + // If this task attempt already owns more unroll memory than is necessary to store the + // block, then release the extra memory that will not be used. + val excessUnrollMemory = unrollMemoryUsedByThisBlock - size + releaseUnrollMemoryForThisTask(excessUnrollMemory) + transferUnrollToStorage(size) + true } - case Right(iteratorValues) => - Left(iteratorValues) + } + if (enoughStorageMemory) { + entries.synchronized { + entries.put(blockId, entry) + } + val bytesOrValues = if (level.deserialized) "values" else "bytes" + logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format( + blockId, bytesOrValues, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed))) + Right(size) + } else { + assert(currentUnrollMemoryForThisTask >= currentUnrollMemoryForThisTask, + "released too much unroll memory") + Left(new PartiallyUnrolledIterator( + this, + unrollMemoryUsedByThisBlock, + unrolled = arrayValues.toIterator, + rest = Iterator.empty)) + } + } else { + // We ran out of space while unrolling the values for this block + logUnrollFailureMessage(blockId, vector.estimateSize()) + Left(new PartiallyUnrolledIterator( + this, unrollMemoryUsedByThisBlock, unrolled = vector.iterator, rest = values)) } } @@ -188,102 +283,10 @@ private[spark] class MemoryStore( entries.clear() } unrollMemoryMap.clear() - pendingUnrollMemoryMap.clear() memoryManager.releaseAllStorageMemory() logInfo("MemoryStore cleared") } - /** - * Unroll the given block in memory safely. - * - * The safety of this operation refers to avoiding potential OOM exceptions caused by - * unrolling the entirety of the block in memory at once. This is achieved by periodically - * checking whether the memory restrictions for unrolling blocks are still satisfied, - * stopping immediately if not. This check is a safeguard against the scenario in which - * there is not enough free memory to accommodate the entirety of a single block. - * - * This method returns either an array with the contents of the entire block or an iterator - * containing the values of the block (if the array would have exceeded available memory). - */ - def unrollSafely(blockId: BlockId, values: Iterator[Any]): Either[Array[Any], Iterator[Any]] = { - - // Number of elements unrolled so far - var elementsUnrolled = 0 - // Whether there is still enough memory for us to continue unrolling this block - var keepUnrolling = true - // Initial per-task memory to request for unrolling blocks (bytes). Exposed for testing. - val initialMemoryThreshold = unrollMemoryThreshold - // How often to check whether we need to request more memory - val memoryCheckPeriod = 16 - // Memory currently reserved by this task for this particular unrolling operation - var memoryThreshold = initialMemoryThreshold - // Memory to request as a multiple of current vector size - val memoryGrowthFactor = 1.5 - // Keep track of pending unroll memory reserved by this method. - var pendingMemoryReserved = 0L - // Underlying vector for unrolling the block - var vector = new SizeTrackingVector[Any] - - // Request enough memory to begin unrolling - keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold) - - if (!keepUnrolling) { - logWarning(s"Failed to reserve initial memory threshold of " + - s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") - } else { - pendingMemoryReserved += initialMemoryThreshold - } - - // Unroll this block safely, checking whether we have exceeded our threshold periodically - try { - while (values.hasNext && keepUnrolling) { - vector += values.next() - if (elementsUnrolled % memoryCheckPeriod == 0) { - // If our vector's size has exceeded the threshold, request more memory - val currentSize = vector.estimateSize() - if (currentSize >= memoryThreshold) { - val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong - keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest) - if (keepUnrolling) { - pendingMemoryReserved += amountToRequest - } - // New threshold is currentSize * memoryGrowthFactor - memoryThreshold += amountToRequest - } - } - elementsUnrolled += 1 - } - - if (keepUnrolling) { - // We successfully unrolled the entirety of this block - Left(vector.toArray) - } else { - // We ran out of space while unrolling the values for this block - logUnrollFailureMessage(blockId, vector.estimateSize()) - Right(vector.iterator ++ values) - } - - } finally { - // If we return an array, the values returned here will be cached in `tryToPut` later. - // In this case, we should release the memory only after we cache the block there. - if (keepUnrolling) { - val taskAttemptId = currentTaskAttemptId() - memoryManager.synchronized { - // Since we continue to hold onto the array until we actually cache it, we cannot - // release the unroll memory yet. Instead, we transfer it to pending unroll memory - // so `tryToPut` can further transfer it to normal storage memory later. - // TODO: we can probably express this without pending unroll memory (SPARK-10907) - unrollMemoryMap(taskAttemptId) -= pendingMemoryReserved - pendingUnrollMemoryMap(taskAttemptId) = - pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + pendingMemoryReserved - } - } else { - // Otherwise, if we return an iterator, we can only release the unroll memory when - // the task finishes since we don't know when the iterator will be consumed. - } - } - } - /** * Return the RDD ID that a given block ID is from, or None if it is not an RDD block. */ @@ -291,48 +294,6 @@ private[spark] class MemoryStore( blockId.asRDDId.map(_.rddId) } - /** - * Try to put in a set of values, if we can free up enough space. The value should either be - * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size - * must also be passed by the caller. - * - * @return whether put was successful. - */ - private def tryToPut( - blockId: BlockId, - value: () => Any, - size: Long, - deserialized: Boolean): Boolean = { - val acquiredEnoughStorageMemory = { - // Synchronize on memoryManager so that the pending unroll memory isn't stolen by another - // task. - memoryManager.synchronized { - // Note: if we have previously unrolled this block successfully, then pending unroll - // memory should be non-zero. This is the amount that we already reserved during the - // unrolling process. In this case, we can just reuse this space to cache our block. - // The synchronization on `memoryManager` here guarantees that the release and acquire - // happen atomically. This relies on the assumption that all memory acquisitions are - // synchronized on the same lock. - releasePendingUnrollMemoryForThisTask() - memoryManager.acquireStorageMemory(blockId, size) - } - } - - if (acquiredEnoughStorageMemory) { - // We acquired enough memory for the block, so go ahead and put it - val entry = new MemoryEntry(value(), size, deserialized) - entries.synchronized { - entries.put(blockId, entry) - } - val valuesOrBytes = if (deserialized) "values" else "bytes" - logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format( - blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed))) - true - } else { - false - } - } - /** * Try to evict blocks to free up a given amount of space to store a particular block. * Can fail if either the block is bigger than our memory or it would require replacing @@ -456,30 +417,11 @@ private[spark] class MemoryStore( } } - /** - * Release pending unroll memory of current unroll successful block used by this task - */ - def releasePendingUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = { - val taskAttemptId = currentTaskAttemptId() - memoryManager.synchronized { - if (pendingUnrollMemoryMap.contains(taskAttemptId)) { - val memoryToRelease = math.min(memory, pendingUnrollMemoryMap(taskAttemptId)) - if (memoryToRelease > 0) { - pendingUnrollMemoryMap(taskAttemptId) -= memoryToRelease - if (pendingUnrollMemoryMap(taskAttemptId) == 0) { - pendingUnrollMemoryMap.remove(taskAttemptId) - } - memoryManager.releaseUnrollMemory(memoryToRelease) - } - } - } - } - /** * Return the amount of memory currently occupied for unrolling blocks across all tasks. */ def currentUnrollMemory: Long = memoryManager.synchronized { - unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum + unrollMemoryMap.values.sum } /** @@ -520,3 +462,42 @@ private[spark] class MemoryStore( logMemoryUsage() } } + +/** + * The result of a failed [[MemoryStore.putIterator()]] call. + * + * @param memoryStore the memoryStore, used for freeing memory. + * @param unrollMemory the amount of unroll memory used by the values in `unrolled`. + * @param unrolled an iterator for the partially-unrolled values. + * @param rest the rest of the original iterator passed to [[MemoryStore.putIterator()]]. + */ +private[storage] class PartiallyUnrolledIterator( + memoryStore: MemoryStore, + unrollMemory: Long, + unrolled: Iterator[Any], + rest: Iterator[Any]) + extends Iterator[Any] { + + private[this] var unrolledIteratorIsConsumed: Boolean = false + private[this] var iter: Iterator[Any] = { + val completionIterator = CompletionIterator[Any, Iterator[Any]](unrolled, { + unrolledIteratorIsConsumed = true + memoryStore.releaseUnrollMemoryForThisTask(unrollMemory) + }) + completionIterator ++ rest + } + + override def hasNext: Boolean = iter.hasNext + override def next(): Any = iter.next() + + /** + * Called to dispose of this iterator and free its memory. + */ + def close(): Unit = { + if (!unrolledIteratorIsConsumed) { + memoryStore.releaseUnrollMemoryForThisTask(unrollMemory) + unrolledIteratorIsConsumed = true + } + iter = null + } +} diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index dc4be14677..2e0c0596a7 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1065,28 +1065,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.currentUnrollMemoryForThisTask === 0) } - /** - * Verify the result of MemoryStore#unrollSafely is as expected. - */ - private def verifyUnroll( - expected: Iterator[Any], - result: Either[Array[Any], Iterator[Any]], - shouldBeArray: Boolean): Unit = { - val actual: Iterator[Any] = result match { - case Left(arr: Array[Any]) => - assert(shouldBeArray, "expected iterator from unroll!") - arr.iterator - case Right(it: Iterator[Any]) => - assert(!shouldBeArray, "expected array from unroll!") - it - case _ => - fail("unroll returned neither an iterator nor an array...") - } - expected.zip(actual).foreach { case (e, a) => - assert(e === a, "unroll did not return original values!") - } - } - test("safely unroll blocks") { store = makeBlockManager(12000) val smallList = List.fill(40)(new Array[Byte](100)) @@ -1094,30 +1072,41 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val memoryStore = store.memoryStore assert(memoryStore.currentUnrollMemoryForThisTask === 0) - // Unroll with all the space in the world. This should succeed and return an array. - var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator) - verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) + // Unroll with all the space in the world. This should succeed. + var putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY) + assert(putResult.isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) - memoryStore.releasePendingUnrollMemoryForThisTask() + smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) => + assert(e === a, "getValues() did not return original values!") + } + assert(memoryStore.remove("unroll")) // Unroll with not enough space. This should succeed after kicking out someBlock1. - store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY) - store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY) - unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator) - verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) + assert(store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)) + assert(store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)) + putResult = memoryStore.putIterator("unroll", smallList.iterator, StorageLevel.MEMORY_ONLY) + assert(putResult.isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) assert(memoryStore.contains("someBlock2")) assert(!memoryStore.contains("someBlock1")) - memoryStore.releasePendingUnrollMemoryForThisTask() + smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) => + assert(e === a, "getValues() did not return original values!") + } + assert(memoryStore.remove("unroll")) // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 = // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator. // In the mean time, however, we kicked out someBlock2 before giving up. - store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY) - unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator) - verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false) + assert(store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)) + putResult = memoryStore.putIterator("unroll", bigList.iterator, StorageLevel.MEMORY_ONLY) assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator assert(!memoryStore.contains("someBlock2")) + assert(putResult.isLeft) + bigList.iterator.zip(putResult.left.get).foreach { case (e, a) => + assert(e === a, "putIterator() did not return original values!") + } + // The unroll memory was freed once the iterator returned by putIterator() was fully traversed. + assert(memoryStore.currentUnrollMemoryForThisTask === 0) } test("safely unroll blocks through putIterator") { @@ -1208,6 +1197,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.contains("b3")) assert(!memoryStore.contains("b4")) assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator + result4.left.get.close() + assert(memoryStore.currentUnrollMemoryForThisTask === 0) // close released the unroll memory } test("multiple unrolls by the same thread") { @@ -1218,29 +1209,29 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] assert(memoryStore.currentUnrollMemoryForThisTask === 0) - // All unroll memory used is released because unrollSafely returned an array - memoryStore.putIterator("b1", smallIterator, memOnly) + // All unroll memory used is released because putIterator did not return an iterator + assert(memoryStore.putIterator("b1", smallIterator, memOnly).isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) - memoryStore.putIterator("b2", smallIterator, memOnly) + assert(memoryStore.putIterator("b2", smallIterator, memOnly).isRight) assert(memoryStore.currentUnrollMemoryForThisTask === 0) - // Unroll memory is not released because unrollSafely returned an iterator + // Unroll memory is not released because putIterator returned an iterator // that still depends on the underlying vector used in the process - memoryStore.putIterator("b3", smallIterator, memOnly) + assert(memoryStore.putIterator("b3", smallIterator, memOnly).isLeft) val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB3 > 0) // The unroll memory owned by this thread builds on top of its value after the previous unrolls - memoryStore.putIterator("b4", smallIterator, memOnly) + assert(memoryStore.putIterator("b4", smallIterator, memOnly).isLeft) val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB4 > unrollMemoryAfterB3) // ... but only to a certain extent (until we run out of free space to grant new unroll memory) - memoryStore.putIterator("b5", smallIterator, memOnly) + assert(memoryStore.putIterator("b5", smallIterator, memOnly).isLeft) val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask - memoryStore.putIterator("b6", smallIterator, memOnly) + assert(memoryStore.putIterator("b6", smallIterator, memOnly).isLeft) val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask - memoryStore.putIterator("b7", smallIterator, memOnly) + assert(memoryStore.putIterator("b7", smallIterator, memOnly).isLeft) val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask assert(unrollMemoryAfterB5 === unrollMemoryAfterB4) assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) -- cgit v1.2.3