aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@cs.berkeley.edu>2012-12-14 00:27:24 -0800
committerReynold Xin <rxin@cs.berkeley.edu>2012-12-14 00:27:24 -0800
commit06f855c24d5e4cb77a69671a3b0e2afba7d4e1c0 (patch)
tree7a3fd91ca6799a8bd400cca7824000ae0f752e29 /core
parent8c01295b859c35f4034528d4487a45c34728d0fb (diff)
parent7c9e3d1c2105b694bedcfe10e554dbadd2760eb5 (diff)
downloadspark-06f855c24d5e4cb77a69671a3b0e2afba7d4e1c0.tar.gz
spark-06f855c24d5e4cb77a69671a3b0e2afba7d4e1c0.tar.bz2
spark-06f855c24d5e4cb77a69671a3b0e2afba7d4e1c0.zip
Merge branch 'spark-633' of github.com:rxin/spark into spark-633
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala13
-rw-r--r--core/src/main/scala/spark/storage/BlockStore.scala7
-rw-r--r--core/src/main/scala/spark/storage/DiskStore.scala5
-rw-r--r--core/src/main/scala/spark/storage/MemoryStore.scala5
4 files changed, 23 insertions, 7 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 2f41633440..eedf6d96e2 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -829,7 +829,10 @@ class BlockManager(
diskStore.putBytes(blockId, bytes, level)
}
}
- memoryStore.remove(blockId)
+ val blockWasRemoved = memoryStore.remove(blockId)
+ if (!blockWasRemoved) {
+ logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")
+ }
if (info.tellMaster) {
reportBlockStatus(blockId)
}
@@ -853,8 +856,12 @@ class BlockManager(
val info = blockInfo.get(blockId).orNull
if (info != null) info.synchronized {
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
- memoryStore.remove(blockId)
- diskStore.remove(blockId)
+ val removedFromMemory = memoryStore.remove(blockId)
+ val removedFromDisk = diskStore.remove(blockId)
+ if (!removedFromMemory && !removedFromDisk) {
+ logWarning("Block " + blockId + " could not be removed as it was not found in either " +
+ "the disk or memory store")
+ }
blockInfo.remove(blockId)
} else {
// The block has already been removed; do nothing.
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 096bf8bdd9..8188d3595e 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -31,7 +31,12 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
def getValues(blockId: String): Option[Iterator[Any]]
- def remove(blockId: String)
+ /**
+ * Remove a block, if it exists.
+ * @param blockId the block to remove.
+ * @return True if the block was found and removed, False otherwise.
+ */
+ def remove(blockId: String): Boolean
def contains(blockId: String): Boolean
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index b5561479db..7e5b820cbb 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -92,10 +92,13 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes))
}
- override def remove(blockId: String) {
+ override def remove(blockId: String): Boolean = {
val file = getFile(blockId)
if (file.exists()) {
file.delete()
+ true
+ } else {
+ false
}
}
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index 02098b82fe..00e32f753c 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -90,7 +90,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}
- override def remove(blockId: String) {
+ override def remove(blockId: String): Boolean = {
entries.synchronized {
val entry = entries.get(blockId)
if (entry != null) {
@@ -98,8 +98,9 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
currentMemory -= entry.size
logInfo("Block %s of size %d dropped from memory (free %d)".format(
blockId, entry.size, freeMemory))
+ true
} else {
- logWarning("Block " + blockId + " could not be removed as it does not exist")
+ false
}
}
}