author     Tathagata Das <tathagata.das1565@gmail.com>  2012-07-27 10:02:26 +0000
committer  Tathagata Das <tathagata.das1565@gmail.com>  2012-07-27 10:02:26 +0000
commit     435d129bec024512e14b95592f1f3b6e58322350 (patch)
tree       f61f5d94bd7a587687df88905c45f3cbad48986d
parent     0426769f89d4017ecb61c7528eb0f66cdc9c05fc (diff)
Fixed bugs in block dropping code of MemoryStore and changed synchronized HashMap to ConcurrentHashMap in BlockManager.
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala |  4
-rw-r--r--  core/src/main/scala/spark/storage/BlockStore.scala   | 18
2 files changed, 8 insertions, 14 deletions
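
The main BlockManager change swaps a Collections.synchronizedMap-wrapped HashMap for a ConcurrentHashMap, trading one map-wide monitor for lock-striped writes and non-blocking reads. The following is a minimal standalone sketch of that difference, not the actual Spark code; the BlockInfo fields here are hypothetical, just enough to compile.

import java.util.{Collections, HashMap => JHashMap}
import java.util.concurrent.ConcurrentHashMap

object BlockInfoMapSketch {
  // Hypothetical stand-in for spark.storage.BlockInfo, for illustration only.
  case class BlockInfo(storageLevelOrdinal: Int, size: Long)

  def main(args: Array[String]): Unit = {
    // Before: one monitor guards the whole map; every get/put serializes on it.
    val synced = Collections.synchronizedMap(new JHashMap[String, BlockInfo]())
    synced.put("rdd_0_0", BlockInfo(1, 1024L))

    // After: ConcurrentHashMap supports concurrent reads and striped writes,
    // so lookups from many block-serving threads no longer contend on one lock.
    val concurrent = new ConcurrentHashMap[String, BlockInfo]()
    concurrent.put("rdd_0_0", BlockInfo(1, 1024L))
    println(concurrent.get("rdd_0_0"))
  }
}
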
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 9e4816f7ce..c5da6dd7d6 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -5,7 +5,7 @@ import java.nio._
import java.nio.channels.FileChannel.MapMode
import java.util.{HashMap => JHashMap}
import java.util.LinkedHashMap
-import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
import java.util.Collections
import scala.actors._
@@ -74,7 +74,7 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
private val NUM_LOCKS = 337
private val locker = new BlockLocker(NUM_LOCKS)
- private val blockInfo = Collections.synchronizedMap(new JHashMap[String, BlockInfo])
+ private val blockInfo = new ConcurrentHashMap[String, BlockInfo]()
private val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private val diskStore: BlockStore = new DiskStore(this,
System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
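
The BlockStore.scala hunks below fix the block dropping path: dropPending becomes the mutable field on Entry and is set as soon as a block is selected, so a block already queued on blocksToDrop is not selected again, and the dead drop() helper is removed. The following is a rough sketch of that selection idea under simplified types; names such as selectBlocksToDrop are illustrative, not the real MemoryStore API.

import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

object DropSelectionSketch {
  // Simplified stand-in for MemoryStore.Entry: dropPending is the only mutable field.
  case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false)

  val blocksToDrop = new LinkedBlockingQueue[String]()

  // Walk candidate entries (LRU order in the real store) and queue blocks until
  // `space` bytes are covered; entries already marked dropPending still count
  // toward the memory being freed but are not queued a second time.
  def selectBlocksToDrop(entries: Seq[(String, Entry)], space: Long): Seq[String] = {
    val selected = new ArrayBuffer[String]
    var selectedMemory = 0L
    val it = entries.iterator
    while (selectedMemory < space && it.hasNext) {
      val (blockId, entry) = it.next()
      if (!entry.dropPending) {
        selected += blockId
        entry.dropPending = true   // so a later pass does not pick the same block again
      }
      selectedMemory += entry.size
    }
    selected.foreach(blocksToDrop.add)
    selected
  }
}
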
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index ce6faced34..17f4f51aa8 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -40,7 +40,7 @@ abstract class BlockStore(blockManager: BlockManager) extends Logging {
class MemoryStore(blockManager: BlockManager, maxMemory: Long)
extends BlockStore(blockManager) {
- case class Entry(var value: Any, size: Long, deserialized: Boolean, dropPending: Boolean = false)
+ case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false)
private val memoryStore = new LinkedHashMap[String, Entry](32, 0.75f, true)
private var currentMemory = 0L
@@ -52,6 +52,7 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
try{
while (true) {
val blockId = blocksToDrop.take()
+ logDebug("Block " + blockId + " ready to be dropped")
blockManager.dropFromMemory(blockId)
}
} catch {
@@ -143,15 +144,6 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
logInfo("MemoryStore cleared")
}
- private def drop(blockId: String) {
- /*blockDropper.submit(new Runnable() {
- def run() {
- blockManager.dropFromMemory(blockId)
- }
- })
- */
- }
-
private def ensureFreeSpace(space: Long) {
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
space, currentMemory, maxMemory))
@@ -169,13 +161,15 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val entry = pair.getValue()
if (!entry.dropPending) {
selectedBlocks += blockId
+ entry.dropPending = true
}
selectedMemory += pair.getValue.size
- logDebug("Block" + blockId + " selected for dropping")
+ logDebug("Block " + blockId + " selected for dropping")
}
}
- logDebug("" + selectedBlocks.size + " selected for dropping")
+ logDebug("" + selectedBlocks.size + " new blocks selected for dropping, " +
+ blocksToDrop.size + " blocks pending")
var i = 0
while (i < selectedBlocks.size) {
blocksToDrop.add(selectedBlocks(i))