aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMike <wannabeast@users.noreply.github.com>2013-09-19 23:31:35 -0700
committerMike <wannabeast@users.noreply.github.com>2013-09-19 23:31:35 -0700
commit9524b943a4f01297b9c5582d436e8af37d786d5e (patch)
tree26a58d96d4d1567a56a46d88c4b37cb6a79aa72a /core
parentd34672f6684d2c14fc5db58335370ef9ba84375e (diff)
downloadspark-9524b943a4f01297b9c5582d436e8af37d786d5e.tar.gz
spark-9524b943a4f01297b9c5582d436e8af37d786d5e.tar.bz2
spark-9524b943a4f01297b9c5582d436e8af37d786d5e.zip
Synchronize on "entries" the remaining update to "currentMemory".
Make "currentMemory" @volatile, so that it's reads in ensureFreeSpace() are atomic and up-to-date--i.e., currentMemory can't increase while putLock is held (though it could decrease, which would only help ensureFreeSpace()).
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/MemoryStore.scala8
1 files changed, 5 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 4344f851d9..77a39c71ed 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -33,7 +33,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
case class Entry(value: Any, size: Long, deserialized: Boolean)
private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
- private var currentMemory = 0L
+ @volatile private var currentMemory = 0L
// Object used to ensure that only one thread is putting blocks and if necessary, dropping
// blocks from the memory store.
private val putLock = new Object()
@@ -160,8 +160,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
putLock.synchronized {
if (ensureFreeSpace(blockId, size)) {
val entry = new Entry(value, size, deserialized)
- entries.synchronized { entries.put(blockId, entry) }
- currentMemory += size
+ entries.synchronized {
+ entries.put(blockId, entry)
+ currentMemory += size
+ }
if (deserialized) {
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))