aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-08-12 19:18:01 +0200
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-08-12 19:18:01 +0200
commitad8a7612a471a247126e736933b6271c03fd2a18 (patch)
treeb71fefacbc88bde45aae31768f2a5238e9ec3c82 /core
parent3c94e5c1880004bd77beb619fa7e3509dc7c732d (diff)
downloadspark-ad8a7612a471a247126e736933b6271c03fd2a18.tar.gz
spark-ad8a7612a471a247126e736933b6271c03fd2a18.tar.bz2
spark-ad8a7612a471a247126e736933b6271c03fd2a18.zip
Changed multi-get method in BlockManager to return an iterator
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/BlockStoreShuffleFetcher.scala4
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala18
2 files changed, 13 insertions, 9 deletions
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index 06d2d09fce..283825391e 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -34,8 +34,6 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
try {
val blockOptions = blockManager.get(blocksByAddress)
- logDebug("Fetching map output blocks for shuffle %d, reduce %d took %d ms".format(
- shuffleId, reduceId, System.currentTimeMillis - startTime))
blockOptions.foreach(x => {
val (blockId, blockOption) = x
blockOption match {
@@ -65,5 +63,7 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
}
}
}
+ logDebug("Fetching and merging outputs of shuffle %d, reduce %d took %d ms".format(
+ shuffleId, reduceId, System.currentTimeMillis - startTime))
}
}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index cde74e5805..b79addb6c8 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -6,6 +6,7 @@ import java.nio.channels.FileChannel.MapMode
import java.util.{HashMap => JHashMap}
import java.util.LinkedHashMap
import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.LinkedBlockingQueue
import java.util.Collections
import scala.actors._
@@ -261,15 +262,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
/**
- * Get many blocks from local and remote block manager using their BlockManagerIds.
+ * Get multiple blocks from local and remote block manager using their BlockManagerIds. Returns
+ * an Iterator of (block ID, value) pairs so that clients may handle blocks in a pipelined
+ * fashion as they're received.
*/
- def get(blocksByAddress: Seq[(BlockManagerId, Seq[String])]): HashMap[String, Option[Iterator[Any]]] = {
+ def get(blocksByAddress: Seq[(BlockManagerId, Seq[String])]): Iterator[(String, Option[Iterator[Any]])] = {
if (blocksByAddress == null) {
throw new IllegalArgumentException("BlocksByAddress is null")
}
- logDebug("Getting " + blocksByAddress.map(_._2.size).sum + " blocks")
+ val totalBlocks = blocksByAddress.map(_._2.size).sum
+ logDebug("Getting " + totalBlocks + " blocks")
var startTime = System.currentTimeMillis
- val blocks = new HashMap[String,Option[Iterator[Any]]]()
+ val blocks = new ArrayBuffer[(String, Option[Iterator[Any]])](totalBlocks)
val localBlockIds = new ArrayBuffer[String]()
val remoteBlockIds = new ArrayBuffer[String]()
val remoteBlockIdsPerLocation = new HashMap[BlockManagerId, Seq[String]]()
@@ -300,7 +304,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
localBlockIds.foreach(id => {
get(id) match {
case Some(block) => {
- blocks.update(id, Some(block))
+ blocks += ((id, Some(block)))
logDebug("Got local block " + id)
}
case None => {
@@ -325,7 +329,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
val buffer = blockMessage.getData
val blockId = blockMessage.getId
val block = dataDeserialize(buffer)
- blocks.update(blockId, Some(block))
+ blocks += ((blockId, Some(block)))
logDebug("Got remote block " + blockId + " in " + Utils.getUsedTimeMs(startTime))
count += 1
})
@@ -339,7 +343,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
logDebug("Got all blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
- return blocks
+ return blocks.iterator
}
/**