aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorroot <root@ip-10-226-118-223.ec2.internal>2012-10-07 04:02:10 +0000
committerroot <root@ip-10-226-118-223.ec2.internal>2012-10-07 04:02:10 +0000
commit975009d68881eb3bebfe9168ea68b99574bebf70 (patch)
tree006721d35b96fb98959b599cd27a6731031b0bc7 /core/src/main
parent0bc63f7ef1a22ce92b21ea9fe417df43923c1ce7 (diff)
downloadspark-975009d68881eb3bebfe9168ea68b99574bebf70.tar.gz
spark-975009d68881eb3bebfe9168ea68b99574bebf70.tar.bz2
spark-975009d68881eb3bebfe9168ea68b99574bebf70.zip
Avoid acquiring locks in BlockManager when fetching shuffle outputs
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala24
1 files changed, 24 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index c1d1e35ace..576ef63dbf 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -196,6 +196,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
*/
def getLocal(blockId: String): Option[Iterator[Any]] = {
logDebug("Getting local block " + blockId)
+
+ // As an optimization for map output fetches, if the block is for a shuffle, return it
+ // without acquiring a lock; the disk store never deletes (recent) items so this should work
+ if (blockId.startsWith("shuffle_")) {
+ return diskStore.getValues(blockId) match {
+ case Some(iterator) =>
+ Some(iterator)
+ case None =>
+ throw new Exception("Block " + blockId + " not found on disk, though it should be")
+ }
+ }
+
locker.getLock(blockId).synchronized {
val info = blockInfo.get(blockId)
if (info != null) {
@@ -266,6 +278,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
def getLocalBytes(blockId: String): Option[ByteBuffer] = {
// TODO: This whole thing is very similar to getLocal; we need to refactor it somehow
logDebug("Getting local block " + blockId + " as bytes")
+
+ // As an optimization for map output fetches, if the block is for a shuffle, return it
+ // without acquiring a lock; the disk store never deletes (recent) items so this should work
+ if (blockId.startsWith("shuffle_")) {
+ return diskStore.getBytes(blockId) match {
+ case Some(bytes) =>
+ Some(bytes)
+ case None =>
+ throw new Exception("Block " + blockId + " not found on disk, though it should be")
+ }
+ }
+
locker.getLock(blockId).synchronized {
val info = blockInfo.get(blockId)
if (info != null) {