diff options
author | root <root@ip-10-226-118-223.ec2.internal> | 2012-10-07 04:02:10 +0000 |
---|---|---|
committer | root <root@ip-10-226-118-223.ec2.internal> | 2012-10-07 04:02:10 +0000 |
commit | 975009d68881eb3bebfe9168ea68b99574bebf70 (patch) | |
tree | 006721d35b96fb98959b599cd27a6731031b0bc7 /core | |
parent | 0bc63f7ef1a22ce92b21ea9fe417df43923c1ce7 (diff) | |
download | spark-975009d68881eb3bebfe9168ea68b99574bebf70.tar.gz spark-975009d68881eb3bebfe9168ea68b99574bebf70.tar.bz2 spark-975009d68881eb3bebfe9168ea68b99574bebf70.zip |
Avoid acquiring locks in BlockManager when fetching shuffle outputs
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/storage/BlockManager.scala | 24 |
1 files changed, 24 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index c1d1e35ace..576ef63dbf 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -196,6 +196,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m */ def getLocal(blockId: String): Option[Iterator[Any]] = { logDebug("Getting local block " + blockId) + + // As an optimization for map output fetches, if the block is for a shuffle, return it + // without acquiring a lock; the disk store never deletes (recent) items so this should work + if (blockId.startsWith("shuffle_")) { + return diskStore.getValues(blockId) match { + case Some(iterator) => + Some(iterator) + case None => + throw new Exception("Block " + blockId + " not found on disk, though it should be") + } + } + locker.getLock(blockId).synchronized { val info = blockInfo.get(blockId) if (info != null) { @@ -266,6 +278,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m def getLocalBytes(blockId: String): Option[ByteBuffer] = { // TODO: This whole thing is very similar to getLocal; we need to refactor it somehow logDebug("Getting local block " + blockId + " as bytes") + + // As an optimization for map output fetches, if the block is for a shuffle, return it + // without acquiring a lock; the disk store never deletes (recent) items so this should work + if (blockId.startsWith("shuffle_")) { + return diskStore.getBytes(blockId) match { + case Some(bytes) => + Some(bytes) + case None => + throw new Exception("Block " + blockId + " not found on disk, though it should be") + } + } + locker.getLock(blockId).synchronized { val info = blockInfo.get(blockId) if (info != null) { |