author     Tathagata Das <tathagata.das1565@gmail.com>  2012-11-28 23:18:24 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2012-11-28 23:18:24 -0800
commit     c9789751bfc496d24e8369a0035d57f0ed8dcb58 (patch)
tree       0bd155607dd06a11e55d06b5adb131b7be0ef64b /core
parent     9e9e9e1d898387a1996e4c57128bafadb5938a9b (diff)
Added metadata cleaner to BlockManager to remove old blocks completely.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala        | 47
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala  |  1
2 files changed, 36 insertions(+), 12 deletions(-)
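
The diff below replaces BlockManager's plain ConcurrentHashMap of block metadata with spark.util.TimeStampedHashMap, which pairs each value with the time it was inserted so a cleaner can later sweep entries older than a cutoff. A minimal sketch of that idea follows; only the internalMap field, the Option-returning get, and contains are taken from how the diff uses the class, and the rest is an illustration, not Spark's actual implementation:

    import java.util.concurrent.ConcurrentHashMap

    // Sketch: a map whose values carry the timestamp of their insertion,
    // so a cleanup pass can drop entries older than a given cutoff.
    class TimeStampedHashMap[K, V] {
      // Exposed so a cleaner can iterate and remove entries in place,
      // as dropOldBlocks does in the diff below.
      val internalMap = new ConcurrentHashMap[K, (V, Long)]()

      def put(key: K, value: V) {
        internalMap.put(key, (value, System.currentTimeMillis()))
      }

      def get(key: K): Option[V] = Option(internalMap.get(key)).map(_._1)

      def contains(key: K): Boolean = internalMap.containsKey(key)
    }

Because lookups now return an Option instead of a nullable reference, the diff rewrites every blockInfo.get(blockId) call site to a None/Some pattern match or to .map(...).orNull, and containsKey becomes contains.
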
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index e4aa9247a3..1e36578e1a 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -10,12 +10,12 @@ import java.nio.{MappedByteBuffer, ByteBuffer}
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
-import scala.collection.JavaConversions._
import spark.{CacheTracker, Logging, SizeEstimator, SparkException, Utils}
import spark.network._
import spark.serializer.Serializer
-import spark.util.ByteBufferInputStream
+import spark.util.{MetadataCleaner, TimeStampedHashMap, ByteBufferInputStream}
+
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import sun.nio.ch.DirectBuffer
@@ -51,7 +51,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- private val blockInfo = new ConcurrentHashMap[String, BlockInfo](1000)
+ private val blockInfo = new TimeStampedHashMap[String, BlockInfo]()
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore: BlockStore =
@@ -80,6 +80,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
val host = System.getProperty("spark.hostname", Utils.localHostName())
+ val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
initialize()
/**
@@ -102,8 +103,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* Get storage level of local block. If no info exists for the block, then returns null.
*/
def getLevel(blockId: String): StorageLevel = {
- val info = blockInfo.get(blockId)
- if (info != null) info.level else null
+ blockInfo.get(blockId).map(_.level).orNull
}
/**
@@ -113,9 +113,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
*/
def reportBlockStatus(blockId: String) {
val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match {
- case null =>
+ case None =>
(StorageLevel.NONE, 0L, 0L)
- case info =>
+ case Some(info) =>
info.synchronized {
info.level match {
case null =>
@@ -173,7 +173,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- val info = blockInfo.get(blockId)
+ val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread
@@ -258,7 +258,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- val info = blockInfo.get(blockId)
+ val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread
@@ -517,7 +517,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new IllegalArgumentException("Storage level is null or invalid")
}
- val oldBlock = blockInfo.get(blockId)
+ val oldBlock = blockInfo.get(blockId).orNull
if (oldBlock != null) {
logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
oldBlock.waitForReady()
@@ -618,7 +618,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new IllegalArgumentException("Storage level is null or invalid")
}
- if (blockInfo.containsKey(blockId)) {
+ if (blockInfo.contains(blockId)) {
logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
return
}
@@ -740,7 +740,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
*/
def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
logInfo("Dropping block " + blockId + " from memory")
- val info = blockInfo.get(blockId)
+ val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
val level = info.level
@@ -767,6 +767,29 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
+ def dropOldBlocks(cleanupTime: Long) {
+ logInfo("Dropping blocks older than " + cleanupTime)
+ val iterator = blockInfo.internalMap.entrySet().iterator()
+ while(iterator.hasNext) {
+ val entry = iterator.next()
+ val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
+ if (time < cleanupTime) {
+ info.synchronized {
+ val level = info.level
+ if (level.useMemory) {
+ memoryStore.remove(id)
+ }
+ if (level.useDisk) {
+ diskStore.remove(id)
+ }
+ iterator.remove()
+ logInfo("Dropped block " + id)
+ }
+ reportBlockStatus(id)
+ }
+ }
+ }
+
def shouldCompress(blockId: String): Boolean = {
if (blockId.startsWith("shuffle_")) {
compressShuffle
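
The MetadataCleaner constructed above takes a name and a cleanup function of type Long => Unit and invokes that function periodically with a cutoff timestamp. A rough sketch of such a periodic cleaner, assuming a java.util.Timer and a hypothetical "spark.cleaner.delay" property for the period; only the (name, cleanupFunc) constructor shape comes from the diff:

    import java.util.{Timer, TimerTask}

    // Sketch of a periodic metadata cleaner. Every delaySeconds it calls
    // cleanupFunc with a cutoff timestamp; anything last updated before the
    // cutoff is eligible for removal. The property name and the scheduling
    // details are assumptions, not necessarily Spark's MetadataCleaner.
    class MetadataCleaner(name: String, cleanupFunc: Long => Unit) {
      private val delaySeconds =
        System.getProperty("spark.cleaner.delay", "3600").toLong
      private val periodMillis = delaySeconds * 1000L
      private val timer = new Timer(name + " cleanup timer", true)

      timer.schedule(new TimerTask {
        override def run() {
          // Everything older than (now - period) is considered stale.
          cleanupFunc(System.currentTimeMillis() - periodMillis)
        }
      }, periodMillis, periodMillis)

      def cancel() { timer.cancel() }
    }

On this reading, dropOldBlocks(cleanupTime) in the hunk above receives now minus the configured delay, and for each block whose timestamp predates it removes the block from memoryStore and/or diskStore per its storage level, deletes the map entry, and reports the new status to the master.
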
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 397395a65b..af15663621 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -341,6 +341,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
throw new Exception("Self index for " + blockManagerId + " not found")
}
+ // Note that this logic will select the same node multiple times if there aren't enough peers
var index = selfIndex
while (res.size < size) {
index += 1
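
The comment added to BlockManagerMasterActor documents the replica-placement loop that follows it: starting just past this node's own slot, it walks the peer list until size peers have been collected, so with too few peers the same node is chosen repeatedly. A standalone sketch of that wrap-around walk; the names peers, selfIndex, size, res, and index mirror the hunk, while the modulo step is an assumption about the code elided from the diff:

    import scala.collection.mutable.ArrayBuffer

    // Sketch of wrap-around peer selection: walk the ring of peers starting
    // just after selfIndex until `size` have been collected. If fewer than
    // `size` distinct peers exist, the same peer is returned more than once,
    // which is exactly what the new comment warns about.
    def selectPeers[T](peers: IndexedSeq[T], selfIndex: Int, size: Int): Seq[T] = {
      require(peers.nonEmpty, "cannot select peers from an empty list")
      val res = new ArrayBuffer[T]()
      var index = selfIndex
      while (res.size < size) {
        index += 1
        res += peers(index % peers.size)
      }
      res.toSeq
    }

The new comment flags the duplication rather than changing the behavior, so on a cluster with fewer peers than the requested replication count, multiple replicas can land on one node.
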