core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 137
1 file changed, 84 insertions(+), 53 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 996c1f5d9e..b503410a73 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -425,39 +425,10 @@ private[spark] class BlockManager(
val iterToReturn: Iterator[Any] = {
val diskBytes = diskStore.getBytes(blockId)
if (level.deserialized) {
- val diskIterator = dataDeserialize(blockId, diskBytes)
- if (level.useMemory) {
- // Cache the values before returning them
- memoryStore.putIterator(blockId, diskIterator, level) match {
- case Left(iter) =>
- // The memory store put() failed, so it returned the iterator back to us:
- iter
- case Right(_) =>
- // The put() succeeded, so we can read the values back:
- memoryStore.getValues(blockId).get
- }
- } else {
- diskIterator
- }
- } else { // storage level is serialized
- if (level.useMemory) {
- // Cache the bytes back into memory to speed up subsequent reads.
- val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit(), () => {
- // https://issues.apache.org/jira/browse/SPARK-6076
- // If the file size is bigger than the free memory, OOM will happen. So if we
- // cannot put it into MemoryStore, copyForMemory should not be created. That's why
- // this action is put into a `() => ByteBuffer` and created lazily.
- val copyForMemory = ByteBuffer.allocate(diskBytes.limit)
- copyForMemory.put(diskBytes)
- })
- if (putSucceeded) {
- dataDeserialize(blockId, memoryStore.getBytes(blockId).get)
- } else {
- dataDeserialize(blockId, diskBytes)
- }
- } else {
- dataDeserialize(blockId, diskBytes)
- }
+ val diskValues = dataDeserialize(blockId, diskBytes)
+ maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+ } else {
+ dataDeserialize(blockId, maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes))
}
}
val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
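Note: the `CompletionIterator` wrapper on the last line of this hunk ties the block's read lock to the iterator's lifetime, so `releaseLock(blockId)` fires exactly once, as soon as the caller has consumed the final value. A minimal, self-contained sketch of the same pattern, under an assumed name (`completing` is illustrative; Spark's real utility is `org.apache.spark.util.CompletionIterator`):

object CompletionIteratorSketch {
  // Wraps `sub` so that `onComplete` runs exactly once, immediately after
  // the last element has been consumed, rather than at some later point.
  def completing[A](sub: Iterator[A])(onComplete: => Unit): Iterator[A] =
    new Iterator[A] {
      private var completed = false
      override def hasNext: Boolean = {
        val more = sub.hasNext
        if (!more && !completed) {
          completed = true
          onComplete
        }
        more
      }
      override def next(): A = sub.next()
    }

  def main(args: Array[String]): Unit = {
    val values = completing(Iterator(1, 2, 3)) { println("read lock released") }
    values.foreach(println) // prints 1, 2, 3, then "read lock released"
  }
}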
@@ -517,26 +488,7 @@ private[spark] class BlockManager(
if (level.useMemory && memoryStore.contains(blockId)) {
memoryStore.getBytes(blockId).get
} else if (level.useDisk && diskStore.contains(blockId)) {
- val bytes = diskStore.getBytes(blockId)
- if (level.useMemory) {
- // Cache the bytes back into memory to speed up subsequent reads.
- val memoryStorePutSucceeded = memoryStore.putBytes(blockId, bytes.limit(), () => {
- // https://issues.apache.org/jira/browse/SPARK-6076
- // If the file size is bigger than the free memory, OOM will happen. So if we cannot
- // put it into MemoryStore, copyForMemory should not be created. That's why this
- // action is put into a `() => ByteBuffer` and created lazily.
- val copyForMemory = ByteBuffer.allocate(bytes.limit)
- copyForMemory.put(bytes)
- })
- if (memoryStorePutSucceeded) {
- memoryStore.getBytes(blockId).get
- } else {
- bytes.rewind()
- bytes
- }
- } else {
- bytes
- }
+ maybeCacheDiskBytesInMemory(info, blockId, level, diskStore.getBytes(blockId))
} else {
releaseLock(blockId)
throw new SparkException(s"Block $blockId was not found even though it's read-locked")
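Note: both the inline code removed in these two hunks and the new helper pass the in-memory copy to `putBytes` as a `() => ByteBuffer` thunk (see SPARK-6076): if the block cannot fit, the copy is never allocated, so a failed put cannot itself trigger the OOM it is guarding against. A self-contained toy sketch of that contract, with hypothetical names (`ToyStore` merely stands in for Spark's MemoryStore):

import java.nio.ByteBuffer

object LazyAllocationSketch {
  class ToyStore(capacity: Long) {
    private var used = 0L
    def putBytes(size: Long, allocate: () => ByteBuffer): Boolean = {
      if (used + size > capacity) {
        false // reject without invoking the thunk, so nothing is allocated
      } else {
        val buf = allocate() // safe: we already know the copy fits
        used += buf.limit()
        true
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val store = new ToyStore(capacity = 16)
    val diskBytes = ByteBuffer.allocate(32)
    val putSucceeded = store.putBytes(diskBytes.limit(), () => {
      val copyForMemory = ByteBuffer.allocate(diskBytes.limit())
      copyForMemory.put(diskBytes)
    })
    println(s"put succeeded: $putSucceeded") // false; the 32-byte copy was never made
  }
}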
@@ -967,6 +919,85 @@ private[spark] class BlockManager(
}

/**
+ * Attempts to cache spilled bytes read from disk into the MemoryStore in order to speed up
+ * subsequent reads. This method requires the caller to hold a read lock on the block.
+ *
+ * @return a copy of the bytes. The original bytes passed to this method should no longer
+ * be used after this method returns.
+ */
+ private def maybeCacheDiskBytesInMemory(
+ blockInfo: BlockInfo,
+ blockId: BlockId,
+ level: StorageLevel,
+ diskBytes: ByteBuffer): ByteBuffer = {
+ require(!level.deserialized)
+ if (level.useMemory) {
+ // Synchronize on blockInfo to guard against a race condition where two readers both try to
+ // put values read from disk into the MemoryStore.
+ blockInfo.synchronized {
+ if (memoryStore.contains(blockId)) {
+ BlockManager.dispose(diskBytes)
+ memoryStore.getBytes(blockId).get
+ } else {
+ val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit(), () => {
+ // https://issues.apache.org/jira/browse/SPARK-6076
+ // If the file size is bigger than the free memory, OOM will happen. So if we
+ // cannot put it into MemoryStore, copyForMemory should not be created. That's why
+ // this action is put into a `() => ByteBuffer` and created lazily.
+ val copyForMemory = ByteBuffer.allocate(diskBytes.limit)
+ copyForMemory.put(diskBytes)
+ })
+ if (putSucceeded) {
+ BlockManager.dispose(diskBytes)
+ memoryStore.getBytes(blockId).get
+ } else {
+ diskBytes.rewind()
+ diskBytes
+ }
+ }
+ }
+ } else {
+ diskBytes
+ }
+ }
+
+ /**
+ * Attempts to cache spilled values read from disk into the MemoryStore in order to speed up
+ * subsequent reads. This method requires the caller to hold a read lock on the block.
+ *
+ * @return a copy of the iterator. The original iterator passed to this method should no longer
+ * be used after this method returns.
+ */
+ private def maybeCacheDiskValuesInMemory(
+ blockInfo: BlockInfo,
+ blockId: BlockId,
+ level: StorageLevel,
+ diskIterator: Iterator[Any]): Iterator[Any] = {
+ require(level.deserialized)
+ if (level.useMemory) {
+ // Synchronize on blockInfo to guard against a race condition where two readers both try to
+ // put values read from disk into the MemoryStore.
+ blockInfo.synchronized {
+ if (memoryStore.contains(blockId)) {
+ // Note: if we had a means to discard the disk iterator, we would do that here.
+ memoryStore.getValues(blockId).get
+ } else {
+ memoryStore.putIterator(blockId, diskIterator, level) match {
+ case Left(iter) =>
+ // The memory store put() failed, so it returned the iterator back to us:
+ iter
+ case Right(_) =>
+ // The put() succeeded, so we can read the values back:
+ memoryStore.getValues(blockId).get
+ }
+ }
+ }
+ } else {
+ diskIterator
+ }
+ }
+
+ /**
* Get peer block managers in the system.
*/
private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
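
Note: in the new `maybeCacheDiskBytesInMemory`, the `blockInfo.synchronized` block re-checks `memoryStore.contains(blockId)` before putting; without that re-check, two readers that both missed in memory could each copy the same block in. A minimal sketch of this check-then-act guard, under assumed names (`SketchCache` and `loadFromDisk` are illustrative, not Spark APIs):

import scala.collection.mutable

object SketchCache {
  private val cache = mutable.Map.empty[String, Array[Byte]]
  private val lock = new Object

  // Stand-in for reading the block's bytes back from the disk store.
  private def loadFromDisk(id: String): Array[Byte] = Array.fill(8)(0: Byte)

  // Checking membership while holding the lock means that when two readers
  // race, the loser observes the winner's entry and skips the redundant
  // copy instead of caching the block twice.
  def getOrCache(id: String): Array[Byte] = lock.synchronized {
    cache.getOrElseUpdate(id, loadFromDisk(id))
  }
}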
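Note: `memoryStore.putIterator` in `maybeCacheDiskValuesInMemory` returns `Left(iter)` when the values do not all fit, handing the caller back an iterator over everything already consumed plus the remainder, so no values are lost and the read can still proceed from disk. A self-contained sketch of that contract (`ToyMemoryStore` is a hypothetical stand-in; the real MemoryStore's `Right` carries the stored size in bytes):

import scala.collection.mutable

object PutIteratorSketch {
  class ToyMemoryStore(maxElements: Int) {
    private val stored = mutable.Map.empty[String, Vector[Any]]

    // Materializes up to `maxElements` values. On overflow, hands back an
    // iterator over the buffered prefix plus the unconsumed remainder, so
    // a failed put never loses values.
    def putIterator(id: String, values: Iterator[Any]): Either[Iterator[Any], Long] = {
      val buffer = mutable.ArrayBuffer.empty[Any]
      while (values.hasNext && buffer.size < maxElements) {
        buffer += values.next()
      }
      if (values.hasNext) {
        Left(buffer.iterator ++ values) // did not fit: return the values
      } else {
        stored(id) = buffer.toVector
        Right(buffer.size.toLong) // fit: report how much was stored
      }
    }

    def getValues(id: String): Option[Iterator[Any]] = stored.get(id).map(_.iterator)
  }

  def main(args: Array[String]): Unit = {
    val store = new ToyMemoryStore(maxElements = 2)
    store.putIterator("rdd_0_0", Iterator(1, 2, 3)) match {
      case Left(iter) => println(s"put failed, values preserved: ${iter.toList}")
      case Right(n)   => println(s"stored $n values")
    }
  }
}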