aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala87
1 files changed, 42 insertions, 45 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0614646771..9e63777caf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -217,7 +217,7 @@ private[spark] class BlockManager(
logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.")
for ((blockId, info) <- blockInfoManager.entries) {
val status = getCurrentBlockStatus(blockId, info)
- if (!tryToReportBlockStatus(blockId, info, status)) {
+ if (info.tellMaster && !tryToReportBlockStatus(blockId, status)) {
logError(s"Failed to report $blockId to master; giving up.")
return
}
@@ -298,7 +298,7 @@ private[spark] class BlockManager(
/**
* Get the BlockStatus for the block identified by the given ID, if it exists.
- * NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
+ * NOTE: This is mainly for testing.
*/
def getStatus(blockId: BlockId): Option[BlockStatus] = {
blockInfoManager.get(blockId).map { info =>
@@ -333,10 +333,9 @@ private[spark] class BlockManager(
*/
private def reportBlockStatus(
blockId: BlockId,
- info: BlockInfo,
status: BlockStatus,
droppedMemorySize: Long = 0L): Unit = {
- val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
+ val needReregister = !tryToReportBlockStatus(blockId, status, droppedMemorySize)
if (needReregister) {
logInfo(s"Got told to re-register updating block $blockId")
// Re-registering will report our new block for free.
@@ -352,17 +351,12 @@ private[spark] class BlockManager(
*/
private def tryToReportBlockStatus(
blockId: BlockId,
- info: BlockInfo,
status: BlockStatus,
droppedMemorySize: Long = 0L): Boolean = {
- if (info.tellMaster) {
- val storageLevel = status.storageLevel
- val inMemSize = Math.max(status.memSize, droppedMemorySize)
- val onDiskSize = status.diskSize
- master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
- } else {
- true
- }
+ val storageLevel = status.storageLevel
+ val inMemSize = Math.max(status.memSize, droppedMemorySize)
+ val onDiskSize = status.diskSize
+ master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
}
/**
@@ -374,7 +368,7 @@ private[spark] class BlockManager(
info.synchronized {
info.level match {
case null =>
- BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
+ BlockStatus.empty
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
@@ -807,12 +801,10 @@ private[spark] class BlockManager(
// Now that the block is in either the memory or disk store,
// tell the master about it.
info.size = size
- if (tellMaster) {
- reportBlockStatus(blockId, info, putBlockStatus)
- }
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
+ if (tellMaster && info.tellMaster) {
+ reportBlockStatus(blockId, putBlockStatus)
}
+ addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
}
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
@@ -961,15 +953,12 @@ private[spark] class BlockManager(
val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
- // Now that the block is in either the memory, externalBlockStore, or disk store,
- // tell the master about it.
+ // Now that the block is in either the memory or disk store, tell the master about it.
info.size = size
- if (tellMaster) {
- reportBlockStatus(blockId, info, putBlockStatus)
- }
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
+ if (tellMaster && info.tellMaster) {
+ reportBlockStatus(blockId, putBlockStatus)
}
+ addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
val remoteStartTime = System.currentTimeMillis
@@ -1271,12 +1260,10 @@ private[spark] class BlockManager(
val status = getCurrentBlockStatus(blockId, info)
if (info.tellMaster) {
- reportBlockStatus(blockId, info, status, droppedMemorySize)
+ reportBlockStatus(blockId, status, droppedMemorySize)
}
if (blockIsUpdated) {
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
- }
+ addUpdatedBlockStatusToTaskMetrics(blockId, status)
}
status.storageLevel
}
@@ -1316,21 +1303,31 @@ private[spark] class BlockManager(
// The block has already been removed; do nothing.
logWarning(s"Asked to remove block $blockId, which does not exist")
case Some(info) =>
- // Removals are idempotent in disk store and memory store. At worst, we get a warning.
- val removedFromMemory = memoryStore.remove(blockId)
- val removedFromDisk = diskStore.remove(blockId)
- if (!removedFromMemory && !removedFromDisk) {
- logWarning(s"Block $blockId could not be removed as it was not found in either " +
- "the disk, memory, or external block store")
- }
- blockInfoManager.removeBlock(blockId)
- val removeBlockStatus = getCurrentBlockStatus(blockId, info)
- if (tellMaster && info.tellMaster) {
- reportBlockStatus(blockId, info, removeBlockStatus)
- }
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(blockId -> removeBlockStatus)
- }
+ removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster)
+ addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
+ }
+ }
+
+ /**
+ * Internal version of [[removeBlock()]] which assumes that the caller already holds a write
+ * lock on the block.
+ */
+ private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
+ // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+ val removedFromMemory = memoryStore.remove(blockId)
+ val removedFromDisk = diskStore.remove(blockId)
+ if (!removedFromMemory && !removedFromDisk) {
+ logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
+ }
+ blockInfoManager.removeBlock(blockId)
+ if (tellMaster) {
+ reportBlockStatus(blockId, BlockStatus.empty)
+ }
+ }
+
+ private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit = {
+ Option(TaskContext.get()).foreach { c =>
+ c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
}
}