aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-10-01 12:01:38 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-10-01 12:01:38 -0700
commit3b348f909d209d57d2a94b9eb2f7eafe48dd7cbe (patch)
treeb7a86918df838d2f846774e69d013d62fd3e1406 /core/src/main
parent22b6b16e56f5a1863398af62fd94790d6eb977b7 (diff)
downloadspark-3b348f909d209d57d2a94b9eb2f7eafe48dd7cbe.tar.gz
spark-3b348f909d209d57d2a94b9eb2f7eafe48dd7cbe.tar.bz2
spark-3b348f909d209d57d2a94b9eb2f7eafe48dd7cbe.zip
Improve log messages from BlockManager
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala2
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMaster.scala60
-rw-r--r--core/src/main/scala/spark/storage/DiskStore.scala8
-rw-r--r--core/src/main/scala/spark/storage/MemoryStore.scala16
4 files changed, 41 insertions, 45 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 081981c838..e7dea904c3 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -99,7 +99,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
*/
private def initialize() {
master.mustRegisterBlockManager(
- RegisterBlockManager(blockManagerId, maxMemory, maxMemory))
+ RegisterBlockManager(blockManagerId, maxMemory))
BlockManagerWorker.startBlockManagerWorker(this)
}
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 8e4f9f7c15..0c654364cd 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -20,8 +20,7 @@ sealed trait ToBlockManagerMaster
case class RegisterBlockManager(
blockManagerId: BlockManagerId,
- maxMemSize: Long,
- maxDiskSize: Long)
+ maxMemSize: Long)
extends ToBlockManagerMaster
class HeartBeat(
@@ -85,22 +84,20 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
- val maxMem: Long,
- val maxDisk: Long) {
+ val maxMem: Long) {
private var lastSeenMs = timeMs
- private var remainedMem = maxMem
- private var remainedDisk = maxDisk
+ private var remainingMem = maxMem
private val blocks = new JHashMap[String, StorageLevel]
- logInfo("Registering block manager (%s:%d, ram: %d, disk: %d)".format(
- blockManagerId.ip, blockManagerId.port, maxMem, maxDisk))
+ logInfo("Registering block manager %s:%d with %s RAM".format(
+ blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
def updateLastSeenMs() {
lastSeenMs = System.currentTimeMillis() / 1000
}
- def updateBlockInfo(
- blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long) = synchronized {
+ def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
+ : Unit = synchronized {
updateLastSeenMs()
@@ -109,10 +106,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
val originalLevel: StorageLevel = blocks.get(blockId)
if (originalLevel.useMemory) {
- remainedMem += memSize
- }
- if (originalLevel.useDisk) {
- remainedDisk += diskSize
+ remainingMem += memSize
}
}
@@ -120,26 +114,28 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
// isValid means it is either stored in-memory or on-disk.
blocks.put(blockId, storageLevel)
if (storageLevel.useMemory) {
- remainedMem -= memSize
- logInfo("Added %s in memory on %s:%d (size: %d, free: %d)".format(
- blockId, blockManagerId.ip, blockManagerId.port, memSize, remainedMem))
+ remainingMem -= memSize
+ logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
+ Utils.memoryBytesToString(remainingMem)))
}
if (storageLevel.useDisk) {
- remainedDisk -= diskSize
- logInfo("Added %s on disk on %s:%d (size: %d, free: %d)".format(
- blockId, blockManagerId.ip, blockManagerId.port, diskSize, remainedDisk))
+ logInfo("Added %s on disk on %s:%d (size: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
}
} else if (blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
val originalLevel: StorageLevel = blocks.get(blockId)
blocks.remove(blockId)
if (originalLevel.useMemory) {
- logInfo("Removed %s on %s:%d in memory (size: %d, free: %d)".format(
- blockId, blockManagerId.ip, blockManagerId.port, memSize, remainedDisk))
+ remainingMem += memSize
+ logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
+ Utils.memoryBytesToString(remainingMem)))
}
if (originalLevel.useDisk) {
- logInfo("Removed %s on %s:%d on disk (size: %d, free: %d)".format(
- blockId, blockManagerId.ip, blockManagerId.port, diskSize, remainedDisk))
+ logInfo("Removed %s on %s:%d on disk (size: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
}
}
}
@@ -149,15 +145,11 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
}
def getRemainedMem: Long = {
- return remainedMem
- }
-
- def getRemainedDisk: Long = {
- return remainedDisk
+ return remainingMem
}
override def toString: String = {
- return "BlockManagerInfo " + timeMs + " " + remainedMem + " " + remainedDisk
+ return "BlockManagerInfo " + timeMs + " " + remainingMem
}
def clear() {
@@ -181,8 +173,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
}
def receive = {
- case RegisterBlockManager(blockManagerId, maxMemSize, maxDiskSize) =>
- register(blockManagerId, maxMemSize, maxDiskSize)
+ case RegisterBlockManager(blockManagerId, maxMemSize) =>
+ register(blockManagerId, maxMemSize)
case HeartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
heartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size)
@@ -210,7 +202,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
logInfo("Got unknown message: " + other)
}
- private def register(blockManagerId: BlockManagerId, maxMemSize: Long, maxDiskSize: Long) {
+ private def register(blockManagerId: BlockManagerId, maxMemSize: Long) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " "
logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
@@ -218,7 +210,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
logInfo("Got Register Msg from master node, don't register it")
} else {
blockManagerInfo += (blockManagerId -> new BlockManagerInfo(
- blockManagerId, System.currentTimeMillis() / 1000, maxMemSize, maxDiskSize))
+ blockManagerId, System.currentTimeMillis() / 1000, maxMemSize))
}
logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
sender ! true
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 34bb989485..e198813456 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -39,8 +39,8 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
}
channel.close()
val finishTime = System.currentTimeMillis
- logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
- blockId, bytes.limit, (finishTime - startTime)))
+ logDebug("Block %s stored as %s file on disk in %d ms".format(
+ blockId, Utils.memoryBytesToString(bytes.limit), (finishTime - startTime)))
}
override def putValues(
@@ -51,12 +51,16 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
: Either[Iterator[Any], ByteBuffer] = {
logDebug("Attempting to write values for block " + blockId)
+ val startTime = System.currentTimeMillis
val file = createFile(blockId)
val fileOut = blockManager.wrapForCompression(
new FastBufferedOutputStream(new FileOutputStream(file)))
val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
objOut.writeAll(values)
objOut.close()
+ val finishTime = System.currentTimeMillis
+ logDebug("Block %s stored as %s file on disk in %d ms".format(
+ blockId, Utils.memoryBytesToString(file.length()), (finishTime - startTime)))
if (returnValues) {
// Return a byte buffer for the contents of the file
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index d71585b6e3..93520dc4ff 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -56,15 +56,15 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val entry = new Entry(elements, sizeEstimate, true)
entries.synchronized { entries.put(blockId, entry) }
currentMemory += sizeEstimate
- logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
- blockId, sizeEstimate, freeMemory))
+ logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
+ blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory)))
} else {
val entry = new Entry(bytes, bytes.limit, false)
ensureFreeSpace(bytes.limit)
entries.synchronized { entries.put(blockId, entry) }
currentMemory += bytes.limit
- logInfo("Block %s stored as %d bytes to memory (free %d)".format(
- blockId, bytes.limit, freeMemory))
+ logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format(
+ blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory)))
}
}
@@ -83,8 +83,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val entry = new Entry(elements, sizeEstimate, true)
entries.synchronized { entries.put(blockId, entry) }
currentMemory += sizeEstimate
- logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
- blockId, sizeEstimate, freeMemory))
+ logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
+ blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory)))
Left(elements.iterator)
} else {
val bytes = blockManager.dataSerialize(values)
@@ -92,8 +92,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val entry = new Entry(bytes, bytes.limit, false)
entries.synchronized { entries.put(blockId, entry) }
currentMemory += bytes.limit
- logInfo("Block %s stored as %d bytes to memory (free %d)".format(
- blockId, bytes.limit, freeMemory))
+ logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format(
+ blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory)))
Right(bytes)
}
}