author      Josh Rosen <joshrosen@databricks.com>    2016-03-07 21:50:01 -0800
committer   Andrew Or <andrew@databricks.com>        2016-03-07 21:50:01 -0800
commit      e52e597db48d069b98c1d404b221d3365f38fbb8
tree        4726833e40fca0e9aa9c265ea772afe5c81bdc7e /core
parent      017cdf2be67776978a940609d610afea79856b17
[SPARK-13659] Refactor BlockStore put*() APIs to remove returnValues
In preparation for larger refactoring, this patch removes the confusing `returnValues` option from the BlockStore put() APIs: returning the value is only useful in one place (caching) and in other situations, such as block replication, it's simpler to put() and then get().

As part of this change, I needed to refactor `BlockManager.doPut()`'s block replication code. I also changed `doPut()` to access the memory and disk stores directly rather than calling them through the BlockStore interface; this is in anticipation of a followup patch to remove the BlockStore interface so that the disk store can expose a binary-data-oriented API which is not concerned with Java objects or serialization.

These changes should be covered by the existing storage unit tests. The best way to review this patch is probably to look at the individual commits, all of which are small and have useful descriptions to guide the review.

/cc davies for review.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11502 from JoshRosen/remove-returnvalues.
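To make the new contract concrete, here is a minimal, self-contained sketch of the Either[Iterator[Any], Long] shape that putIterator() now returns and of the put()-then-get() pattern that replaces returnValues. The ToyMemoryStore and PutThenGetExample names, and the item-count stand-in for size estimation, are illustrative only and are not Spark's actual classes:

    import scala.collection.mutable

    class ToyMemoryStore(capacity: Long) {
      private val blocks = mutable.Map.empty[String, Array[Any]]
      private var used = 0L

      // Right(size) when the values were cached; Left(iterator) when they were not,
      // handing the data back to the caller (mirrors the refactored putIterator contract).
      def putIterator(blockId: String, values: Iterator[Any]): Either[Iterator[Any], Long] = {
        val unrolled = values.toArray
        val size = unrolled.length.toLong // toy stand-in for SizeEstimator.estimate(...)
        if (used + size <= capacity) {
          blocks(blockId) = unrolled
          used += size
          Right(size)
        } else {
          Left(unrolled.iterator)
        }
      }

      // Replication and other consumers now call put() and then get() rather than
      // asking put() to hand the stored values back.
      def getValues(blockId: String): Option[Iterator[Any]] = blocks.get(blockId).map(_.iterator)
    }

    object PutThenGetExample extends App {
      val store = new ToyMemoryStore(capacity = 3)
      store.putIterator("b1", Iterator(1, 2, 3)) match {
        case Right(size) =>
          // Success: read the block back with get() if the values are needed again.
          println(s"cached b1 ($size values): " + store.getValues("b1").get.mkString(","))
        case Left(iter) =>
          // Failure: the caller gets the values back and can process them without caching.
          println("could not cache b1: " + iter.mkString(","))
      }
    }

At the type level the real MemoryStore.putIterator() works the same way: Right carries the estimated size of the stored values, and Left hands the iterator back so the caller can decide what to do with the values (e.g. process them without caching).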
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala      | 338
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockStore.scala        |  16
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala         |  19
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/MemoryStore.scala       |  39
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/PutResult.scala         |  32
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala |  52
6 files changed, 223 insertions, 273 deletions
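As an orientation aid for reading the BlockManager.scala changes below, here is a standalone mirror of the new DoPutResult values that doPut() returns in place of Option[PutResult]. The DoPutResultSketch wrapper and afterDoPut() helper are hypothetical; the real trait is file-private to BlockManager.scala and the real caller returns Either[BlockResult, Iterator[Any]]:

    object DoPutResultSketch {
      // Mirrors the sealed trait added near the top of BlockManager.scala.
      sealed trait DoPutResult
      case object DoPutSucceeded extends DoPutResult
      case object DoPutBytesFailed extends DoPutResult
      final case class DoPutIteratorFailed(iter: Iterator[Any]) extends DoPutResult

      // Roughly how getOrElseUpdate() discriminates the outcomes after calling doPut():
      def afterDoPut(result: DoPutResult): Either[String, Iterator[Any]] = result match {
        case DoPutSucceeded =>
          Left("block stored or already present; read it back with get()")
        case DoPutIteratorFailed(it) =>
          Right(it) // hand the values back so the caller can process them without caching
        case DoPutBytesFailed =>
          throw new IllegalStateException("doPut returned an invalid failure response")
      }
    }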
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b59191b291..dcf359e3c2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -52,6 +52,12 @@ private[spark] class BlockResult(
val readMethod: DataReadMethod.Value,
val bytes: Long)
+// Class for representing return value of doPut()
+private sealed trait DoPutResult
+private case object DoPutSucceeded extends DoPutResult
+private case object DoPutBytesFailed extends DoPutResult
+private case class DoPutIteratorFailed(iter: Iterator[Any]) extends DoPutResult
+
/**
* Manager running on every node (driver and executors) which provides interfaces for putting and
* retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
@@ -432,98 +438,108 @@ private[spark] class BlockManager(
logDebug(s"Block $blockId was not found")
None
case Some(info) =>
- val level = info.level
- logDebug(s"Level for block $blockId is $level")
-
- // Look for the block in memory
- if (level.useMemory) {
- logDebug(s"Getting block $blockId from memory")
- val result = if (asBlockResult) {
- memoryStore.getValues(blockId).map { iter =>
- val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
- new BlockResult(ci, DataReadMethod.Memory, info.size)
- }
- } else {
- memoryStore.getBytes(blockId)
- }
- result match {
- case Some(values) =>
- return result
- case None =>
- logDebug(s"Block $blockId not found in memory")
- }
+ doGetLocal(blockId, info, asBlockResult)
+ }
+ }
+
+ /**
+ * Get a local block from the block manager.
+ * Assumes that the caller holds a read lock on the block.
+ */
+ private def doGetLocal(
+ blockId: BlockId,
+ info: BlockInfo,
+ asBlockResult: Boolean): Option[Any] = {
+ val level = info.level
+ logDebug(s"Level for block $blockId is $level")
+
+ // Look for the block in memory
+ if (level.useMemory) {
+ logDebug(s"Getting block $blockId from memory")
+ val result = if (asBlockResult) {
+ memoryStore.getValues(blockId).map { iter =>
+ val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
+ new BlockResult(ci, DataReadMethod.Memory, info.size)
}
+ } else {
+ memoryStore.getBytes(blockId)
+ }
+ result match {
+ case Some(values) =>
+ return result
+ case None =>
+ logDebug(s"Block $blockId not found in memory")
+ }
+ }
- // Look for block on disk, potentially storing it back in memory if required
- if (level.useDisk) {
- logDebug(s"Getting block $blockId from disk")
- val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
- case Some(b) => b
- case None =>
- releaseLock(blockId)
- throw new BlockException(
- blockId, s"Block $blockId not found on disk, though it should be")
- }
- assert(0 == bytes.position())
-
- if (!level.useMemory) {
- // If the block shouldn't be stored in memory, we can just return it
- if (asBlockResult) {
- val iter = CompletionIterator[Any, Iterator[Any]](
- dataDeserialize(blockId, bytes), releaseLock(blockId))
- return Some(new BlockResult(iter, DataReadMethod.Disk, info.size))
- } else {
- return Some(bytes)
- }
- } else {
- // Otherwise, we also have to store something in the memory store
- if (!level.deserialized || !asBlockResult) {
- /* We'll store the bytes in memory if the block's storage level includes
- * "memory serialized", or if it should be cached as objects in memory
- * but we only requested its serialized bytes. */
- memoryStore.putBytes(blockId, bytes.limit, () => {
- // https://issues.apache.org/jira/browse/SPARK-6076
- // If the file size is bigger than the free memory, OOM will happen. So if we cannot
- // put it into MemoryStore, copyForMemory should not be created. That's why this
- // action is put into a `() => ByteBuffer` and created lazily.
- val copyForMemory = ByteBuffer.allocate(bytes.limit)
- copyForMemory.put(bytes)
- })
- bytes.rewind()
- }
- if (!asBlockResult) {
- return Some(bytes)
- } else {
- val values = dataDeserialize(blockId, bytes)
- if (level.deserialized) {
- // Cache the values before returning them
- val putResult = memoryStore.putIterator(
- blockId, values, level, returnValues = true, allowPersistToDisk = false)
- // The put may or may not have succeeded, depending on whether there was enough
- // space to unroll the block. Either way, the put here should return an iterator.
- putResult.data match {
- case Left(it) =>
- val ci = CompletionIterator[Any, Iterator[Any]](it, releaseLock(blockId))
- return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
- case _ =>
- // This only happens if we dropped the values back to disk (which is never)
- throw new SparkException("Memory store did not return an iterator!")
- }
- } else {
- val ci = CompletionIterator[Any, Iterator[Any]](values, releaseLock(blockId))
- return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+ // Look for block on disk, potentially storing it back in memory if required
+ if (level.useDisk) {
+ logDebug(s"Getting block $blockId from disk")
+ val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
+ case Some(b) => b
+ case None =>
+ releaseLock(blockId)
+ throw new BlockException(
+ blockId, s"Block $blockId not found on disk, though it should be")
+ }
+ assert(0 == bytes.position())
+
+ if (!level.useMemory) {
+ // If the block shouldn't be stored in memory, we can just return it
+ if (asBlockResult) {
+ val iter = CompletionIterator[Any, Iterator[Any]](
+ dataDeserialize(blockId, bytes), releaseLock(blockId))
+ return Some(new BlockResult(iter, DataReadMethod.Disk, info.size))
+ } else {
+ return Some(bytes)
+ }
+ } else {
+ // Otherwise, we also have to store something in the memory store
+ if (!level.deserialized || !asBlockResult) {
+ /* We'll store the bytes in memory if the block's storage level includes
+ * "memory serialized", or if it should be cached as objects in memory
+ * but we only requested its serialized bytes. */
+ memoryStore.putBytes(blockId, bytes.limit, () => {
+ // https://issues.apache.org/jira/browse/SPARK-6076
+ // If the file size is bigger than the free memory, OOM will happen. So if we cannot
+ // put it into MemoryStore, copyForMemory should not be created. That's why this
+ // action is put into a `() => ByteBuffer` and created lazily.
+ val copyForMemory = ByteBuffer.allocate(bytes.limit)
+ copyForMemory.put(bytes)
+ })
+ bytes.rewind()
+ }
+ if (!asBlockResult) {
+ return Some(bytes)
+ } else {
+ val values = dataDeserialize(blockId, bytes)
+ val valuesToReturn: Iterator[Any] = {
+ if (level.deserialized) {
+ // Cache the values before returning them
+ memoryStore.putIterator(blockId, values, level, allowPersistToDisk = false) match {
+ case Left(iter) =>
+ // The memory store put() failed, so it returned the iterator back to us:
+ iter
+ case Right(_) =>
+ // The put() succeeded, so we can read the values back:
+ memoryStore.getValues(blockId).get
}
+ } else {
+ values
}
}
- } else {
- // This branch represents a case where the BlockInfoManager contained an entry for
- // the block but the block could not be found in any of the block stores. This case
- // should never occur, but for completeness's sake we address it here.
- logError(
- s"Block $blockId is supposedly stored locally but was not found in any block store")
- releaseLock(blockId)
- None
+ val ci = CompletionIterator[Any, Iterator[Any]](valuesToReturn, releaseLock(blockId))
+ return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
}
+ }
+ } else {
+ // This branch represents a case where the BlockInfoManager contained an entry for
+ // the block but the block could not be found in any of the block stores. This case
+ // should never occur, but for completeness's sake we address it here.
+ logError(
+ s"Block $blockId is supposedly stored locally but was not found in any block store")
+ releaseLock(blockId)
+ None
}
}
@@ -659,7 +675,7 @@ private[spark] class BlockManager(
makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = {
// Initially we hold no locks on this block.
doPut(blockId, IteratorValues(makeIterator), level, keepReadLock = true) match {
- case None =>
+ case DoPutSucceeded =>
// doPut() didn't hand work back to us, so the block already existed or was successfully
// stored. Therefore, we now hold a read lock on the block.
val blockResult = get(blockId).getOrElse {
@@ -669,11 +685,13 @@ private[spark] class BlockManager(
throw new SparkException(s"get() failed for block $blockId even though we held a lock")
}
Left(blockResult)
- case Some(failedPutResult) =>
+ case DoPutIteratorFailed(iter) =>
// The put failed, likely because the data was too large to fit in memory and could not be
// dropped to disk. Therefore, we need to pass the input iterator back to the caller so
// that they can decide what to do with the values (e.g. process them without caching).
- Right(failedPutResult.data.left.get)
+ Right(iter)
+ case DoPutBytesFailed =>
+ throw new SparkException("doPut returned an invalid failure response")
}
}
@@ -687,7 +705,13 @@ private[spark] class BlockManager(
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
require(values != null, "Values is null")
- doPut(blockId, IteratorValues(() => values), level, tellMaster, effectiveStorageLevel).isEmpty
+ val result = doPut(
+ blockId,
+ IteratorValues(() => values),
+ level,
+ tellMaster,
+ effectiveStorageLevel)
+ result == DoPutSucceeded
}
/**
@@ -719,7 +743,8 @@ private[spark] class BlockManager(
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
require(bytes != null, "Bytes is null")
- doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel).isEmpty
+ val result = doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
+ result == DoPutSucceeded
}
/**
@@ -734,9 +759,9 @@ private[spark] class BlockManager(
* @param keepReadLock if true, this method will hold the read lock when it returns (even if the
* block already exists). If false, this method will hold no locks when it
* returns.
- * @return `Some(PutResult)` if the block did not exist and could not be successfully cached,
- * or None if the block already existed or was successfully stored (fully consuming
- * the input data / input iterator).
+ * @return [[DoPutSucceeded]] if the block was already present or if the put succeeded, or
+ * [[DoPutBytesFailed]] if the put failed and we were storing bytes, or
+ * [[DoPutIteratorFailed]] if the put failed and we were storing an iterator.
*/
private def doPut(
blockId: BlockId,
@@ -744,7 +769,7 @@ private[spark] class BlockManager(
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None,
- keepReadLock: Boolean = false): Option[PutResult] = {
+ keepReadLock: Boolean = false): DoPutResult = {
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
@@ -765,21 +790,12 @@ private[spark] class BlockManager(
// lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
releaseLock(blockId)
}
- return None
+ return DoPutSucceeded
}
}
val startTimeMs = System.currentTimeMillis
- /* If we're storing values and we need to replicate the data, we'll want access to the values,
- * but because our put will read the whole iterator, there will be no values left. For the
- * case where the put serializes data, we'll remember the bytes, above; but for the case where
- * it doesn't, such as deserialized storage, let's rely on the put returning an Iterator. */
- var valuesAfterPut: Iterator[Any] = null
-
- // Ditto for the bytes after the put
- var bytesAfterPut: ByteBuffer = null
-
// Size of the block in bytes
var size = 0L
@@ -801,43 +817,46 @@ private[spark] class BlockManager(
}
var blockWasSuccessfullyStored = false
- var result: PutResult = null
+ var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
putBlockInfo.synchronized {
logTrace("Put for block %s took %s to get into synchronized block"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
try {
- // returnValues - Whether to return the values put
- // blockStore - The type of storage to put these values into
- val (returnValues, blockStore: BlockStore) = {
- if (putLevel.useMemory) {
- // Put it in memory first, even if it also has useDisk set to true;
- // We will drop it to disk later if the memory store can't hold it.
- (true, memoryStore)
- } else if (putLevel.useDisk) {
- // Don't get back the bytes from put unless we replicate them
- (putLevel.replication > 1, diskStore)
- } else {
- assert(putLevel == StorageLevel.NONE)
- throw new BlockException(
- blockId, s"Attempted to put block $blockId without specifying storage level!")
+ if (putLevel.useMemory) {
+ // Put it in memory first, even if it also has useDisk set to true;
+ // We will drop it to disk later if the memory store can't hold it.
+ data match {
+ case IteratorValues(iterator) =>
+ memoryStore.putIterator(blockId, iterator(), putLevel) match {
+ case Right(s) =>
+ size = s
+ case Left(iter) =>
+ iteratorFromFailedMemoryStorePut = Some(iter)
+ }
+ case ByteBufferValues(bytes) =>
+ bytes.rewind()
+ size = bytes.limit()
+ memoryStore.putBytes(blockId, bytes, putLevel)
}
- }
-
- // Actually put the values
- result = data match {
- case IteratorValues(iterator) =>
- blockStore.putIterator(blockId, iterator(), putLevel, returnValues)
- case ByteBufferValues(bytes) =>
- bytes.rewind()
- blockStore.putBytes(blockId, bytes, putLevel)
- }
- size = result.size
- result.data match {
- case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
- case Right (newBytes) => bytesAfterPut = newBytes
- case _ =>
+ } else if (putLevel.useDisk) {
+ data match {
+ case IteratorValues(iterator) =>
+ diskStore.putIterator(blockId, iterator(), putLevel) match {
+ case Right(s) =>
+ size = s
+ // putIterator() will never return Left (see its return type).
+ }
+ case ByteBufferValues(bytes) =>
+ bytes.rewind()
+ size = bytes.limit()
+ diskStore.putBytes(blockId, bytes, putLevel)
+ }
+ } else {
+ assert(putLevel == StorageLevel.NONE)
+ throw new BlockException(
+ blockId, s"Attempted to put block $blockId without specifying storage level!")
}
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
@@ -868,34 +887,27 @@ private[spark] class BlockManager(
}
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
- // Either we're storing bytes and we asynchronously started replication, or we're storing
- // values and need to serialize and replicate them now:
- if (putLevel.replication > 1) {
- data match {
- case ByteBufferValues(bytes) =>
- if (replicationFuture != null) {
- Await.ready(replicationFuture, Duration.Inf)
- }
- case _ =>
- if (blockWasSuccessfullyStored) {
- val remoteStartTime = System.currentTimeMillis
- // Serialize the block if not already done
- if (bytesAfterPut == null) {
- if (valuesAfterPut == null) {
- throw new SparkException(
- "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
- }
- bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
- }
- replicate(blockId, bytesAfterPut, putLevel)
- logDebug("Put block %s remotely took %s"
- .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
+ if (replicationFuture != null) {
+ // Wait for asynchronous replication to finish
+ Await.ready(replicationFuture, Duration.Inf)
+ } else if (putLevel.replication > 1 && blockWasSuccessfullyStored) {
+ val remoteStartTime = System.currentTimeMillis
+ val bytesToReplicate: ByteBuffer = {
+ doGetLocal(blockId, putBlockInfo, asBlockResult = false)
+ .map(_.asInstanceOf[ByteBuffer])
+ .getOrElse {
+ throw new SparkException(s"Block $blockId was not found even though it was just stored")
}
}
+ try {
+ replicate(blockId, bytesToReplicate, putLevel)
+ } finally {
+ BlockManager.dispose(bytesToReplicate)
+ }
+ logDebug("Put block %s remotely took %s"
+ .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
- BlockManager.dispose(bytesAfterPut)
-
if (putLevel.replication > 1) {
logDebug("Putting block %s with replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
@@ -905,9 +917,11 @@ private[spark] class BlockManager(
}
if (blockWasSuccessfullyStored) {
- None
+ DoPutSucceeded
+ } else if (iteratorFromFailedMemoryStorePut.isDefined) {
+ DoPutIteratorFailed(iteratorFromFailedMemoryStorePut.get)
} else {
- Some(result)
+ DoPutBytesFailed
}
}
@@ -1064,7 +1078,7 @@ private[spark] class BlockManager(
logInfo(s"Writing block $blockId to disk")
data() match {
case Left(elements) =>
- diskStore.putIterator(blockId, elements.toIterator, level, returnValues = false)
+ diskStore.putIterator(blockId, elements.toIterator, level)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index d3af50d974..b069918b16 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -26,20 +26,18 @@ import org.apache.spark.Logging
*/
private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {
- def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult
+ def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): Unit
/**
- * Put in a block and, possibly, also return its content as either bytes or another Iterator.
- * This is used to efficiently write the values to multiple locations (e.g. for replication).
+ * Attempt to store an iterator of values.
*
- * @return a PutResult that contains the size of the data, as well as the values put if
- * returnValues is true (if not, the result's data field can be null)
+ * @return an iterator of values (in case the put failed), or the estimated size of the stored
+ * values if the put succeeded.
*/
def putIterator(
- blockId: BlockId,
- values: Iterator[Any],
- level: StorageLevel,
- returnValues: Boolean): PutResult
+ blockId: BlockId,
+ values: Iterator[Any],
+ level: StorageLevel): Either[Iterator[Any], Long]
/**
* Return the size of a block in bytes.
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index db12a4a1b9..e35aa1b068 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -36,7 +36,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
diskManager.getFile(blockId.name).length
}
- override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
+ override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
// So that we do not modify the input offsets !
// duplicate does not copy buffer, so inexpensive
val bytes = _bytes.duplicate()
@@ -54,15 +54,12 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
- PutResult(bytes.limit(), Right(bytes.duplicate()))
}
override def putIterator(
blockId: BlockId,
values: Iterator[Any],
- level: StorageLevel,
- returnValues: Boolean): PutResult = {
-
+ level: StorageLevel): Right[Iterator[Any], Long] = {
logDebug(s"Attempting to write values for block $blockId")
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
@@ -90,13 +87,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName, Utils.bytesToString(length), timeTaken))
- if (returnValues) {
- // Return a byte buffer for the contents of the file
- val buffer = getBytes(blockId).get
- PutResult(length, Right(buffer))
- } else {
- PutResult(length, null)
- }
+ Right(length)
}
private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
@@ -127,10 +118,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
getBytes(file, 0, file.length)
}
- def getBytes(segment: FileSegment): Option[ByteBuffer] = {
- getBytes(segment.file, segment.offset, segment.length)
- }
-
override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
}
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 317d73abba..12b70d1807 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -87,16 +87,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
}
- override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
+ override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
// Work on a duplicate - since the original input might be used elsewhere.
val bytes = _bytes.duplicate()
bytes.rewind()
if (level.deserialized) {
val values = blockManager.dataDeserialize(blockId, bytes)
- putIterator(blockId, values, level, returnValues = true)
+ putIterator(blockId, values, level)
} else {
tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
- PutResult(bytes.limit(), Right(bytes.duplicate()))
}
}
@@ -106,26 +105,20 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
*
* The caller should guarantee that `size` is correct.
*/
- def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
+ def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Unit = {
// Work on a duplicate - since the original input might be used elsewhere.
lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false)
- val data =
- if (putSuccess) {
- assert(bytes.limit == size)
- Right(bytes.duplicate())
- } else {
- null
- }
- PutResult(size, data)
+ if (putSuccess) {
+ assert(bytes.limit == size)
+ }
}
override def putIterator(
blockId: BlockId,
values: Iterator[Any],
- level: StorageLevel,
- returnValues: Boolean): PutResult = {
- putIterator(blockId, values, level, returnValues, allowPersistToDisk = true)
+ level: StorageLevel): Either[Iterator[Any], Long] = {
+ putIterator(blockId, values, level, allowPersistToDisk = true)
}
/**
@@ -144,32 +137,30 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
- returnValues: Boolean,
- allowPersistToDisk: Boolean): PutResult = {
+ allowPersistToDisk: Boolean): Either[Iterator[Any], Long] = {
val unrolledValues = unrollSafely(blockId, values)
unrolledValues match {
case Left(arrayValues) =>
// Values are fully unrolled in memory, so store them as an array
- val res = {
+ val size = {
if (level.deserialized) {
val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)
- PutResult(sizeEstimate, Left(arrayValues.iterator))
+ sizeEstimate
} else {
val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
- PutResult(bytes.limit(), Right(bytes.duplicate()))
+ bytes.limit()
}
}
- PutResult(res.size, res.data)
+ Right(size)
case Right(iteratorValues) =>
// Not enough space to unroll this block; drop to disk if applicable
if (level.useDisk && allowPersistToDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
- val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
- PutResult(res.size, res.data)
+ blockManager.diskStore.putIterator(blockId, iteratorValues, level)
} else {
- PutResult(0, Left(iteratorValues))
+ Left(iteratorValues)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/PutResult.scala b/core/src/main/scala/org/apache/spark/storage/PutResult.scala
deleted file mode 100644
index f0eac7594e..0000000000
--- a/core/src/main/scala/org/apache/spark/storage/PutResult.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-/**
- * Result of adding a block into a BlockStore. This case class contains a few things:
- * (1) The estimated size of the put,
- * (2) The values put if the caller asked for them to be returned (e.g. for chaining
- * replication), and
- * (3) A list of blocks dropped as a result of this put. This is always empty for DiskStore.
- */
-private[spark] case class PutResult(
- size: Long,
- data: Either[Iterator[_], ByteBuffer],
- droppedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 89b427049b..cfcbf1745d 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1156,14 +1156,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll with plenty of space. This should succeed and cache both blocks.
- val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
- val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+ val result1 = memoryStore.putIterator("b1", smallIterator, memOnly)
+ val result2 = memoryStore.putIterator("b2", smallIterator, memOnly)
assert(memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
- assert(result1.size > 0) // unroll was successful
- assert(result2.size > 0)
- assert(result1.data.isLeft) // unroll did not drop this block to disk
- assert(result2.data.isLeft)
+ assert(result1.isRight) // unroll was successful
+ assert(result2.isRight)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Re-put these two blocks so block manager knows about them too. Otherwise, block manager
@@ -1174,9 +1172,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putIterator("b2", smallIterator, memOnly)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
- val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
- assert(result3.size > 0)
- assert(result3.data.isLeft)
+ val result3 = memoryStore.putIterator("b3", smallIterator, memOnly)
+ assert(result3.isRight)
assert(!memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
assert(memoryStore.contains("b3"))
@@ -1185,9 +1182,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putIterator("b3", smallIterator, memOnly)
// Unroll huge block with not enough space. This should fail and kick out b2 in the process.
- val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
- assert(result4.size === 0) // unroll was unsuccessful
- assert(result4.data.isLeft)
+ val result4 = memoryStore.putIterator("b4", bigIterator, memOnly)
+ assert(result4.isLeft) // unroll was unsuccessful
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
assert(memoryStore.contains("b3"))
@@ -1214,8 +1210,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Unroll with not enough space. This should succeed but kick out b1 in the process.
// Memory store should contain b2 and b3, while disk store should contain only b1
- val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, returnValues = true)
- assert(result3.size > 0)
+ val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk)
+ assert(result3.isRight)
assert(!memoryStore.contains("b1"))
assert(memoryStore.contains("b2"))
assert(memoryStore.contains("b3"))
@@ -1229,9 +1225,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Unroll huge block with not enough space. This should fail and drop the new block to disk
// directly in addition to kicking out b2 in the process. Memory store should contain only
// b3, while disk store should contain b1, b2 and b4.
- val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, returnValues = true)
- assert(result4.size > 0)
- assert(result4.data.isRight) // unroll returned bytes from disk
+ val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk)
+ assert(result4.isRight)
assert(!memoryStore.contains("b1"))
assert(!memoryStore.contains("b2"))
assert(memoryStore.contains("b3"))
@@ -1252,28 +1247,28 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// All unroll memory used is released because unrollSafely returned an array
- memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b1", smallIterator, memOnly)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
- memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b2", smallIterator, memOnly)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll memory is not released because unrollSafely returned an iterator
// that still depends on the underlying vector used in the process
- memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b3", smallIterator, memOnly)
val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB3 > 0)
// The unroll memory owned by this thread builds on top of its value after the previous unrolls
- memoryStore.putIterator("b4", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b4", smallIterator, memOnly)
val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
// ... but only to a certain extent (until we run out of free space to grant new unroll memory)
- memoryStore.putIterator("b5", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b5", smallIterator, memOnly)
val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask
- memoryStore.putIterator("b6", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b6", smallIterator, memOnly)
val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask
- memoryStore.putIterator("b7", smallIterator, memOnly, returnValues = true)
+ memoryStore.putIterator("b7", smallIterator, memOnly)
val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask
assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
@@ -1286,11 +1281,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val blockId = BlockId("rdd_3_10")
store.blockInfoManager.lockNewBlockForWriting(
blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false))
- val result = memoryStore.putBytes(blockId, 13000, () => {
+ memoryStore.putBytes(blockId, 13000, () => {
fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
})
- assert(result.size === 13000)
- assert(result.data === null)
}
test("put a small ByteBuffer to MemoryStore") {
@@ -1298,12 +1291,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val memoryStore = store.memoryStore
val blockId = BlockId("rdd_3_10")
var bytes: ByteBuffer = null
- val result = memoryStore.putBytes(blockId, 10000, () => {
+ memoryStore.putBytes(blockId, 10000, () => {
bytes = ByteBuffer.allocate(10000)
bytes
})
- assert(result.size === 10000)
- assert(result.data === Right(bytes))
+ assert(memoryStore.getSize(blockId) === 10000)
}
test("read-locked blocks cannot be evicted from the MemoryStore") {