aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala52
1 files changed, 35 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 467c3e0e6b..6f85b9e4d6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -497,11 +497,17 @@ private[spark] class BlockManagerInfo(
updateLastSeenMs()
- if (_blocks.containsKey(blockId)) {
+ val blockExists = _blocks.containsKey(blockId)
+ var originalMemSize: Long = 0
+ var originalDiskSize: Long = 0
+ var originalLevel: StorageLevel = StorageLevel.NONE
+
+ if (blockExists) {
// The block exists on the slave already.
val blockStatus: BlockStatus = _blocks.get(blockId)
- val originalLevel: StorageLevel = blockStatus.storageLevel
- val originalMemSize: Long = blockStatus.memSize
+ originalLevel = blockStatus.storageLevel
+ originalMemSize = blockStatus.memSize
+ originalDiskSize = blockStatus.diskSize
if (originalLevel.useMemory) {
_remainingMem += originalMemSize
@@ -520,32 +526,44 @@ private[spark] class BlockManagerInfo(
blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0)
_blocks.put(blockId, blockStatus)
_remainingMem -= memSize
- logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
- Utils.bytesToString(_remainingMem)))
+ if (blockExists) {
+ logInfo(s"Updated $blockId in memory on ${blockManagerId.hostPort}" +
+ s" (current size: ${Utils.bytesToString(memSize)}," +
+ s" original size: ${Utils.bytesToString(originalMemSize)}," +
+ s" free: ${Utils.bytesToString(_remainingMem)})")
+ } else {
+ logInfo(s"Added $blockId in memory on ${blockManagerId.hostPort}" +
+ s" (size: ${Utils.bytesToString(memSize)}," +
+ s" free: ${Utils.bytesToString(_remainingMem)})")
+ }
}
if (storageLevel.useDisk) {
blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize)
_blocks.put(blockId, blockStatus)
- logInfo("Added %s on disk on %s (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
+ if (blockExists) {
+ logInfo(s"Updated $blockId on disk on ${blockManagerId.hostPort}" +
+ s" (current size: ${Utils.bytesToString(diskSize)}," +
+ s" original size: ${Utils.bytesToString(originalDiskSize)})")
+ } else {
+ logInfo(s"Added $blockId on disk on ${blockManagerId.hostPort}" +
+ s" (size: ${Utils.bytesToString(diskSize)})")
+ }
}
if (!blockId.isBroadcast && blockStatus.isCached) {
_cachedBlocks += blockId
}
- } else if (_blocks.containsKey(blockId)) {
+ } else if (blockExists) {
// If isValid is not true, drop the block.
- val blockStatus: BlockStatus = _blocks.get(blockId)
_blocks.remove(blockId)
_cachedBlocks -= blockId
- if (blockStatus.storageLevel.useMemory) {
- logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),
- Utils.bytesToString(_remainingMem)))
+ if (originalLevel.useMemory) {
+ logInfo(s"Removed $blockId on ${blockManagerId.hostPort} in memory" +
+ s" (size: ${Utils.bytesToString(originalMemSize)}," +
+ s" free: ${Utils.bytesToString(_remainingMem)})")
}
- if (blockStatus.storageLevel.useDisk) {
- logInfo("Removed %s on %s on disk (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
+ if (originalLevel.useDisk) {
+ logInfo(s"Removed $blockId on ${blockManagerId.hostPort} on disk" +
+ s" (size: ${Utils.bytesToString(originalDiskSize)})")
}
}
}