aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorMatei Zaharia <root@ip-10-226-118-223.ec2.internal>2012-10-07 06:35:48 +0000
committerMatei Zaharia <root@ip-10-226-118-223.ec2.internal>2012-10-07 06:35:48 +0000
commitea096f7cd5c20acfa89194db955e8f155c8e034b (patch)
tree65b0654c5f5a86d236ca8881c1124aef020f7006 /core/src/main
parent554b42cb24edb39aa8a9888b7f267ea742758176 (diff)
downloadspark-ea096f7cd5c20acfa89194db955e8f155c8e034b.tar.gz
spark-ea096f7cd5c20acfa89194db955e8f155c8e034b.tar.bz2
spark-ea096f7cd5c20acfa89194db955e8f155c8e034b.zip
More logging
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala11
1 files changed, 7 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 576ef63dbf..6c568cc2b0 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -411,8 +411,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
var bytesInFlight = 0L
def sendRequest(req: FetchRequest) {
+ logDebug("Sending request for %d blocks (%s) from %s".format(
+ req.blocks.size, Utils.memoryBytesToString(req.size), req.address.ip))
val cmId = new ConnectionManagerId(req.address.ip, req.address.port)
- val blockMessageArray = new BlockMessageArray(req.blocks.map{
+ val blockMessageArray = new BlockMessageArray(req.blocks.map {
case (blockId, size) => BlockMessage.fromGetBlock(GetBlock(blockId))
})
bytesInFlight += req.size
@@ -450,10 +452,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
localBlockIds ++= blockInfos.map(_._1)
} else {
remoteBlockIds ++= blockInfos.map(_._1)
- // Make our requests at least maxBytesInFlight / 4 in length; the reason to keep them
- // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 4
+ // Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them
+ // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
// nodes, rather than blocking on reading output from one node.
- val minRequestSize = math.max(maxBytesInFlight / 4, 1L)
+ val minRequestSize = math.max(maxBytesInFlight / 5, 1L)
+ logInfo("maxBytesInFlight: " + maxBytesInFlight + ", minRequest: " + minRequestSize)
val iterator = blockInfos.iterator
var curRequestSize = 0L
var curBlocks = new ArrayBuffer[(String, Long)]