From 3b348f909d209d57d2a94b9eb2f7eafe48dd7cbe Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 1 Oct 2012 12:01:38 -0700 Subject: Improve log messages from BlockManager --- .../main/scala/spark/storage/BlockManager.scala | 2 +- .../scala/spark/storage/BlockManagerMaster.scala | 60 ++++++++++------------ core/src/main/scala/spark/storage/DiskStore.scala | 8 ++- .../src/main/scala/spark/storage/MemoryStore.scala | 16 +++--- 4 files changed, 41 insertions(+), 45 deletions(-) (limited to 'core/src/main') diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 081981c838..e7dea904c3 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -99,7 +99,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m */ private def initialize() { master.mustRegisterBlockManager( - RegisterBlockManager(blockManagerId, maxMemory, maxMemory)) + RegisterBlockManager(blockManagerId, maxMemory)) BlockManagerWorker.startBlockManagerWorker(this) } diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 8e4f9f7c15..0c654364cd 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -20,8 +20,7 @@ sealed trait ToBlockManagerMaster case class RegisterBlockManager( blockManagerId: BlockManagerId, - maxMemSize: Long, - maxDiskSize: Long) + maxMemSize: Long) extends ToBlockManagerMaster class HeartBeat( @@ -85,22 +84,20 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { class BlockManagerInfo( val blockManagerId: BlockManagerId, timeMs: Long, - val maxMem: Long, - val maxDisk: Long) { + val maxMem: Long) { private var lastSeenMs = timeMs - private var remainedMem = maxMem - private var remainedDisk = maxDisk + private var remainingMem = maxMem private val blocks = new JHashMap[String, StorageLevel] - logInfo("Registering block manager (%s:%d, ram: %d, disk: %d)".format( - blockManagerId.ip, blockManagerId.port, maxMem, maxDisk)) + logInfo("Registering block manager %s:%d with %s RAM".format( + blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem))) def updateLastSeenMs() { lastSeenMs = System.currentTimeMillis() / 1000 } - def updateBlockInfo( - blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long) = synchronized { + def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long) + : Unit = synchronized { updateLastSeenMs() @@ -109,10 +106,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { val originalLevel: StorageLevel = blocks.get(blockId) if (originalLevel.useMemory) { - remainedMem += memSize - } - if (originalLevel.useDisk) { - remainedDisk += diskSize + remainingMem += memSize } } @@ -120,26 +114,28 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { // isValid means it is either stored in-memory or on-disk. blocks.put(blockId, storageLevel) if (storageLevel.useMemory) { - remainedMem -= memSize - logInfo("Added %s in memory on %s:%d (size: %d, free: %d)".format( - blockId, blockManagerId.ip, blockManagerId.port, memSize, remainedMem)) + remainingMem -= memSize + logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format( + blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize), + Utils.memoryBytesToString(remainingMem))) } if (storageLevel.useDisk) { - remainedDisk -= diskSize - logInfo("Added %s on disk on %s:%d (size: %d, free: %d)".format( - blockId, blockManagerId.ip, blockManagerId.port, diskSize, remainedDisk)) + logInfo("Added %s on disk on %s:%d (size: %s)".format( + blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize))) } } else if (blocks.containsKey(blockId)) { // If isValid is not true, drop the block. val originalLevel: StorageLevel = blocks.get(blockId) blocks.remove(blockId) if (originalLevel.useMemory) { - logInfo("Removed %s on %s:%d in memory (size: %d, free: %d)".format( - blockId, blockManagerId.ip, blockManagerId.port, memSize, remainedDisk)) + remainingMem += memSize + logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format( + blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize), + Utils.memoryBytesToString(remainingMem))) } if (originalLevel.useDisk) { - logInfo("Removed %s on %s:%d on disk (size: %d, free: %d)".format( - blockId, blockManagerId.ip, blockManagerId.port, diskSize, remainedDisk)) + logInfo("Removed %s on %s:%d on disk (size: %s)".format( + blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize))) } } } @@ -149,15 +145,11 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { } def getRemainedMem: Long = { - return remainedMem - } - - def getRemainedDisk: Long = { - return remainedDisk + return remainingMem } override def toString: String = { - return "BlockManagerInfo " + timeMs + " " + remainedMem + " " + remainedDisk + return "BlockManagerInfo " + timeMs + " " + remainingMem } def clear() { @@ -181,8 +173,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { } def receive = { - case RegisterBlockManager(blockManagerId, maxMemSize, maxDiskSize) => - register(blockManagerId, maxMemSize, maxDiskSize) + case RegisterBlockManager(blockManagerId, maxMemSize) => + register(blockManagerId, maxMemSize) case HeartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size) => heartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size) @@ -210,7 +202,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { logInfo("Got unknown message: " + other) } - private def register(blockManagerId: BlockManagerId, maxMemSize: Long, maxDiskSize: Long) { + private def register(blockManagerId: BlockManagerId, maxMemSize: Long) { val startTimeMs = System.currentTimeMillis() val tmp = " " + blockManagerId + " " logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs)) @@ -218,7 +210,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { logInfo("Got Register Msg from master node, don't register it") } else { blockManagerInfo += (blockManagerId -> new BlockManagerInfo( - blockManagerId, System.currentTimeMillis() / 1000, maxMemSize, maxDiskSize)) + blockManagerId, System.currentTimeMillis() / 1000, maxMemSize)) } logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs)) sender ! true diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index 34bb989485..e198813456 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -39,8 +39,8 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) } channel.close() val finishTime = System.currentTimeMillis - logDebug("Block %s stored to file of %d bytes to disk in %d ms".format( - blockId, bytes.limit, (finishTime - startTime))) + logDebug("Block %s stored as %s file on disk in %d ms".format( + blockId, Utils.memoryBytesToString(bytes.limit), (finishTime - startTime))) } override def putValues( @@ -51,12 +51,16 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) : Either[Iterator[Any], ByteBuffer] = { logDebug("Attempting to write values for block " + blockId) + val startTime = System.currentTimeMillis val file = createFile(blockId) val fileOut = blockManager.wrapForCompression( new FastBufferedOutputStream(new FileOutputStream(file))) val objOut = blockManager.serializer.newInstance().serializeStream(fileOut) objOut.writeAll(values) objOut.close() + val finishTime = System.currentTimeMillis + logDebug("Block %s stored as %s file on disk in %d ms".format( + blockId, Utils.memoryBytesToString(file.length()), (finishTime - startTime))) if (returnValues) { // Return a byte buffer for the contents of the file diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala index d71585b6e3..93520dc4ff 100644 --- a/core/src/main/scala/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/spark/storage/MemoryStore.scala @@ -56,15 +56,15 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) val entry = new Entry(elements, sizeEstimate, true) entries.synchronized { entries.put(blockId, entry) } currentMemory += sizeEstimate - logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format( - blockId, sizeEstimate, freeMemory)) + logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format( + blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory))) } else { val entry = new Entry(bytes, bytes.limit, false) ensureFreeSpace(bytes.limit) entries.synchronized { entries.put(blockId, entry) } currentMemory += bytes.limit - logInfo("Block %s stored as %d bytes to memory (free %d)".format( - blockId, bytes.limit, freeMemory)) + logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format( + blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory))) } } @@ -83,8 +83,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) val entry = new Entry(elements, sizeEstimate, true) entries.synchronized { entries.put(blockId, entry) } currentMemory += sizeEstimate - logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format( - blockId, sizeEstimate, freeMemory)) + logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format( + blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory))) Left(elements.iterator) } else { val bytes = blockManager.dataSerialize(values) @@ -92,8 +92,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) val entry = new Entry(bytes, bytes.limit, false) entries.synchronized { entries.put(blockId, entry) } currentMemory += bytes.limit - logInfo("Block %s stored as %d bytes to memory (free %d)".format( - blockId, bytes.limit, freeMemory)) + logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format( + blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory))) Right(bytes) } } -- cgit v1.2.3