 core/src/main/scala/org/apache/spark/storage/BlockManager.scala      | 338
 core/src/main/scala/org/apache/spark/storage/BlockStore.scala        |  16
 core/src/main/scala/org/apache/spark/storage/DiskStore.scala         |  19
 core/src/main/scala/org/apache/spark/storage/MemoryStore.scala       |  39
 core/src/main/scala/org/apache/spark/storage/PutResult.scala         |  32
 core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala |  52
 6 files changed, 223 insertions(+), 273 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b59191b291..dcf359e3c2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -52,6 +52,12 @@ private[spark] class BlockResult(
val readMethod: DataReadMethod.Value,
val bytes: Long)
+// Class for representing return value of doPut()
+private sealed trait DoPutResult
+private case object DoPutSucceeded extends DoPutResult
+private case object DoPutBytesFailed extends DoPutResult
+private case class DoPutIteratorFailed(iter: Iterator[Any]) extends DoPutResult
+
/**
* Manager running on every node (driver and executors) which provides interfaces for putting and
* retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
@@ -432,98 +438,108 @@ private[spark] class BlockManager(
logDebug(s"Block $blockId was not found")
None
case Some(info) =>
- val level = info.level
- logDebug(s"Level for block $blockId is $level")
-
- // Look for the block in memory
- if (level.useMemory) {
- logDebug(s"Getting block $blockId from memory")
- val result = if (asBlockResult) {
- memoryStore.getValues(blockId).map { iter =>
- val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
- new BlockResult(ci, DataReadMethod.Memory, info.size)
- }
- } else {
- memoryStore.getBytes(blockId)
- }
- result match {
- case Some(values) =>
- return result
- case None =>
- logDebug(s"Block $blockId not found in memory")
- }
+ doGetLocal(blockId, info, asBlockResult)
+ }
+ }
+
+ /**
+ * Get a local block from the block manager.
+ * Assumes that the caller holds a read lock on the block.
+ */
+ private def doGetLocal(
+ blockId: BlockId,
+ info: BlockInfo,
+ asBlockResult: Boolean): Option[Any] = {
+ val level = info.level
+ logDebug(s"Level for block $blockId is $level")
+
+ // Look for the block in memory
+ if (level.useMemory) {
+ logDebug(s"Getting block $blockId from memory")
+ val result = if (asBlockResult) {
+ memoryStore.getValues(blockId).map { iter =>
+ val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
+ new BlockResult(ci, DataReadMethod.Memory, info.size)
}
+ } else {
+ memoryStore.getBytes(blockId)
+ }
+ result match {
+ case Some(values) =>
+ return result
+ case None =>
+ logDebug(s"Block $blockId not found in memory")
+ }
+ }
- // Look for block on disk, potentially storing it back in memory if required
- if (level.useDisk) {
- logDebug(s"Getting block $blockId from disk")
- val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
- case Some(b) => b
- case None =>
- releaseLock(blockId)
- throw new BlockException(
- blockId, s"Block $blockId not found on disk, though it should be")
- }
- assert(0 == bytes.position())
-
- if (!level.useMemory) {
- // If the block shouldn't be stored in memory, we can just return it
- if (asBlockResult) {
- val iter = CompletionIterator[Any, Iterator[Any]](
- dataDeserialize(blockId, bytes), releaseLock(blockId))
- return Some(new BlockResult(iter, DataReadMethod.Disk, info.size))
- } else {
- return Some(bytes)
- }
- } else {
- // Otherwise, we also have to store something in the memory store
- if (!level.deserialized || !asBlockResult) {
- /* We'll store the bytes in memory if the block's storage level includes
- * "memory serialized", or if it should be cached as objects in memory
- * but we only requested its serialized bytes. */
- memoryStore.putBytes(blockId, bytes.limit, () => {
- // https://issues.apache.org/jira/browse/SPARK-6076
- // If the file size is bigger than the free memory, OOM will happen. So if we cannot
- // put it into MemoryStore, copyForMemory should not be created. That's why this
- // action is put into a `() => ByteBuffer` and created lazily.
- val copyForMemory = ByteBuffer.allocate(bytes.limit)
- copyForMemory.put(bytes)
- })
- bytes.rewind()
- }
- if (!asBlockResult) {
- return Some(bytes)
- } else {
- val values = dataDeserialize(blockId, bytes)
- if (level.deserialized) {
- // Cache the values before returning them
- val putResult = memoryStore.putIterator(
- blockId, values, level, returnValues = true, allowPersistToDisk = false)
- // The put may or may not have succeeded, depending on whether there was enough
- // space to unroll the block. Either way, the put here should return an iterator.
- putResult.data match {
- case Left(it) =>
- val ci = CompletionIterator[Any, Iterator[Any]](it, releaseLock(blockId))
- return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
- case _ =>
- // This only happens if we dropped the values back to disk (which is never)
- throw new SparkException("Memory store did not return an iterator!")
- }
- } else {
- val ci = CompletionIterator[Any, Iterator[Any]](values, releaseLock(blockId))
- return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+ // Look for block on disk, potentially storing it back in memory if required
+ if (level.useDisk) {
+ logDebug(s"Getting block $blockId from disk")
+ val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
+ case Some(b) => b
+ case None =>
+ releaseLock(blockId)
+ throw new BlockException(
+ blockId, s"Block $blockId not found on disk, though it should be")
+ }
+ assert(0 == bytes.position())
+
+ if (!level.useMemory) {
+ // If the block shouldn't be stored in memory, we can just return it
+ if (asBlockResult) {
+ val iter = CompletionIterator[Any, Iterator[Any]](
+ dataDeserialize(blockId, bytes), releaseLock(blockId))
+ return Some(new BlockResult(iter, DataReadMethod.Disk, info.size))
+ } else {
+ return Some(bytes)
+ }
+ } else {
+ // Otherwise, we also have to store something in the memory store
+ if (!level.deserialized || !asBlockResult) {
+ /* We'll store the bytes in memory if the block's storage level includes
+ * "memory serialized", or if it should be cached as objects in memory
+ * but we only requested its serialized bytes. */
+ memoryStore.putBytes(blockId, bytes.limit, () => {
+ // https://issues.apache.org/jira/browse/SPARK-6076
+ // If the file size is bigger than the free memory, OOM will happen. So if we cannot
+ // put it into MemoryStore, copyForMemory should not be created. That's why this
+ // action is put into a `() => ByteBuffer` and created lazily.
+ val copyForMemory = ByteBuffer.allocate(bytes.limit)
+ copyForMemory.put(bytes)
+ })
+ bytes.rewind()
+ }
+ if (!asBlockResult) {
+ return Some(bytes)
+ } else {
+ val values = dataDeserialize(blockId, bytes)
+ val valuesToReturn: Iterator[Any] = {
+ if (level.deserialized) {
+ // Cache the values before returning them
+ memoryStore.putIterator(blockId, values, level, allowPersistToDisk = false) match {
+ case Left(iter) =>
+ // The memory store put() failed, so it returned the iterator back to us:
+ iter
+ case Right(_) =>
+ // The put() succeeded, so we can read the values back:
+ memoryStore.getValues(blockId).get
}
+ } else {
+ values
}
}
- } else {
- // This branch represents a case where the BlockInfoManager contained an entry for
- // the block but the block could not be found in any of the block stores. This case
- // should never occur, but for completeness's sake we address it here.
- logError(
- s"Block $blockId is supposedly stored locally but was not found in any block store")
- releaseLock(blockId)
- None
+ val ci = CompletionIterator[Any, Iterator[Any]](valuesToReturn, releaseLock(blockId))
+ return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
}
+ }
+ } else {
+ // This branch represents a case where the BlockInfoManager contained an entry for
+ // the block but the block could not be found in any of the block stores. This case
+ // should never occur, but for completeness's sake we address it here.
+ logError(
+ s"Block $blockId is supposedly stored locally but was not found in any block store")
+ releaseLock(blockId)
+ None
}
}
@@ -659,7 +675,7 @@ private[spark] class BlockManager(
makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = {
// Initially we hold no locks on this block.
doPut(blockId, IteratorValues(makeIterator), level, keepReadLock = true) match {
- case None =>
+ case DoPutSucceeded =>
// doPut() didn't hand work back to us, so the block already existed or was successfully
// stored. Therefore, we now hold a read lock on the block.
val blockResult = get(blockId).getOrElse {
@@ -669,11 +685,13 @@ private[spark] class BlockManager(
throw new SparkException(s"get() failed for block $blockId even though we held a lock")
}
Left(blockResult)
- case Some(failedPutResult) =>
+ case DoPutIteratorFailed(iter) =>
// The put failed, likely because the data was too large to fit in memory and could not be
// dropped to disk. Therefore, we need to pass the input iterator back to the caller so
// that they can decide what to do with the values (e.g. process them without caching).
- Right(failedPutResult.data.left.get)
+ Right(iter)
+ case DoPutBytesFailed =>
+ throw new SparkException("doPut returned an invalid failure response")
}
}
@@ -687,7 +705,13 @@ private[spark] class BlockManager(
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
require(values != null, "Values is null")
- doPut(blockId, IteratorValues(() => values), level, tellMaster, effectiveStorageLevel).isEmpty
+ val result = doPut(
+ blockId,
+ IteratorValues(() => values),
+ level,
+ tellMaster,
+ effectiveStorageLevel)
+ result == DoPutSucceeded
}
/**
@@ -719,7 +743,8 @@ private[spark] class BlockManager(
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
require(bytes != null, "Bytes is null")
- doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel).isEmpty
+ val result = doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
+ result == DoPutSucceeded
}
/**
@@ -734,9 +759,9 @@ private[spark] class BlockManager(
* @param keepReadLock if true, this method will hold the read lock when it returns (even if the
* block already exists). If false, this method will hold no locks when it
* returns.
- * @return `Some(PutResult)` if the block did not exist and could not be successfully cached,
- * or None if the block already existed or was successfully stored (fully consuming
- * the input data / input iterator).
+ * @return [[DoPutSucceeded]] if the block was already present or if the put succeeded, or
+ * [[DoPutBytesFailed]] if the put failed and we were storing bytes, or
+ * [[DoPutIteratorFailed]] if the put failed and we were storing an iterator.
*/
private def doPut(
blockId: BlockId,
@@ -744,7 +769,7 @@ private[spark] class BlockManager(
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None,
- keepReadLock: Boolean = false): Option[PutResult] = {
+ keepReadLock: Boolean = false): DoPutResult = {
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
@@ -765,21 +790,12 @@ private[spark] class BlockManager(
// lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
releaseLock(blockId)
}
- return None
+ return DoPutSucceeded
}
}
val startTimeMs = System.currentTimeMillis
- /* If we're storing values and we need to replicate the data, we'll want access to the values,
- * but because our put will read the whole iterator, there will be no values left. For the
- * case where the put serializes data, we'll remember the bytes, above; but for the case where
- * it doesn't, such as deserialized storage, let's rely on the put returning an Iterator. */
- var valuesAfterPut: Iterator[Any] = null
-
- // Ditto for the bytes after the put
- var bytesAfterPut: ByteBuffer = null
-
// Size of the block in bytes
var size = 0L
@@ -801,43 +817,46 @@ private[spark] class BlockManager(
}
var blockWasSuccessfullyStored = false
- var result: PutResult = null
+ var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
putBlockInfo.synchronized {
logTrace("Put for block %s took %s to get into synchronized block"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
try {
- // returnValues - Whether to return the values put
- // blockStore - The type of storage to put these values into
- val (returnValues, blockStore: BlockStore) = {
- if (putLevel.useMemory) {
- // Put it in memory first, even if it also has useDisk set to true;
- // We will drop it to disk later if the memory store can't hold it.
- (true, memoryStore)
- } else if (putLevel.useDisk) {
- // Don't get back the bytes from put unless we replicate them
- (putLevel.replication > 1, diskStore)
- } else {
- assert(putLevel == StorageLevel.NONE)
- throw new BlockException(
- blockId, s"Attempted to put block $blockId without specifying storage level!")
+ if (putLevel.useMemory) {
+ // Put it in memory first, even if it also has useDisk set to true;
+ // We will drop it to disk later if the memory store can't hold it.
+ data match {
+ case IteratorValues(iterator) =>
+ memoryStore.putIterator(blockId, iterator(), putLevel) match {
+ case Right(s) =>
+ size = s
+ case Left(iter) =>
+ iteratorFromFailedMemoryStorePut = Some(iter)
+ }
+ case ByteBufferValues(bytes) =>
+ bytes.rewind()
+ size = bytes.limit()
+ memoryStore.putBytes(blockId, bytes, putLevel)
}
- }
-
- // Actually put the values
- result = data match {
- case IteratorValues(iterator) =>
- blockStore.putIterator(blockId, iterator(), putLevel, returnValues)
- case ByteBufferValues(bytes) =>
- bytes.rewind()
- blockStore.putBytes(blockId, bytes, putLevel)
- }
- size = result.size
- result.data match {
- case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
- case Right (newBytes) => bytesAfterPut = newBytes
- case _ =>
+ } else if (putLevel.useDisk) {
+ data match {
+ case IteratorValues(iterator) =>
+ diskStore.putIterator(blockId, iterator(), putLevel) match {
+ case Right(s) =>
+ size = s
+ // putIterator() will never return Left (see its return type).
+ }
+ case ByteBufferValues(bytes) =>
+ bytes.rewind()
+ size = bytes.limit()
+ diskStore.putBytes(blockId, bytes, putLevel)
+ }
+ } else {
+ assert(putLevel == StorageLevel.NONE)
+ throw new BlockException(
+ blockId, s"Attempted to put block $blockId without specifying storage level!")
}
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
@@ -868,34 +887,27 @@ private[spark] class BlockManager(
}
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
- // Either we're storing bytes and we asynchronously started replication, or we're storing
- // values and need to serialize and replicate them now:
- if (putLevel.replication > 1) {
- data match {
- case ByteBufferValues(bytes) =>
- if (replicationFuture != null) {
- Await.ready(replicationFuture, Duration.Inf)
- }
- case _ =>
- if (blockWasSuccessfullyStored) {
- val remoteStartTime = System.currentTimeMillis
- // Serialize the block if not already done
- if (bytesAfterPut == null) {
- if (valuesAfterPut == null) {
- throw new SparkException(
- "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
- }
- bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
- }
- replicate(blockId, bytesAfterPut, putLevel)
- logDebug("Put block %s remotely took %s"
- .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
+ if (replicationFuture != null) {
+ // Wait for asynchronous replication to finish
+ Await.ready(replicationFuture, Duration.Inf)
+ } else if (putLevel.replication > 1 && blockWasSuccessfullyStored) {
+ val remoteStartTime = System.currentTimeMillis
+ val bytesToReplicate: ByteBuffer = {
+ doGetLocal(blockId, putBlockInfo, asBlockResult = false)
+ .map(_.asInstanceOf[ByteBuffer])
+ .getOrElse {
+ throw new SparkException(s"Block $blockId was not found even though it was just stored")
}
}
+ try {
+ replicate(blockId, bytesToReplicate, putLevel)
+ } finally {
+ BlockManager.dispose(bytesToReplicate)
+ }
+ logDebug("Put block %s remotely took %s"
+ .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
- BlockManager.dispose(bytesAfterPut)
-
if (putLevel.replication > 1) {
logDebug("Putting block %s with replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
@@ -905,9 +917,11 @@ private[spark] class BlockManager(
}
if (blockWasSuccessfullyStored) {
- None
+ DoPutSucceeded
+ } else if (iteratorFromFailedMemoryStorePut.isDefined) {
+ DoPutIteratorFailed(iteratorFromFailedMemoryStorePut.get)
} else {
- Some(result)
+ DoPutBytesFailed
}
}
@@ -1064,7 +1078,7 @@ private[spark] class BlockManager(
logInfo(s"Writing block $blockId to disk")
data() match {
case Left(elements) =>
- diskStore.putIterator(blockId, elements.toIterator, level, returnValues = false)
+ diskStore.putIterator(blockId, elements.toIterator, level)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}
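
For reference, a minimal stand-alone sketch of the DoPutResult contract this file introduces. The trait restates the private definitions from the hunk above; handleIteratorPut is an illustrative caller, not Spark code, mirroring the match added in getOrElseUpdate():

    // Restates the private DoPutResult hierarchy added at the top of BlockManager.scala.
    sealed trait DoPutResult
    case object DoPutSucceeded extends DoPutResult
    case object DoPutBytesFailed extends DoPutResult
    case class DoPutIteratorFailed(iter: Iterator[Any]) extends DoPutResult

    // Illustrative caller: for an iterator put, "iterator handed back" is the only
    // recoverable failure; a bytes failure can never occur on this code path.
    def handleIteratorPut(result: DoPutResult): Either[Iterator[Any], Unit] = result match {
      case DoPutSucceeded          => Right(())   // block already existed or was stored
      case DoPutIteratorFailed(it) => Left(it)    // process the values without caching
      case DoPutBytesFailed        =>
        throw new IllegalStateException("bytes failure is impossible for an iterator put")
    }
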
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index d3af50d974..b069918b16 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -26,20 +26,18 @@ import org.apache.spark.Logging
*/
private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {
- def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult
+ def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): Unit
/**
- * Put in a block and, possibly, also return its content as either bytes or another Iterator.
- * This is used to efficiently write the values to multiple locations (e.g. for replication).
+ * Attempt to store an iterator of values.
*
- * @return a PutResult that contains the size of the data, as well as the values put if
- * returnValues is true (if not, the result's data field can be null)
+ * @return an iterator of values (in case the put failed), or the estimated size of the stored
+ * values if the put succeeded.
*/
def putIterator(
- blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel,
- returnValues: Boolean): PutResult
+ blockId: BlockId,
+ values: Iterator[Any],
+ level: StorageLevel): Either[Iterator[Any], Long]
/**
* Return the size of a block in bytes.
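
A toy illustration of the reworked BlockStore contract described above: putBytes no longer returns anything, and putIterator reports success as Right(size) or hands the values back as Left. ToyBlockStore and its map-backed storage are assumptions for the example, not part of the patch:

    import java.nio.ByteBuffer
    import scala.collection.mutable

    // Toy store implementing the new contract; always succeeds, so putIterator
    // never returns Left here.
    class ToyBlockStore {
      private val blocks = mutable.Map.empty[String, Seq[Any]]

      def putBytes(blockId: String, bytes: ByteBuffer, level: String): Unit = {
        blocks(blockId) = Seq(bytes)
      }

      def putIterator(
          blockId: String,
          values: Iterator[Any],
          level: String): Either[Iterator[Any], Long] = {
        val materialized = values.toVector
        blocks(blockId) = materialized
        Right(materialized.size.toLong) // rough stand-in for the estimated stored size
      }
    }
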
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index db12a4a1b9..e35aa1b068 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -36,7 +36,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
diskManager.getFile(blockId.name).length
}
- override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
+ override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
// So that we do not modify the input offsets !
// duplicate does not copy buffer, so inexpensive
val bytes = _bytes.duplicate()
@@ -54,15 +54,12 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
- PutResult(bytes.limit(), Right(bytes.duplicate()))
}
override def putIterator(
blockId: BlockId,
values: Iterator[Any],
- level: StorageLevel,
- returnValues: Boolean): PutResult = {
-
+ level: StorageLevel): Right[Iterator[Any], Long] = {
logDebug(s"Attempting to write values for block $blockId")
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
@@ -90,13 +87,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName, Utils.bytesToString(length), timeTaken))
- if (returnValues) {
- // Return a byte buffer for the contents of the file
- val buffer = getBytes(blockId).get
- PutResult(length, Right(buffer))
- } else {
- PutResult(length, null)
- }
+ Right(length)
}
private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
@@ -127,10 +118,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
getBytes(file, 0, file.length)
}
- def getBytes(segment: FileSegment): Option[ByteBuffer] = {
- getBytes(segment.file, segment.offset, segment.length)
- }
-
override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
}
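
A note on the narrowed return type above: Right[Iterator[Any], Long] is a subtype of Either[Iterator[Any], Long], so DiskStore still satisfies the BlockStore signature while direct callers never have to handle the failure case. A simplified sketch (Store and DiskLikeStore are stand-ins, not Spark classes):

    // The base trait declares Either, but an implementation that cannot fail
    // may narrow its override to Right.
    trait Store {
      def putIterator(values: Iterator[Any]): Either[Iterator[Any], Long]
    }

    class DiskLikeStore extends Store {
      // Narrowed return type: this store never hands the iterator back.
      override def putIterator(values: Iterator[Any]): Right[Iterator[Any], Long] =
        Right(values.size.toLong)
    }

    // Callers holding a DiskLikeStore can read the size without a Left branch.
    val written: Long = new DiskLikeStore().putIterator(Iterator(1, 2, 3)) match {
      case Right(size) => size
    }
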
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 317d73abba..12b70d1807 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -87,16 +87,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
}
- override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
+ override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
// Work on a duplicate - since the original input might be used elsewhere.
val bytes = _bytes.duplicate()
bytes.rewind()
if (level.deserialized) {
val values = blockManager.dataDeserialize(blockId, bytes)
- putIterator(blockId, values, level, returnValues = true)
+ putIterator(blockId, values, level)
} else {
tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
- PutResult(bytes.limit(), Right(bytes.duplicate()))
}
}
@@ -106,26 +105,20 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
*
* The caller should guarantee that `size` is correct.
*/
- def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
+ def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Unit = {
// Work on a duplicate - since the original input might be used elsewhere.
lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false)
- val data =
- if (putSuccess) {
- assert(bytes.limit == size)
- Right(bytes.duplicate())
- } else {
- null
- }
- PutResult(size, data)
+ if (putSuccess) {
+ assert(bytes.limit == size)
+ }
}
override def putIterator(
blockId: BlockId,
values: Iterator[Any],
- level: StorageLevel,
- returnValues: Boolean): PutResult = {
- putIterator(blockId, values, level, returnValues, allowPersistToDisk = true)
+ level: StorageLevel): Either[Iterator[Any], Long] = {
+ putIterator(blockId, values, level, allowPersistToDisk = true)
}
/**
@@ -144,32 +137,30 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
- returnValues: Boolean,
- allowPersistToDisk: Boolean): PutResult = {
+ allowPersistToDisk: Boolean): Either[Iterator[Any], Long] = {
val unrolledValues = unrollSafely(blockId, values)
unrolledValues match {
case Left(arrayValues) =>
// Values are fully unrolled in memory, so store them as an array
- val res = {
+ val size = {
if (level.deserialized) {
val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)
- PutResult(sizeEstimate, Left(arrayValues.iterator))
+ sizeEstimate
} else {
val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
- PutResult(bytes.limit(), Right(bytes.duplicate()))
+ bytes.limit()
}
}
- PutResult(res.size, res.data)
+ Right(size)
case Right(iteratorValues) =>
// Not enough space to unroll this block; drop to disk if applicable
if (level.useDisk && allowPersistToDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
- val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
- PutResult(res.size, res.data)
+ blockManager.diskStore.putIterator(blockId, iteratorValues, level)
} else {
- PutResult(0, Left(iteratorValues))
+ Left(iteratorValues)
}
}
}
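
The doGetLocal() change in BlockManager.scala leans on this contract: Right means the values were consumed and can be re-read from the store, while Left hands the unconsumed values back. A caller-side sketch under that assumption (ValueStore is a stand-in interface, not the real MemoryStore):

    // Stand-in for a store with the MemoryStore.putIterator contract.
    trait ValueStore {
      def putIterator(blockId: String, values: Iterator[Any]): Either[Iterator[Any], Long]
      def getValues(blockId: String): Option[Iterator[Any]]
    }

    // Cache if possible, but always end up with an iterator of the values.
    def cacheThenRead(store: ValueStore, blockId: String, values: Iterator[Any]): Iterator[Any] =
      store.putIterator(blockId, values) match {
        case Left(returned) =>
          returned                       // put failed; the unconsumed values come back
        case Right(_) =>
          store.getValues(blockId).get   // put consumed the iterator; re-read the cached values
      }
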
diff --git a/core/src/main/scala/org/apache/spark/storage/PutResult.scala b/core/src/main/scala/org/apache/spark/storage/PutResult.scala
deleted file mode 100644
index f0eac7594e..0000000000
--- a/core/src/main/scala/org/apache/spark/storage/PutResult.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-/**
- * Result of adding a block into a BlockStore. This case class contains a few things:
- * (1) The estimated size of the put,
- * (2) The values put if the caller asked for them to be returned (e.g. for chaining
- * replication), and
- * (3) A list of blocks dropped as a result of this put. This is always empty for DiskStore.
- */
-private[spark] case class PutResult(
- size: Long,
- data: Either[Iterator[_], ByteBuffer],
- droppedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 89b427049b..cfcbf1745d 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1156,14 +1156,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll with plenty of space. This should succeed and cache both blocks.
- val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
- val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+ val result1 = memoryStore.putIterator("b1", smallIterator, memOnly)
+ val result2 = memoryStore.putIterator("b2", smallIterator, memOnly)
assert(memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
- assert(result1.size > 0) // unroll was successful
- assert(result2.size > 0)
- assert(result1.data.isLeft) // unroll did not drop this block to disk
- assert(result2.data.isLeft)
+ assert(result1.isRight) // unroll was successful
+ assert(result2.isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Re-put these two blocks so block manager knows about them too. Otherwise, block manager
@@ -1174,9 +1172,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putIterator("b2", smallIterator, memOnly)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
- val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
- assert(result3.size > 0)
- assert(result3.data.isLeft)
+ val result3 = memoryStore.putIterator("b3", smallIterator, memOnly)
+ assert(result3.isRight)
assert(!memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
assert(memoryStore.contains("b3"))
@@ -1185,9 +1182,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putIterator("b3", smallIterator, memOnly)
// Unroll huge block with not enough space. This should fail and kick out b2 in the process.
- val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
- assert(result4.size === 0) // unroll was unsuccessful
- assert(result4.data.isLeft)
+ val result4 = memoryStore.putIterator("b4", bigIterator, memOnly)
+ assert(result4.isLeft) // unroll was unsuccessful
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
assert(memoryStore.contains("b3"))
@@ -1214,8 +1210,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Unroll with not enough space. This should succeed but kick out b1 in the process.
// Memory store should contain b2 and b3, while disk store should contain only b1
- val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, returnValues = true)
- assert(result3.size > 0)
+ val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk)
+ assert(result3.isRight)
assert(!memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
assert(memoryStore.contains("b3"))
@@ -1229,9 +1225,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Unroll huge block with not enough space. This should fail and drop the new block to disk
// directly in addition to kicking out b2 in the process. Memory store should contain only
// b3, while disk store should contain b1, b2 and b4.
- val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, returnValues = true)
- assert(result4.size > 0)
- assert(result4.data.isRight) // unroll returned bytes from disk
+ val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk)
+ assert(result4.isRight)
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
assert(memoryStore.contains("b3"))
@@ -1252,28 +1247,28 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// All unroll memory used is released because unrollSafely returned an array
- memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b1", smallIterator, memOnly)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b2", smallIterator, memOnly)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll memory is not released because unrollSafely returned an iterator
// that still depends on the underlying vector used in the process
- memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b3", smallIterator, memOnly)
val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB3 > 0)
// The unroll memory owned by this thread builds on top of its value after the previous unrolls
- memoryStore.putIterator("b4", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b4", smallIterator, memOnly)
val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
// ... but only to a certain extent (until we run out of free space to grant new unroll memory)
- memoryStore.putIterator("b5", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b5", smallIterator, memOnly)
val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask
- memoryStore.putIterator("b6", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b6", smallIterator, memOnly)
val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask
- memoryStore.putIterator("b7", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b7", smallIterator, memOnly)
val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
@@ -1286,11 +1281,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val blockId = BlockId("rdd_3_10")
store.blockInfoManager.lockNewBlockForWriting(
blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false))
- val result = memoryStore.putBytes(blockId, 13000, () => {
+ memoryStore.putBytes(blockId, 13000, () => {
fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
})
- assert(result.size === 13000)
- assert(result.data === null)
}
test("put a small ByteBuffer to MemoryStore") {
@@ -1298,12 +1291,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val memoryStore = store.memoryStore
val blockId = BlockId("rdd_3_10")
var bytes: ByteBuffer = null
- val result = memoryStore.putBytes(blockId, 10000, () => {
+ memoryStore.putBytes(blockId, 10000, () => {
bytes = ByteBuffer.allocate(10000)
bytes
})
- assert(result.size === 10000)
- assert(result.data === Right(bytes))
+ assert(memoryStore.getSize(blockId) === 10000)
}
test("read-locked blocks cannot be evicted from the MemoryStore") {