aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala24
1 files changed, 24 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index c1d1e35ace..576ef63dbf 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -196,6 +196,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
*/
def getLocal(blockId: String): Option[Iterator[Any]] = {
logDebug("Getting local block " + blockId)
+
+ // As an optimization for map output fetches, if the block is for a shuffle, return it
+ // without acquiring a lock; the disk store never deletes (recent) items so this should work
+ if (blockId.startsWith("shuffle_")) {
+ return diskStore.getValues(blockId) match {
+ case Some(iterator) =>
+ Some(iterator)
+ case None =>
+ throw new Exception("Block " + blockId + " not found on disk, though it should be")
+ }
+ }
+
locker.getLock(blockId).synchronized {
val info = blockInfo.get(blockId)
if (info != null) {
@@ -266,6 +278,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
def getLocalBytes(blockId: String): Option[ByteBuffer] = {
// TODO: This whole thing is very similar to getLocal; we need to refactor it somehow
logDebug("Getting local block " + blockId + " as bytes")
+
+ // As an optimization for map output fetches, if the block is for a shuffle, return it
+ // without acquiring a lock; the disk store never deletes (recent) items so this should work
+ if (blockId.startsWith("shuffle_")) {
+ return diskStore.getBytes(blockId) match {
+ case Some(bytes) =>
+ Some(bytes)
+ case None =>
+ throw new Exception("Block " + blockId + " not found on disk, though it should be")
+ }
+ }
+
locker.getLock(blockId).synchronized {
val info = blockInfo.get(blockId)
if (info != null) {