diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-08-12 19:18:01 +0200 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-08-12 19:18:01 +0200 |
commit | ad8a7612a471a247126e736933b6271c03fd2a18 (patch) | |
tree | b71fefacbc88bde45aae31768f2a5238e9ec3c82 /core | |
parent | 3c94e5c1880004bd77beb619fa7e3509dc7c732d (diff) | |
download | spark-ad8a7612a471a247126e736933b6271c03fd2a18.tar.gz spark-ad8a7612a471a247126e736933b6271c03fd2a18.tar.bz2 spark-ad8a7612a471a247126e736933b6271c03fd2a18.zip |
Changed multi-get method in BlockManager to return an iterator
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/BlockStoreShuffleFetcher.scala | 4 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/BlockManager.scala | 18 |
2 files changed, 13 insertions, 9 deletions
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index 06d2d09fce..283825391e 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -34,8 +34,6 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging { try { val blockOptions = blockManager.get(blocksByAddress) - logDebug("Fetching map output blocks for shuffle %d, reduce %d took %d ms".format( - shuffleId, reduceId, System.currentTimeMillis - startTime)) blockOptions.foreach(x => { val (blockId, blockOption) = x blockOption match { @@ -65,5 +63,7 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging { } } } + logDebug("Fetching and merging outputs of shuffle %d, reduce %d took %d ms".format( + shuffleId, reduceId, System.currentTimeMillis - startTime)) } } diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index cde74e5805..b79addb6c8 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -6,6 +6,7 @@ import java.nio.channels.FileChannel.MapMode import java.util.{HashMap => JHashMap} import java.util.LinkedHashMap import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.LinkedBlockingQueue import java.util.Collections import scala.actors._ @@ -261,15 +262,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } /** - * Get many blocks from local and remote block manager using their BlockManagerIds. + * Get multiple blocks from local and remote block manager using their BlockManagerIds. Returns + * an Iterator of (block ID, value) pairs so that clients may handle blocks in a pipelined + * fashion as they're received. */ - def get(blocksByAddress: Seq[(BlockManagerId, Seq[String])]): HashMap[String, Option[Iterator[Any]]] = { + def get(blocksByAddress: Seq[(BlockManagerId, Seq[String])]): Iterator[(String, Option[Iterator[Any]])] = { if (blocksByAddress == null) { throw new IllegalArgumentException("BlocksByAddress is null") } - logDebug("Getting " + blocksByAddress.map(_._2.size).sum + " blocks") + val totalBlocks = blocksByAddress.map(_._2.size).sum + logDebug("Getting " + totalBlocks + " blocks") var startTime = System.currentTimeMillis - val blocks = new HashMap[String,Option[Iterator[Any]]]() + val blocks = new ArrayBuffer[(String, Option[Iterator[Any]])](totalBlocks) val localBlockIds = new ArrayBuffer[String]() val remoteBlockIds = new ArrayBuffer[String]() val remoteBlockIdsPerLocation = new HashMap[BlockManagerId, Seq[String]]() @@ -300,7 +304,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m localBlockIds.foreach(id => { get(id) match { case Some(block) => { - blocks.update(id, Some(block)) + blocks += ((id, Some(block))) logDebug("Got local block " + id) } case None => { @@ -325,7 +329,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m val buffer = blockMessage.getData val blockId = blockMessage.getId val block = dataDeserialize(buffer) - blocks.update(blockId, Some(block)) + blocks += ((blockId, Some(block))) logDebug("Got remote block " + blockId + " in " + Utils.getUsedTimeMs(startTime)) count += 1 }) @@ -339,7 +343,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } logDebug("Got all blocks in " + Utils.getUsedTimeMs(startTime) + " ms") - return blocks + return blocks.iterator } /** |