author    Josh Rosen <joshrosen@databricks.com>  2016-03-14 10:48:24 -0700
committer Andrew Or <andrew@databricks.com>      2016-03-14 10:48:24 -0700
commit    9a87afd7d1dbcee993d58e648c74aa683d91f07e (patch)
tree      80b0652ccc8c55228cc05423bcc2ad6c6f39b4f1
parent    9a1680c2c85da4096bad71743debb2ccacdfd79f (diff)
[SPARK-13833] Guard against race condition when re-caching disk blocks in memory
When reading data from the DiskStore and attempting to cache it back into the memory store, we should guard against race conditions where multiple readers are attempting to re-cache the same block in memory. This patch accomplishes this by synchronizing on the block's `BlockInfo` object while trying to re-cache a block. (Will file JIRA as soon as ASF JIRA stops being down / laggy).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11660 from JoshRosen/concurrent-recaching-fixes.
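The fix follows a double-checked pattern: every reader of a block synchronizes on that block's `BlockInfo`, re-checks whether another reader already re-cached the block, and only then attempts the put. Below is a minimal sketch of that pattern in isolation; `BlockInfo` here is a bare monitor object and `SimpleMemoryStore` is a simplified stand-in, not Spark's actual API.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Spark's BlockInfo and MemoryStore, simplified
// for illustration only.
final class BlockInfo // serves purely as the per-block monitor object

final class SimpleMemoryStore {
  private val cache = mutable.Map.empty[String, Array[Byte]]
  def contains(id: String): Boolean = synchronized { cache.contains(id) }
  def putBytes(id: String, bytes: Array[Byte]): Boolean = synchronized {
    cache.put(id, bytes)
    true
  }
  def getBytes(id: String): Option[Array[Byte]] = synchronized { cache.get(id) }
}

object RecacheSketch {
  // Double-checked re-caching: all readers of a block synchronize on the same
  // BlockInfo, so only one of them performs the put; the rest see the cached copy.
  def maybeCacheInMemory(
      info: BlockInfo,
      blockId: String,
      store: SimpleMemoryStore,
      diskBytes: Array[Byte]): Array[Byte] = {
    info.synchronized {
      if (store.contains(blockId)) {
        store.getBytes(blockId).get // another reader re-cached it first
      } else if (store.putBytes(blockId, diskBytes.clone())) {
        store.getBytes(blockId).get // our put succeeded; read the cached copy back
      } else {
        diskBytes // not enough memory: fall back to the bytes read from disk
      }
    }
  }
}
```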
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala  137
1 file changed, 84 insertions(+), 53 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 996c1f5d9e..b503410a73 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -425,39 +425,10 @@ private[spark] class BlockManager(
val iterToReturn: Iterator[Any] = {
val diskBytes = diskStore.getBytes(blockId)
if (level.deserialized) {
- val diskIterator = dataDeserialize(blockId, diskBytes)
- if (level.useMemory) {
- // Cache the values before returning them
- memoryStore.putIterator(blockId, diskIterator, level) match {
- case Left(iter) =>
- // The memory store put() failed, so it returned the iterator back to us:
- iter
- case Right(_) =>
- // The put() succeeded, so we can read the values back:
- memoryStore.getValues(blockId).get
- }
- } else {
- diskIterator
- }
- } else { // storage level is serialized
- if (level.useMemory) {
- // Cache the bytes back into memory to speed up subsequent reads.
- val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit(), () => {
- // https://issues.apache.org/jira/browse/SPARK-6076
- // If the file size is bigger than the free memory, OOM will happen. So if we
- // cannot put it into MemoryStore, copyForMemory should not be created. That's why
- // this action is put into a `() => ByteBuffer` and created lazily.
- val copyForMemory = ByteBuffer.allocate(diskBytes.limit)
- copyForMemory.put(diskBytes)
- })
- if (putSucceeded) {
- dataDeserialize(blockId, memoryStore.getBytes(blockId).get)
- } else {
- dataDeserialize(blockId, diskBytes)
- }
- } else {
- dataDeserialize(blockId, diskBytes)
- }
+ val diskValues = dataDeserialize(blockId, diskBytes)
+ maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+ } else {
+ dataDeserialize(blockId, maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes))
}
}
val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
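For context, the `CompletionIterator` on the last line above ties lock release to iterator exhaustion: the block's read lock stays held while the caller consumes the values and is released once the iterator is drained. A minimal sketch of that idea (the concept only, not Spark's actual implementation):

```scala
// Minimal sketch of the CompletionIterator idea: wrap an iterator and run a
// cleanup action exactly once, when the wrapped iterator is drained.
class OnCompleteIterator[A](sub: Iterator[A], completion: () => Unit) extends Iterator[A] {
  private var completed = false
  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !completed) {
      completed = true
      completion() // e.g. releaseLock(blockId)
    }
    more
  }
  override def next(): A = sub.next()
}

object CompletionDemo extends App {
  val it = new OnCompleteIterator(Iterator(1, 2, 3), () => println("read lock released"))
  it.foreach(println) // prints 1, 2, 3, then "read lock released"
}
```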
@@ -517,26 +488,7 @@ private[spark] class BlockManager(
if (level.useMemory && memoryStore.contains(blockId)) {
memoryStore.getBytes(blockId).get
} else if (level.useDisk && diskStore.contains(blockId)) {
- val bytes = diskStore.getBytes(blockId)
- if (level.useMemory) {
- // Cache the bytes back into memory to speed up subsequent reads.
- val memoryStorePutSucceeded = memoryStore.putBytes(blockId, bytes.limit(), () => {
- // https://issues.apache.org/jira/browse/SPARK-6076
- // If the file size is bigger than the free memory, OOM will happen. So if we cannot
- // put it into MemoryStore, copyForMemory should not be created. That's why this
- // action is put into a `() => ByteBuffer` and created lazily.
- val copyForMemory = ByteBuffer.allocate(bytes.limit)
- copyForMemory.put(bytes)
- })
- if (memoryStorePutSucceeded) {
- memoryStore.getBytes(blockId).get
- } else {
- bytes.rewind()
- bytes
- }
- } else {
- bytes
- }
+ maybeCacheDiskBytesInMemory(info, blockId, level, diskStore.getBytes(blockId))
} else {
releaseLock(blockId)
throw new SparkException(s"Block $blockId was not found even though it's read-locked")
@@ -967,6 +919,85 @@ private[spark] class BlockManager(
}

/**
+ * Attempts to cache spilled bytes read from disk into the MemoryStore in order to speed up
+ * subsequent reads. This method requires the caller to hold a read lock on the block.
+ *
+ * @return a copy of the bytes. The original bytes passed to this method should no longer
+ * be used after this method returns.
+ */
+ private def maybeCacheDiskBytesInMemory(
+ blockInfo: BlockInfo,
+ blockId: BlockId,
+ level: StorageLevel,
+ diskBytes: ByteBuffer): ByteBuffer = {
+ require(!level.deserialized)
+ if (level.useMemory) {
+ // Synchronize on blockInfo to guard against a race condition where two readers both try to
+ // put values read from disk into the MemoryStore.
+ blockInfo.synchronized {
+ if (memoryStore.contains(blockId)) {
+ BlockManager.dispose(diskBytes)
+ memoryStore.getBytes(blockId).get
+ } else {
+ val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit(), () => {
+ // https://issues.apache.org/jira/browse/SPARK-6076
+ // If the file size is bigger than the free memory, OOM will happen. So if we
+ // cannot put it into MemoryStore, copyForMemory should not be created. That's why
+ // this action is put into a `() => ByteBuffer` and created lazily.
+ val copyForMemory = ByteBuffer.allocate(diskBytes.limit)
+ copyForMemory.put(diskBytes)
+ })
+ if (putSucceeded) {
+ BlockManager.dispose(diskBytes)
+ memoryStore.getBytes(blockId).get
+ } else {
+ diskBytes.rewind()
+ diskBytes
+ }
+ }
+ }
+ } else {
+ diskBytes
+ }
+ }
+
+ /**
+ * Attempts to cache spilled values read from disk into the MemoryStore in order to speed up
+ * subsequent reads. This method requires the caller to hold a read lock on the block.
+ *
+ * @return a copy of the iterator. The original iterator passed to this method should no
+ * longer be used after this method returns.
+ */
+ private def maybeCacheDiskValuesInMemory(
+ blockInfo: BlockInfo,
+ blockId: BlockId,
+ level: StorageLevel,
+ diskIterator: Iterator[Any]): Iterator[Any] = {
+ require(level.deserialized)
+ if (level.useMemory) {
+ // Synchronize on blockInfo to guard against a race condition where two readers both try to
+ // put values read from disk into the MemoryStore.
+ blockInfo.synchronized {
+ if (memoryStore.contains(blockId)) {
+ // Note: if we had a means to discard the disk iterator, we would do that here.
+ memoryStore.getValues(blockId).get
+ } else {
+ memoryStore.putIterator(blockId, diskIterator, level) match {
+ case Left(iter) =>
+ // The memory store put() failed, so it returned the iterator back to us:
+ iter
+ case Right(_) =>
+ // The put() succeeded, so we can read the values back:
+ memoryStore.getValues(blockId).get
+ }
+ }
+ }
+ } else {
+ diskIterator
+ }
+ }
+
+ /**
* Get peer block managers in the system.
*/
private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
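A note on the `() => ByteBuffer` thunk that both the old and new code pass to `memoryStore.putBytes` (the SPARK-6076 workaround): because the in-memory copy is created lazily, it is only allocated after the store has reserved space for it, so a failed reservation can never itself trigger an OOM. The sketch below shows that shape under simplified assumptions; `tryReserve` is a hypothetical stand-in for the MemoryStore's memory accounting.

```scala
import java.nio.ByteBuffer

object LazyCopySketch {
  // Hypothetical reservation check standing in for the MemoryStore's bookkeeping.
  private def tryReserve(numBytes: Long): Boolean =
    numBytes <= Runtime.getRuntime.freeMemory() / 2

  // The copy is produced by a thunk, so nothing is allocated unless the
  // reservation succeeds; a too-large block fails cheaply instead of OOM-ing.
  def putBytes(size: Long, makeCopy: () => ByteBuffer): Boolean = {
    if (tryReserve(size)) {
      val copyForMemory = makeCopy() // allocated only after space is reserved
      copyForMemory.flip()           // ready the buffer for subsequent reads
      true
    } else {
      false // no allocation happened, so no OOM risk from the copy itself
    }
  }

  def main(args: Array[String]): Unit = {
    val diskBytes = ByteBuffer.wrap("block data read from disk".getBytes("UTF-8"))
    val ok = putBytes(diskBytes.limit(), () => {
      val copy = ByteBuffer.allocate(diskBytes.limit())
      copy.put(diskBytes)
      copy
    })
    println(s"put succeeded: $ok")
  }
}
```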