aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <reynoldx@gmail.com>2013-09-26 11:27:34 -0700
committerReynold Xin <reynoldx@gmail.com>2013-09-26 11:27:34 -0700
commit560ee5c9bba3f9fde380c831d0c6701343b2fecf (patch)
treeb4e84c33ddd96e10475eacad8ef3f99ad186fcf7
parent6566a19b38204d754c5e8f821b4276616e90abc6 (diff)
parent9524b943a4f01297b9c5582d436e8af37d786d5e (diff)
downloadspark-560ee5c9bba3f9fde380c831d0c6701343b2fecf.tar.gz
spark-560ee5c9bba3f9fde380c831d0c6701343b2fecf.tar.bz2
spark-560ee5c9bba3f9fde380c831d0c6701343b2fecf.zip
Merge pull request #7 from wannabeast/memorystore-fixes
some minor fixes to MemoryStore This is a repeat of #5, moved to its own branch in my repo. This makes all updates to on ; it skips on synchronizing the reads where it can get away with it.
-rw-r--r--core/src/main/scala/org/apache/spark/storage/MemoryStore.scala14
1 files changed, 8 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 3b3b2342fa..77a39c71ed 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -30,10 +30,10 @@ import org.apache.spark.util.{SizeEstimator, Utils}
private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
extends BlockStore(blockManager) {
- case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false)
+ case class Entry(value: Any, size: Long, deserialized: Boolean)
private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
- private var currentMemory = 0L
+ @volatile private var currentMemory = 0L
// Object used to ensure that only one thread is putting blocks and if necessary, dropping
// blocks from the memory store.
private val putLock = new Object()
@@ -110,9 +110,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
override def remove(blockId: String): Boolean = {
entries.synchronized {
- val entry = entries.get(blockId)
+ val entry = entries.remove(blockId)
if (entry != null) {
- entries.remove(blockId)
currentMemory -= entry.size
logInfo("Block %s of size %d dropped from memory (free %d)".format(
blockId, entry.size, freeMemory))
@@ -126,6 +125,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
override def clear() {
entries.synchronized {
entries.clear()
+ currentMemory = 0
}
logInfo("MemoryStore cleared")
}
@@ -160,8 +160,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
putLock.synchronized {
if (ensureFreeSpace(blockId, size)) {
val entry = new Entry(value, size, deserialized)
- entries.synchronized { entries.put(blockId, entry) }
- currentMemory += size
+ entries.synchronized {
+ entries.put(blockId, entry)
+ currentMemory += size
+ }
if (deserialized) {
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))