diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-12-13 15:12:44 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-12-13 15:22:27 -0800 |
commit | 7c9e3d1c2105b694bedcfe10e554dbadd2760eb5 (patch) | |
tree | c0433b06693e475741a4cfc4d921079f6aa69e55 /core | |
parent | 1b7a0451ed7df78838ca7ea09dfa5ba0e236acfe (diff) | |
download | spark-7c9e3d1c2105b694bedcfe10e554dbadd2760eb5.tar.gz spark-7c9e3d1c2105b694bedcfe10e554dbadd2760eb5.tar.bz2 spark-7c9e3d1c2105b694bedcfe10e554dbadd2760eb5.zip |
Return success or failure in BlockStore.remove().
Diffstat (limited to 'core')
4 files changed, 23 insertions, 7 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index b2c9e2cc40..9a60a8dd62 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -832,7 +832,10 @@ class BlockManager( diskStore.putBytes(blockId, bytes, level) } } - memoryStore.remove(blockId) + val blockWasRemoved = memoryStore.remove(blockId) + if (!blockWasRemoved) { + logWarning("Block " + blockId + " could not be dropped from memory as it does not exist") + } if (info.tellMaster) { reportBlockStatus(blockId) } @@ -856,8 +859,12 @@ class BlockManager( val info = blockInfo.get(blockId) if (info != null) info.synchronized { // Removals are idempotent in disk store and memory store. At worst, we get a warning. - memoryStore.remove(blockId) - diskStore.remove(blockId) + val removedFromMemory = memoryStore.remove(blockId) + val removedFromDisk = diskStore.remove(blockId) + if (!removedFromMemory && !removedFromDisk) { + logWarning("Block " + blockId + " could not be removed as it was not found in either " + + "the disk or memory store") + } blockInfo.remove(blockId) } else { // The block has already been removed; do nothing. diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala index 096bf8bdd9..8188d3595e 100644 --- a/core/src/main/scala/spark/storage/BlockStore.scala +++ b/core/src/main/scala/spark/storage/BlockStore.scala @@ -31,7 +31,12 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging { def getValues(blockId: String): Option[Iterator[Any]] - def remove(blockId: String) + /** + * Remove a block, if it exists. + * @param blockId the block to remove. + * @return True if the block was found and removed, False otherwise. + */ + def remove(blockId: String): Boolean def contains(blockId: String): Boolean diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index 8ba64e4b76..8d08871d73 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -90,10 +90,13 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes)) } - override def remove(blockId: String) { + override def remove(blockId: String): Boolean = { val file = getFile(blockId) if (file.exists()) { file.delete() + true + } else { + false } } diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala index 02098b82fe..00e32f753c 100644 --- a/core/src/main/scala/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/spark/storage/MemoryStore.scala @@ -90,7 +90,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } } - override def remove(blockId: String) { + override def remove(blockId: String): Boolean = { entries.synchronized { val entry = entries.get(blockId) if (entry != null) { @@ -98,8 +98,9 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) currentMemory -= entry.size logInfo("Block %s of size %d dropped from memory (free %d)".format( blockId, entry.size, freeMemory)) + true } else { - logWarning("Block " + blockId + " could not be removed as it does not exist") + false } } } |