diff options
-rw-r--r-- | core/src/main/scala/spark/storage/BlockManager.scala | 24 |
1 files changed, 24 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index c1d1e35ace..576ef63dbf 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -196,6 +196,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m */ def getLocal(blockId: String): Option[Iterator[Any]] = { logDebug("Getting local block " + blockId) + + // As an optimization for map output fetches, if the block is for a shuffle, return it + // without acquiring a lock; the disk store never deletes (recent) items so this should work + if (blockId.startsWith("shuffle_")) { + return diskStore.getValues(blockId) match { + case Some(iterator) => + Some(iterator) + case None => + throw new Exception("Block " + blockId + " not found on disk, though it should be") + } + } + locker.getLock(blockId).synchronized { val info = blockInfo.get(blockId) if (info != null) { @@ -266,6 +278,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m def getLocalBytes(blockId: String): Option[ByteBuffer] = { // TODO: This whole thing is very similar to getLocal; we need to refactor it somehow logDebug("Getting local block " + blockId + " as bytes") + + // As an optimization for map output fetches, if the block is for a shuffle, return it + // without acquiring a lock; the disk store never deletes (recent) items so this should work + if (blockId.startsWith("shuffle_")) { + return diskStore.getBytes(blockId) match { + case Some(bytes) => + Some(bytes) + case None => + throw new Exception("Block " + blockId + " not found on disk, though it should be") + } + } + locker.getLock(blockId).synchronized { val info = blockInfo.get(blockId) if (info != null) { |