diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/MemoryStore.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/MemoryStore.scala | 39 |
1 files changed, 15 insertions, 24 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 317d73abba..12b70d1807 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -87,16 +87,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } } - override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = { + override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = { // Work on a duplicate - since the original input might be used elsewhere. val bytes = _bytes.duplicate() bytes.rewind() if (level.deserialized) { val values = blockManager.dataDeserialize(blockId, bytes) - putIterator(blockId, values, level, returnValues = true) + putIterator(blockId, values, level) } else { tryToPut(blockId, () => bytes, bytes.limit, deserialized = false) - PutResult(bytes.limit(), Right(bytes.duplicate())) } } @@ -106,26 +105,20 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * * The caller should guarantee that `size` is correct. */ - def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = { + def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Unit = { // Work on a duplicate - since the original input might be used elsewhere. lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer] val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false) - val data = - if (putSuccess) { - assert(bytes.limit == size) - Right(bytes.duplicate()) - } else { - null - } - PutResult(size, data) + if (putSuccess) { + assert(bytes.limit == size) + } } override def putIterator( blockId: BlockId, values: Iterator[Any], - level: StorageLevel, - returnValues: Boolean): PutResult = { - putIterator(blockId, values, level, returnValues, allowPersistToDisk = true) + level: StorageLevel): Either[Iterator[Any], Long] = { + putIterator(blockId, values, level, allowPersistToDisk = true) } /** @@ -144,32 +137,30 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo blockId: BlockId, values: Iterator[Any], level: StorageLevel, - returnValues: Boolean, - allowPersistToDisk: Boolean): PutResult = { + allowPersistToDisk: Boolean): Either[Iterator[Any], Long] = { val unrolledValues = unrollSafely(blockId, values) unrolledValues match { case Left(arrayValues) => // Values are fully unrolled in memory, so store them as an array - val res = { + val size = { if (level.deserialized) { val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef]) tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true) - PutResult(sizeEstimate, Left(arrayValues.iterator)) + sizeEstimate } else { val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator) tryToPut(blockId, () => bytes, bytes.limit, deserialized = false) - PutResult(bytes.limit(), Right(bytes.duplicate())) + bytes.limit() } } - PutResult(res.size, res.data) + Right(size) case Right(iteratorValues) => // Not enough space to unroll this block; drop to disk if applicable if (level.useDisk && allowPersistToDisk) { logWarning(s"Persisting block $blockId to disk instead.") - val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues) - PutResult(res.size, res.data) + blockManager.diskStore.putIterator(blockId, iteratorValues, level) } else { - PutResult(0, Left(iteratorValues)) + Left(iteratorValues) } } } |