aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoreatoncys <chen.yanshan@zte.com.cn>2017-04-22 12:29:35 +0100
committerSean Owen <sowen@cloudera.com>2017-04-22 12:29:35 +0100
commit05a451491d535c0828413ce2eb06fe94571069ac (patch)
tree8aaee18bbdc67b7928344f53c9117cd87e85b259
parentad290402aa1d609abf5a2883a6d87fa8bc2bd517 (diff)
downloadspark-05a451491d535c0828413ce2eb06fe94571069ac.tar.gz
spark-05a451491d535c0828413ce2eb06fe94571069ac.tar.bz2
spark-05a451491d535c0828413ce2eb06fe94571069ac.zip
[SPARK-20386][SPARK CORE] modify the log info if the block exists on the slave already
## What changes were proposed in this pull request? Modify the added memory size to memSize-originalMemSize if the block exists on the slave already since if the block exists, the added memory size should be memSize-originalMemSize; if originalMemSize is bigger than memSize ,then the log info should be Removed memory, removed size should be originalMemSize-memSize ## How was this patch tested? Multiple runs on existing unit tests (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Author: eatoncys <chen.yanshan@zte.com.cn> Closes #17683 from eatoncys/SPARK-20386.
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala52
1 files changed, 35 insertions, 17 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 467c3e0e6b..6f85b9e4d6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -497,11 +497,17 @@ private[spark] class BlockManagerInfo(
updateLastSeenMs()
- if (_blocks.containsKey(blockId)) {
+ val blockExists = _blocks.containsKey(blockId)
+ var originalMemSize: Long = 0
+ var originalDiskSize: Long = 0
+ var originalLevel: StorageLevel = StorageLevel.NONE
+
+ if (blockExists) {
// The block exists on the slave already.
val blockStatus: BlockStatus = _blocks.get(blockId)
- val originalLevel: StorageLevel = blockStatus.storageLevel
- val originalMemSize: Long = blockStatus.memSize
+ originalLevel = blockStatus.storageLevel
+ originalMemSize = blockStatus.memSize
+ originalDiskSize = blockStatus.diskSize
if (originalLevel.useMemory) {
_remainingMem += originalMemSize
@@ -520,32 +526,44 @@ private[spark] class BlockManagerInfo(
blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0)
_blocks.put(blockId, blockStatus)
_remainingMem -= memSize
- logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
- Utils.bytesToString(_remainingMem)))
+ if (blockExists) {
+ logInfo(s"Updated $blockId in memory on ${blockManagerId.hostPort}" +
+ s" (current size: ${Utils.bytesToString(memSize)}," +
+ s" original size: ${Utils.bytesToString(originalMemSize)}," +
+ s" free: ${Utils.bytesToString(_remainingMem)})")
+ } else {
+ logInfo(s"Added $blockId in memory on ${blockManagerId.hostPort}" +
+ s" (size: ${Utils.bytesToString(memSize)}," +
+ s" free: ${Utils.bytesToString(_remainingMem)})")
+ }
}
if (storageLevel.useDisk) {
blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize)
_blocks.put(blockId, blockStatus)
- logInfo("Added %s on disk on %s (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
+ if (blockExists) {
+ logInfo(s"Updated $blockId on disk on ${blockManagerId.hostPort}" +
+ s" (current size: ${Utils.bytesToString(diskSize)}," +
+ s" original size: ${Utils.bytesToString(originalDiskSize)})")
+ } else {
+ logInfo(s"Added $blockId on disk on ${blockManagerId.hostPort}" +
+ s" (size: ${Utils.bytesToString(diskSize)})")
+ }
}
if (!blockId.isBroadcast && blockStatus.isCached) {
_cachedBlocks += blockId
}
- } else if (_blocks.containsKey(blockId)) {
+ } else if (blockExists) {
// If isValid is not true, drop the block.
- val blockStatus: BlockStatus = _blocks.get(blockId)
_blocks.remove(blockId)
_cachedBlocks -= blockId
- if (blockStatus.storageLevel.useMemory) {
- logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),
- Utils.bytesToString(_remainingMem)))
+ if (originalLevel.useMemory) {
+ logInfo(s"Removed $blockId on ${blockManagerId.hostPort} in memory" +
+ s" (size: ${Utils.bytesToString(originalMemSize)}," +
+ s" free: ${Utils.bytesToString(_remainingMem)})")
}
- if (blockStatus.storageLevel.useDisk) {
- logInfo("Removed %s on %s on disk (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
+ if (originalLevel.useDisk) {
+ logInfo(s"Removed $blockId on ${blockManagerId.hostPort} on disk" +
+ s" (size: ${Utils.bytesToString(originalDiskSize)})")
}
}
}