diff options
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 137 |
1 files changed, 84 insertions, 53 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 996c1f5d9e..b503410a73 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -425,39 +425,10 @@ private[spark] class BlockManager( val iterToReturn: Iterator[Any] = { val diskBytes = diskStore.getBytes(blockId) if (level.deserialized) { - val diskIterator = dataDeserialize(blockId, diskBytes) - if (level.useMemory) { - // Cache the values before returning them - memoryStore.putIterator(blockId, diskIterator, level) match { - case Left(iter) => - // The memory store put() failed, so it returned the iterator back to us: - iter - case Right(_) => - // The put() succeeded, so we can read the values back: - memoryStore.getValues(blockId).get - } - } else { - diskIterator - } - } else { // storage level is serialized - if (level.useMemory) { - // Cache the bytes back into memory to speed up subsequent reads. - val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit(), () => { - // https://issues.apache.org/jira/browse/SPARK-6076 - // If the file size is bigger than the free memory, OOM will happen. So if we - // cannot put it into MemoryStore, copyForMemory should not be created. That's why - // this action is put into a `() => ByteBuffer` and created lazily. - val copyForMemory = ByteBuffer.allocate(diskBytes.limit) - copyForMemory.put(diskBytes) - }) - if (putSucceeded) { - dataDeserialize(blockId, memoryStore.getBytes(blockId).get) - } else { - dataDeserialize(blockId, diskBytes) - } - } else { - dataDeserialize(blockId, diskBytes) - } + val diskValues = dataDeserialize(blockId, diskBytes) + maybeCacheDiskValuesInMemory(info, blockId, level, diskValues) + } else { + dataDeserialize(blockId, maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)) } } val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId)) @@ -517,26 +488,7 @@ private[spark] class BlockManager( if (level.useMemory && memoryStore.contains(blockId)) { memoryStore.getBytes(blockId).get } else if (level.useDisk && diskStore.contains(blockId)) { - val bytes = diskStore.getBytes(blockId) - if (level.useMemory) { - // Cache the bytes back into memory to speed up subsequent reads. - val memoryStorePutSucceeded = memoryStore.putBytes(blockId, bytes.limit(), () => { - // https://issues.apache.org/jira/browse/SPARK-6076 - // If the file size is bigger than the free memory, OOM will happen. So if we cannot - // put it into MemoryStore, copyForMemory should not be created. That's why this - // action is put into a `() => ByteBuffer` and created lazily. - val copyForMemory = ByteBuffer.allocate(bytes.limit) - copyForMemory.put(bytes) - }) - if (memoryStorePutSucceeded) { - memoryStore.getBytes(blockId).get - } else { - bytes.rewind() - bytes - } - } else { - bytes - } + maybeCacheDiskBytesInMemory(info, blockId, level, diskStore.getBytes(blockId)) } else { releaseLock(blockId) throw new SparkException(s"Block $blockId was not found even though it's read-locked") @@ -967,6 +919,85 @@ private[spark] class BlockManager( } /** + * Attempts to cache spilled bytes read from disk into the MemoryStore in order to speed up + * subsequent reads. This method requires the caller to hold a read lock on the block. + * + * @return a copy of the bytes. The original bytes passed this method should no longer + * be used after this method returns. + */ + private def maybeCacheDiskBytesInMemory( + blockInfo: BlockInfo, + blockId: BlockId, + level: StorageLevel, + diskBytes: ByteBuffer): ByteBuffer = { + require(!level.deserialized) + if (level.useMemory) { + // Synchronize on blockInfo to guard against a race condition where two readers both try to + // put values read from disk into the MemoryStore. + blockInfo.synchronized { + if (memoryStore.contains(blockId)) { + BlockManager.dispose(diskBytes) + memoryStore.getBytes(blockId).get + } else { + val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit(), () => { + // https://issues.apache.org/jira/browse/SPARK-6076 + // If the file size is bigger than the free memory, OOM will happen. So if we + // cannot put it into MemoryStore, copyForMemory should not be created. That's why + // this action is put into a `() => ByteBuffer` and created lazily. + val copyForMemory = ByteBuffer.allocate(diskBytes.limit) + copyForMemory.put(diskBytes) + }) + if (putSucceeded) { + BlockManager.dispose(diskBytes) + memoryStore.getBytes(blockId).get + } else { + diskBytes.rewind() + diskBytes + } + } + } + } else { + diskBytes + } + } + + /** + * Attempts to cache spilled values read from disk into the MemoryStore in order to speed up + * subsequent reads. This method requires the caller to hold a read lock on the block. + * + * @return a copy of the iterator. The original iterator passed this method should no longer + * be used after this method returns. + */ + private def maybeCacheDiskValuesInMemory( + blockInfo: BlockInfo, + blockId: BlockId, + level: StorageLevel, + diskIterator: Iterator[Any]): Iterator[Any] = { + require(level.deserialized) + if (level.useMemory) { + // Synchronize on blockInfo to guard against a race condition where two readers both try to + // put values read from disk into the MemoryStore. + blockInfo.synchronized { + if (memoryStore.contains(blockId)) { + // Note: if we had a means to discard the disk iterator, we would do that here. + memoryStore.getValues(blockId).get + } else { + memoryStore.putIterator(blockId, diskIterator, level) match { + case Left(iter) => + // The memory store put() failed, so it returned the iterator back to us: + iter + case Right(_) => + // The put() succeeded, so we can read the values back: + memoryStore.getValues(blockId).get + } + } + } + } else { + diskIterator + } + } + + /** * Get peer block managers in the system. */ private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { |