aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/BlockManager.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala24
1 files changed, 19 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b503410a73..d21df4b95b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -669,8 +669,15 @@ private[spark] class BlockManager(
level: StorageLevel,
tellMaster: Boolean = true): Boolean = {
require(values != null, "Values is null")
- // If doPut() didn't hand work back to us, then block already existed or was successfully stored
- doPutIterator(blockId, () => values, level, tellMaster).isEmpty
+ doPutIterator(blockId, () => values, level, tellMaster) match {
+ case None =>
+ true
+ case Some(iter) =>
+ // Caller doesn't care about the iterator values, so we can close the iterator here
+ // to free resources earlier
+ iter.close()
+ false
+ }
}
/**
@@ -745,7 +752,14 @@ private[spark] class BlockManager(
// We will drop it to disk later if the memory store can't hold it.
val putSucceeded = if (level.deserialized) {
val values = dataDeserialize(blockId, bytes.duplicate())
- memoryStore.putIterator(blockId, values, level).isRight
+ memoryStore.putIterator(blockId, values, level) match {
+ case Right(_) => true
+ case Left(iter) =>
+ // If putting deserialized values in memory failed, we will put the bytes directly to
+ // disk, so we don't need this iterator and can close it to free resources earlier.
+ iter.close()
+ false
+ }
} else {
memoryStore.putBytes(blockId, size, () => bytes)
}
@@ -857,10 +871,10 @@ private[spark] class BlockManager(
iterator: () => Iterator[Any],
level: StorageLevel,
tellMaster: Boolean = true,
- keepReadLock: Boolean = false): Option[Iterator[Any]] = {
+ keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator] = {
doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo =>
val startTimeMs = System.currentTimeMillis
- var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
+ var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator] = None
// Size of the block in bytes
var size = 0L
if (level.useMemory) {