Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/MemoryStore.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/MemoryStore.scala  39
1 file changed, 15 insertions(+), 24 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 317d73abba..12b70d1807 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -87,16 +87,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     }
   }
 
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
+  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
     // Work on a duplicate - since the original input might be used elsewhere.
     val bytes = _bytes.duplicate()
     bytes.rewind()
     if (level.deserialized) {
       val values = blockManager.dataDeserialize(blockId, bytes)
-      putIterator(blockId, values, level, returnValues = true)
+      putIterator(blockId, values, level)
     } else {
       tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
-      PutResult(bytes.limit(), Right(bytes.duplicate()))
     }
   }
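The "work on a duplicate" comment in this hunk is load-bearing: ByteBuffer.duplicate() shares the underlying bytes but carries its own position, limit, and mark, so the store can rewind its copy without disturbing a caller that is still reading the original buffer. A minimal standalone sketch of that behavior (names are illustrative, not from the patch):

import java.nio.ByteBuffer

object DuplicateDemo {
  def main(args: Array[String]): Unit = {
    val original = ByteBuffer.wrap("block-data".getBytes("UTF-8"))
    original.position(5) // pretend the caller has already consumed five bytes

    // duplicate() shares content but has an independent position/limit/mark,
    // so rewinding the copy cannot disturb the caller's view.
    val copy = original.duplicate()
    copy.rewind()

    println(copy.position())     // 0
    println(original.position()) // still 5
  }
}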
@@ -106,26 +105,20 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
    *
    * The caller should guarantee that `size` is correct.
    */
-  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
+  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Unit = {
     // Work on a duplicate - since the original input might be used elsewhere.
     lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
     val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false)
-    val data =
-      if (putSuccess) {
-        assert(bytes.limit == size)
-        Right(bytes.duplicate())
-      } else {
-        null
-      }
-    PutResult(size, data)
+    if (putSuccess) {
+      assert(bytes.limit == size)
+    }
   }
 
   override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
-      level: StorageLevel,
-      returnValues: Boolean): PutResult = {
-    putIterator(blockId, values, level, returnValues, allowPersistToDisk = true)
+      level: StorageLevel): Either[Iterator[Any], Long] = {
+    putIterator(blockId, values, level, allowPersistToDisk = true)
   }
 
   /**
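Two details in the hunk above are easy to miss. First, `_bytes` is a function rather than a buffer, and `bytes` is a `lazy val`, so the buffer is materialized only if something actually forces it once the put proceeds. Second, on Java 8 `Buffer.rewind()` is declared to return `Buffer`, not `ByteBuffer`, which is why the `asInstanceOf[ByteBuffer]` cast appears. A rough sketch of the deferred-allocation idea, with a stand-in for `tryToPut` (everything here is illustrative, not the real MemoryStore):

import java.nio.ByteBuffer

object LazyBytesDemo {
  def store(size: Long, _bytes: () => ByteBuffer): Unit = {
    // The cast works around Java 8's Buffer.rewind() returning Buffer.
    lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
    val putSuccess = size <= 1024 // stand-in for tryToPut's memory reservation
    if (putSuccess) {
      assert(bytes.limit == size) // only here is the thunk actually invoked
    }
  }

  def main(args: Array[String]): Unit = {
    // "allocating" prints only because the put succeeds and forces `bytes`.
    store(16, () => { println("allocating"); ByteBuffer.allocate(16) })
  }
}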
@@ -144,32 +137,30 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
-      returnValues: Boolean,
-      allowPersistToDisk: Boolean): PutResult = {
+      allowPersistToDisk: Boolean): Either[Iterator[Any], Long] = {
     val unrolledValues = unrollSafely(blockId, values)
     unrolledValues match {
       case Left(arrayValues) =>
         // Values are fully unrolled in memory, so store them as an array
-        val res = {
+        val size = {
           if (level.deserialized) {
             val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
             tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)
-            PutResult(sizeEstimate, Left(arrayValues.iterator))
+            sizeEstimate
          } else {
             val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
             tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
-            PutResult(bytes.limit(), Right(bytes.duplicate()))
+            bytes.limit()
           }
         }
-        PutResult(res.size, res.data)
+        Right(size)
       case Right(iteratorValues) =>
         // Not enough space to unroll this block; drop to disk if applicable
         if (level.useDisk && allowPersistToDisk) {
           logWarning(s"Persisting block $blockId to disk instead.")
-          val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
-          PutResult(res.size, res.data)
+          blockManager.diskStore.putIterator(blockId, iteratorValues, level)
         } else {
-          PutResult(0, Left(iteratorValues))
+          Left(iteratorValues)
        }
     }
   }
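After this change the put methods no longer wrap their outcome in `PutResult`: `putIterator` now answers with `Either[Iterator[Any], Long]`, where `Right(size)` means the block was stored and occupies `size` bytes, and `Left(iterator)` means it did not fit, handing the values back so the caller can recompute them or store them elsewhere. A hypothetical caller (not part of this patch) would consume the new contract like this:

object PutIteratorResultDemo {
  // Illustrative only: pattern-match on putIterator's new return type.
  def handle(result: Either[Iterator[Any], Long]): Unit = result match {
    case Right(size) =>
      println(s"block stored in memory, estimated size = $size bytes")
    case Left(returnedValues) =>
      println(s"block rejected; ${returnedValues.length} values handed back")
  }

  def main(args: Array[String]): Unit = {
    handle(Right(2048L))            // successful put
    handle(Left(Iterator(1, 2, 3))) // memory store could not hold the block
  }
}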