aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
author: Josh Rosen <joshrosen@databricks.com> 2016-09-12 13:09:33 -0700
committer: Josh Rosen <joshrosen@databricks.com> 2016-09-12 13:09:33 -0700
commit: 3d40896f410590c0be044b3fa7e5d32115fac05e (patch)
tree: 726ea096c4af3311b7333647cabe04b52c91f975 /core/src
parent: 1742c3ab86d75ce3d352f7cddff65e62fb7c8dd4 (diff)
download: spark-3d40896f410590c0be044b3fa7e5d32115fac05e.tar.gz
spark-3d40896f410590c0be044b3fa7e5d32115fac05e.tar.bz2
spark-3d40896f410590c0be044b3fa7e5d32115fac05e.zip
[SPARK-17483] Refactoring in BlockManager status reporting and block removal
This patch makes three minor refactorings to the BlockManager:

- Move the `if (info.tellMaster)` check out of `reportBlockStatus`; this fixes an issue where a debug logging message would incorrectly claim to have reported a block status to the master even though no message had been sent (in case `info.tellMaster == false`). This also makes it easier to write code which unconditionally sends block statuses to the master (which is necessary in another patch of mine).
- Split `removeBlock()` into two methods, the existing method and an internal `removeBlockInternal()` method which is designed to be called by internal code that already holds a write lock on the block. This is also needed by a followup patch.
- Instead of calling `getCurrentBlockStatus()` in `removeBlock()`, just pass `BlockStatus.empty`; the block status should always be empty following complete removal of a block.

These changes were originally authored as part of a bug fix patch which is targeted at branch-2.0 and master; I've split them out here into their own separate PR in order to make them easier to review and so that the behavior-changing parts of my other patch can be isolated to their own PR.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #15036 from JoshRosen/cache-failure-race-conditions-refactorings-only.
Diffstat (limited to 'core/src')
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 87
1 files changed, 42 insertions, 45 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0614646771..9e63777caf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -217,7 +217,7 @@ private[spark] class BlockManager(
logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.")
for ((blockId, info) <- blockInfoManager.entries) {
val status = getCurrentBlockStatus(blockId, info)
- if (!tryToReportBlockStatus(blockId, info, status)) {
+ if (info.tellMaster && !tryToReportBlockStatus(blockId, status)) {
logError(s"Failed to report $blockId to master; giving up.")
return
}
@@ -298,7 +298,7 @@ private[spark] class BlockManager(
/**
* Get the BlockStatus for the block identified by the given ID, if it exists.
- * NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
+ * NOTE: This is mainly for testing.
*/
def getStatus(blockId: BlockId): Option[BlockStatus] = {
blockInfoManager.get(blockId).map { info =>
@@ -333,10 +333,9 @@ private[spark] class BlockManager(
*/
private def reportBlockStatus(
blockId: BlockId,
- info: BlockInfo,
status: BlockStatus,
droppedMemorySize: Long = 0L): Unit = {
- val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
+ val needReregister = !tryToReportBlockStatus(blockId, status, droppedMemorySize)
if (needReregister) {
logInfo(s"Got told to re-register updating block $blockId")
// Re-registering will report our new block for free.
@@ -352,17 +351,12 @@ private[spark] class BlockManager(
*/
private def tryToReportBlockStatus(
blockId: BlockId,
- info: BlockInfo,
status: BlockStatus,
droppedMemorySize: Long = 0L): Boolean = {
- if (info.tellMaster) {
- val storageLevel = status.storageLevel
- val inMemSize = Math.max(status.memSize, droppedMemorySize)
- val onDiskSize = status.diskSize
- master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
- } else {
- true
- }
+ val storageLevel = status.storageLevel
+ val inMemSize = Math.max(status.memSize, droppedMemorySize)
+ val onDiskSize = status.diskSize
+ master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
}
/**
@@ -374,7 +368,7 @@ private[spark] class BlockManager(
info.synchronized {
info.level match {
case null =>
- BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
+ BlockStatus.empty
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
@@ -807,12 +801,10 @@ private[spark] class BlockManager(
// Now that the block is in either the memory or disk store,
// tell the master about it.
info.size = size
- if (tellMaster) {
- reportBlockStatus(blockId, info, putBlockStatus)
- }
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
+ if (tellMaster && info.tellMaster) {
+ reportBlockStatus(blockId, putBlockStatus)
}
+ addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
}
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
@@ -961,15 +953,12 @@ private[spark] class BlockManager(
val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
- // Now that the block is in either the memory, externalBlockStore, or disk store,
- // tell the master about it.
+ // Now that the block is in either the memory or disk store, tell the master about it.
info.size = size
- if (tellMaster) {
- reportBlockStatus(blockId, info, putBlockStatus)
- }
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
+ if (tellMaster && info.tellMaster) {
+ reportBlockStatus(blockId, putBlockStatus)
}
+ addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
val remoteStartTime = System.currentTimeMillis
@@ -1271,12 +1260,10 @@ private[spark] class BlockManager(
val status = getCurrentBlockStatus(blockId, info)
if (info.tellMaster) {
- reportBlockStatus(blockId, info, status, droppedMemorySize)
+ reportBlockStatus(blockId, status, droppedMemorySize)
}
if (blockIsUpdated) {
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
- }
+ addUpdatedBlockStatusToTaskMetrics(blockId, status)
}
status.storageLevel
}
@@ -1316,21 +1303,31 @@ private[spark] class BlockManager(
// The block has already been removed; do nothing.
logWarning(s"Asked to remove block $blockId, which does not exist")
case Some(info) =>
- // Removals are idempotent in disk store and memory store. At worst, we get a warning.
- val removedFromMemory = memoryStore.remove(blockId)
- val removedFromDisk = diskStore.remove(blockId)
- if (!removedFromMemory && !removedFromDisk) {
- logWarning(s"Block $blockId could not be removed as it was not found in either " +
- "the disk, memory, or external block store")
- }
- blockInfoManager.removeBlock(blockId)
- val removeBlockStatus = getCurrentBlockStatus(blockId, info)
- if (tellMaster && info.tellMaster) {
- reportBlockStatus(blockId, info, removeBlockStatus)
- }
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(blockId -> removeBlockStatus)
- }
+ removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster)
+ addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
+ }
+ }
+
+ /**
+ * Internal version of [[removeBlock()]] which assumes that the caller already holds a write
+ * lock on the block.
+ */
+ private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
+ // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+ val removedFromMemory = memoryStore.remove(blockId)
+ val removedFromDisk = diskStore.remove(blockId)
+ if (!removedFromMemory && !removedFromDisk) {
+ logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
+ }
+ blockInfoManager.removeBlock(blockId)
+ if (tellMaster) {
+ reportBlockStatus(blockId, BlockStatus.empty)
+ }
+ }
+
+ private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit = {
+ Option(TaskContext.get()).foreach { c =>
+ c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
}
}