aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/MemoryStore.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/MemoryStore.scala31
1 files changed, 26 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 1814175651..b89212eaab 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -49,7 +49,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}
- override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) {
+ override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
// Work on a duplicate - since the original input might be used elsewhere.
val bytes = _bytes.duplicate()
bytes.rewind()
@@ -59,8 +59,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
tryToPut(blockId, elements, sizeEstimate, true)
+ PutResult(sizeEstimate, Left(values.toIterator))
} else {
tryToPut(blockId, bytes, bytes.limit, false)
+ PutResult(bytes.limit(), Right(bytes.duplicate()))
}
}
@@ -69,14 +71,33 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
values: ArrayBuffer[Any],
level: StorageLevel,
returnValues: Boolean)
- : PutResult = {
-
+ : PutResult = {
if (level.deserialized) {
val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
tryToPut(blockId, values, sizeEstimate, true)
- PutResult(sizeEstimate, Left(values.iterator))
+ PutResult(sizeEstimate, Left(values.toIterator))
+ } else {
+ val bytes = blockManager.dataSerialize(blockId, values.toIterator)
+ tryToPut(blockId, bytes, bytes.limit, false)
+ PutResult(bytes.limit(), Right(bytes.duplicate()))
+ }
+ }
+
+ override def putValues(
+ blockId: BlockId,
+ values: Iterator[Any],
+ level: StorageLevel,
+ returnValues: Boolean)
+ : PutResult = {
+
+ if (level.deserialized) {
+ val valueEntries = new ArrayBuffer[Any]()
+ valueEntries ++= values
+ val sizeEstimate = SizeEstimator.estimate(valueEntries.asInstanceOf[AnyRef])
+ tryToPut(blockId, valueEntries, sizeEstimate, true)
+ PutResult(sizeEstimate, Left(valueEntries.toIterator))
} else {
- val bytes = blockManager.dataSerialize(blockId, values.iterator)
+ val bytes = blockManager.dataSerialize(blockId, values)
tryToPut(blockId, bytes, bytes.limit, false)
PutResult(bytes.limit(), Right(bytes.duplicate()))
}