aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-10-02 12:01:32 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-10-02 12:01:32 -0700
commit22684653a55d00dce448aaf1619a75879ccb5af0 (patch)
treef2a3f9e336cf72004218d8314bf8a35fd2df25ac /core/src/main
parent42e0a68082327c78dbd0fd313145124d9b8a9d98 (diff)
downloadspark-22684653a55d00dce448aaf1619a75879ccb5af0.tar.gz
spark-22684653a55d00dce448aaf1619a75879ccb5af0.tar.bz2
spark-22684653a55d00dce448aaf1619a75879ccb5af0.zip
Revert "Place Spray repo ahead of Cloudera in Maven search path"
This reverts commit 42e0a68082327c78dbd0fd313145124d9b8a9d98.
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala222
-rw-r--r--core/src/main/scala/spark/storage/BlockStore.scala2
-rw-r--r--core/src/main/scala/spark/storage/DiskStore.scala6
-rw-r--r--core/src/main/scala/spark/storage/MemoryStore.scala170
4 files changed, 175 insertions, 225 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 3b9cd8b5fc..37d5862575 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -59,31 +59,15 @@ class BlockLocker(numLockers: Int) {
class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
extends Logging {
- class BlockInfo(val level: StorageLevel, val tellMaster: Boolean, var pending: Boolean = true) {
- def waitForReady() {
- if (pending) {
- synchronized {
- while (pending) this.wait()
- }
- }
- }
-
- def markReady() {
- pending = false
- synchronized {
- this.notifyAll()
- }
- }
- }
+ case class BlockInfo(level: StorageLevel, tellMaster: Boolean)
private val NUM_LOCKS = 337
private val locker = new BlockLocker(NUM_LOCKS)
private val blockInfo = new ConcurrentHashMap[String, BlockInfo]()
-
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
- private[storage] val diskStore: BlockStore =
- new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
+ private[storage] val diskStore: BlockStore = new DiskStore(this,
+ System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
val connectionManager = new ConnectionManager(0)
implicit val futureExecContext = connectionManager.futureExecContext
@@ -95,6 +79,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
var cacheTracker: CacheTracker = null
val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties
+
val compress = System.getProperty("spark.blockManager.compress", "false").toBoolean
initialize()
@@ -125,32 +110,45 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
/**
- * Tell the master about the current storage status of a block. This will send a heartbeat
- * message reflecting the current status, *not* the desired storage level in its block info.
- * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
+ * Change the storage level for a local block in the block info meta data, and
+ * tell the master if necessary. Note that this is only a meta data change and
+ * does NOT actually change the storage of the block. If the new level is
+ * invalid, then block info (if exists) will be silently removed.
*/
- def reportBlockStatus(blockId: String) {
- locker.getLock(blockId).synchronized {
- val curLevel = blockInfo.get(blockId) match {
- case null =>
- StorageLevel.NONE
- case info =>
- info.level match {
- case null =>
- StorageLevel.NONE
- case level =>
- val inMem = level.useMemory && memoryStore.contains(blockId)
- val onDisk = level.useDisk && diskStore.contains(blockId)
- new StorageLevel(onDisk, inMem, level.deserialized, level.replication)
- }
- }
+ private[spark] def setLevelAndTellMaster(
+ blockId: String, level: StorageLevel, tellMaster: Boolean = true) {
+
+ if (level == null) {
+ throw new IllegalArgumentException("Storage level is null")
+ }
+
+ // If there was earlier info about the block, then use earlier tellMaster
+ val oldInfo = blockInfo.get(blockId)
+ val newTellMaster = if (oldInfo != null) oldInfo.tellMaster else tellMaster
+ if (oldInfo != null && oldInfo.tellMaster != tellMaster) {
+ logWarning("Ignoring tellMaster setting as it is different from earlier setting")
+ }
+
+ // If level is valid, store the block info, else remove the block info
+ if (level.isValid) {
+ blockInfo.put(blockId, new BlockInfo(level, newTellMaster))
+ logDebug("Info for block " + blockId + " updated with new level as " + level)
+ } else {
+ blockInfo.remove(blockId)
+ logDebug("Info for block " + blockId + " removed as new level is null or invalid")
+ }
+
+ // Tell master if necessary
+ if (newTellMaster) {
master.mustHeartBeat(HeartBeat(
blockManagerId,
blockId,
- curLevel,
- if (curLevel.useMemory) memoryStore.getSize(blockId) else 0L,
- if (curLevel.useDisk) diskStore.getSize(blockId) else 0L))
+ level,
+ if (level.isValid && level.useMemory) memoryStore.getSize(blockId) else 0,
+ if (level.isValid && level.useDisk) diskStore.getSize(blockId) else 0))
logDebug("Told master about block " + blockId)
+ } else {
+ logDebug("Did not tell master about block " + blockId)
}
}
@@ -182,21 +180,22 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
def getLocal(blockId: String): Option[Iterator[Any]] = {
logDebug("Getting local block " + blockId)
locker.getLock(blockId).synchronized {
- val info = blockInfo.get(blockId)
- if (info != null) {
- info.waitForReady() // In case the block is still being put() by another thread
- val level = info.level
- logDebug("Level for block " + blockId + " is " + level)
+ // Check storage level of block
+ val level = getLevel(blockId)
+ if (level != null) {
+ logDebug("Level for block " + blockId + " is " + level + " on local machine")
// Look for the block in memory
if (level.useMemory) {
logDebug("Getting block " + blockId + " from memory")
memoryStore.getValues(blockId) match {
- case Some(iterator) =>
+ case Some(iterator) => {
logDebug("Block " + blockId + " found in memory")
return Some(iterator)
- case None =>
+ }
+ case None => {
logDebug("Block " + blockId + " not found in memory")
+ }
}
}
@@ -204,12 +203,14 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
if (level.useDisk) {
logDebug("Getting block " + blockId + " from disk")
diskStore.getValues(blockId) match {
- case Some(iterator) =>
+ case Some(iterator) => {
logDebug("Block " + blockId + " found in disk")
return Some(iterator)
- case None =>
+ }
+ case None => {
throw new Exception("Block " + blockId + " not found on disk, though it should be")
return None
+ }
}
}
} else {
@@ -225,21 +226,22 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
def getLocalBytes(blockId: String): Option[ByteBuffer] = {
logDebug("Getting local block " + blockId + " as bytes")
locker.getLock(blockId).synchronized {
- val info = blockInfo.get(blockId)
- if (info != null) {
- info.waitForReady() // In case the block is still being put() by another thread
- val level = info.level
+ // Check storage level of block
+ val level = getLevel(blockId)
+ if (level != null) {
logDebug("Level for block " + blockId + " is " + level + " on local machine")
// Look for the block in memory
if (level.useMemory) {
logDebug("Getting block " + blockId + " from memory")
memoryStore.getBytes(blockId) match {
- case Some(bytes) =>
+ case Some(bytes) => {
logDebug("Block " + blockId + " found in memory")
return Some(bytes)
- case None =>
+ }
+ case None => {
logDebug("Block " + blockId + " not found in memory")
+ }
}
}
@@ -247,12 +249,14 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
if (level.useDisk) {
logDebug("Getting block " + blockId + " from disk")
diskStore.getBytes(blockId) match {
- case Some(bytes) =>
+ case Some(bytes) => {
logDebug("Block " + blockId + " found in disk")
return Some(bytes)
- case None =>
+ }
+ case None => {
throw new Exception("Block " + blockId + " not found on disk, though it should be")
return None
+ }
}
}
} else {
@@ -427,17 +431,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new IllegalArgumentException("Storage level is null or invalid")
}
- if (blockInfo.containsKey(blockId)) {
- logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
- return
- }
-
- // Remember the block's storage level so that we can correctly drop it to disk if it needs
- // to be dropped right after it got put into memory. Note, however, that other threads will
- // not be able to get() this block until we call markReady on its BlockInfo.
- val myInfo = new BlockInfo(level, tellMaster)
- blockInfo.put(blockId, myInfo)
-
val startTimeMs = System.currentTimeMillis
var bytes: ByteBuffer = null
@@ -451,15 +444,32 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
- if (level.useMemory) {
- // Save it just to memory first, even if it also has useDisk set to true; we will later
- // drop it to disk if the memory store can't hold it.
+ // Check and warn if block with same id already exists
+ if (getLevel(blockId) != null) {
+ logWarning("Block " + blockId + " already exists in local machine")
+ return
+ }
+
+ if (level.useMemory && level.useDisk) {
+ // If saving to both memory and disk, then serialize only once
+ memoryStore.putValues(blockId, values, level, true) match {
+ case Left(newValues) =>
+ diskStore.putValues(blockId, newValues, level, true) match {
+ case Right(newBytes) => bytes = newBytes
+ case _ => throw new Exception("Unexpected return value")
+ }
+ case Right(newBytes) =>
+ bytes = newBytes
+ diskStore.putBytes(blockId, newBytes, level)
+ }
+ } else if (level.useMemory) {
+ // If only save to memory
memoryStore.putValues(blockId, values, level, true) match {
case Right(newBytes) => bytes = newBytes
case Left(newIterator) => valuesAfterPut = newIterator
}
} else {
- // Save directly to disk.
+ // If only save to disk
val askForBytes = level.replication > 1 // Don't get back the bytes unless we replicate them
diskStore.putValues(blockId, values, level, askForBytes) match {
case Right(newBytes) => bytes = newBytes
@@ -467,12 +477,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- // Now that the block is in either the memory or disk store, let other threads read it,
- // and tell the master about it.
- myInfo.markReady()
- if (tellMaster) {
- reportBlockStatus(blockId)
- }
+ // Store the storage level
+ setLevelAndTellMaster(blockId, level, tellMaster)
}
logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
@@ -515,17 +521,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new IllegalArgumentException("Storage level is null or invalid")
}
- if (blockInfo.containsKey(blockId)) {
- logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
- return
- }
-
- // Remember the block's storage level so that we can correctly drop it to disk if it needs
- // to be dropped right after it got put into memory. Note, however, that other threads will
- // not be able to get() this block until we call markReady on its BlockInfo.
- val myInfo = new BlockInfo(level, tellMaster)
- blockInfo.put(blockId, myInfo)
-
val startTimeMs = System.currentTimeMillis
// Initiate the replication before storing it locally. This is faster as
@@ -542,22 +537,22 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
locker.getLock(blockId).synchronized {
logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
+ if (getLevel(blockId) != null) {
+ logWarning("Block " + blockId + " already exists")
+ return
+ }
if (level.useMemory) {
- // Store it only in memory at first, even if useDisk is also set to true
bytes.rewind()
memoryStore.putBytes(blockId, bytes, level)
- } else {
+ }
+ if (level.useDisk) {
bytes.rewind()
diskStore.putBytes(blockId, bytes, level)
}
- // Now that the block is in either the memory or disk store, let other threads read it,
- // and tell the master about it.
- myInfo.markReady()
- if (tellMaster) {
- reportBlockStatus(blockId)
- }
+ // Store the storage level
+ setLevelAndTellMaster(blockId, level, tellMaster)
}
// TODO: This code will be removed when CacheTracker is gone.
@@ -631,31 +626,22 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
/**
- * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
- * store reaches its limit and needs to free up space.
+ * Drop block from memory (called when memory store has reached it limit)
*/
- def dropFromMemory(blockId: String, data: Either[Iterator[_], ByteBuffer]) {
- logInfo("Dropping block " + blockId + " from memory")
+ def dropFromMemory(blockId: String) {
locker.getLock(blockId).synchronized {
- val info = blockInfo.get(blockId)
- val level = info.level
- if (level.useDisk && !diskStore.contains(blockId)) {
- logInfo("Writing block " + blockId + " to disk")
- data match {
- case Left(iterator) =>
- diskStore.putValues(blockId, iterator, level, false)
- case Right(bytes) =>
- diskStore.putBytes(blockId, bytes, level)
- }
+ val level = getLevel(blockId)
+ if (level == null) {
+ logWarning("Block " + blockId + " cannot be removed from memory as it does not exist")
+ return
}
- memoryStore.remove(blockId)
- if (info.tellMaster) {
- reportBlockStatus(blockId)
- }
- if (!level.useDisk) {
- // The block is completely gone from this node; forget it so we can put() it again later.
- blockInfo.remove(blockId)
+ if (!level.useMemory) {
+ logWarning("Block " + blockId + " cannot be removed from memory as it is not in memory")
+ return
}
+ memoryStore.remove(blockId)
+ val newLevel = new StorageLevel(level.useDisk, false, level.deserialized, level.replication)
+ setLevelAndTellMaster(blockId, newLevel)
}
}
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index ff482ff66b..5f123aca78 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -31,7 +31,5 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
def remove(blockId: String)
- def contains(blockId: String): Boolean
-
def clear() { }
}
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index d0c592ccb1..d9965f4306 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -26,7 +26,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
addShutdownHook()
override def getSize(blockId: String): Long = {
- getFile(blockId).length()
+ getFile(blockId).length
}
override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
@@ -93,10 +93,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
}
}
- override def contains(blockId: String): Boolean = {
- getFile(blockId).exists()
- }
-
private def createFile(blockId: String): File = {
val file = getFile(blockId)
if (file.exists()) {
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index 74ef326038..ea6f3c4fcc 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -18,12 +18,29 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
private var currentMemory = 0L
+ //private val blockDropper = Executors.newSingleThreadExecutor()
+ private val blocksToDrop = new ArrayBlockingQueue[String](10000, true)
+ private val blockDropper = new Thread("memory store - block dropper") {
+ override def run() {
+ try {
+ while (true) {
+ val blockId = blocksToDrop.take()
+ logDebug("Block " + blockId + " ready to be dropped")
+ blockManager.dropFromMemory(blockId)
+ }
+ } catch {
+ case ie: InterruptedException =>
+ logInfo("Shutting down block dropper")
+ }
+ }
+ }
+ blockDropper.start()
logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
def freeMemory: Long = maxMemory - currentMemory
override def getSize(blockId: String): Long = {
- synchronized {
+ entries.synchronized {
entries.get(blockId).size
}
}
@@ -35,12 +52,19 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val elements = new ArrayBuffer[Any]
elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
- tryToPut(blockId, elements, sizeEstimate, true)
+ ensureFreeSpace(sizeEstimate)
+ val entry = new Entry(elements, sizeEstimate, true)
+ entries.synchronized { entries.put(blockId, entry) }
+ currentMemory += sizeEstimate
+ logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
+ blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory)))
} else {
val entry = new Entry(bytes, bytes.limit, false)
- ensureFreeSpace(blockId, bytes.limit)
- synchronized { entries.put(blockId, entry) }
- tryToPut(blockId, bytes, bytes.limit, false)
+ ensureFreeSpace(bytes.limit)
+ entries.synchronized { entries.put(blockId, entry) }
+ currentMemory += bytes.limit
+ logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format(
+ blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory)))
}
}
@@ -55,17 +79,27 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val elements = new ArrayBuffer[Any]
elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
- tryToPut(blockId, elements, sizeEstimate, true)
+ ensureFreeSpace(sizeEstimate)
+ val entry = new Entry(elements, sizeEstimate, true)
+ entries.synchronized { entries.put(blockId, entry) }
+ currentMemory += sizeEstimate
+ logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
+ blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory)))
Left(elements.iterator)
} else {
val bytes = blockManager.dataSerialize(values)
- tryToPut(blockId, bytes, bytes.limit, false)
+ ensureFreeSpace(bytes.limit)
+ val entry = new Entry(bytes, bytes.limit, false)
+ entries.synchronized { entries.put(blockId, entry) }
+ currentMemory += bytes.limit
+ logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format(
+ blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory)))
Right(bytes)
}
}
override def getBytes(blockId: String): Option[ByteBuffer] = {
- val entry = synchronized {
+ val entry = entries.synchronized {
entries.get(blockId)
}
if (entry == null) {
@@ -78,7 +112,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def getValues(blockId: String): Option[Iterator[Any]] = {
- val entry = synchronized {
+ val entry = entries.synchronized {
entries.get(blockId)
}
if (entry == null) {
@@ -92,7 +126,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def remove(blockId: String) {
- synchronized {
+ entries.synchronized {
val entry = entries.get(blockId)
if (entry != null) {
entries.remove(blockId)
@@ -100,118 +134,54 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
logInfo("Block %s of size %d dropped from memory (free %d)".format(
blockId, entry.size, freeMemory))
} else {
- logWarning("Block " + blockId + " could not be removed as it does not exist")
+ logWarning("Block " + blockId + " could not be removed as it doesnt exist")
}
}
}
override def clear() {
- synchronized {
+ entries.synchronized {
entries.clear()
}
+ blockDropper.interrupt()
logInfo("MemoryStore cleared")
}
- /**
- * Return the RDD ID that a given block ID is from, or null if it is not an RDD block.
- */
- private def getRddId(blockId: String): String = {
- if (blockId.startsWith("rdd_")) {
- blockId.split('_')(1)
- } else {
- null
- }
- }
-
- /**
- * Try to put in a set of values, if we can free up enough space. The value should either be
- * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
- * size must also be passed by the caller.
- */
- private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = {
- synchronized {
- if (ensureFreeSpace(blockId, size)) {
- val entry = new Entry(value, size, deserialized)
- entries.put(blockId, entry)
- currentMemory += size
- if (deserialized) {
- logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
- blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory)))
- } else {
- logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
- blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory)))
- }
- true
- } else {
- // Tell the block manager that we couldn't put it in memory so that it can drop it to
- // disk if the block allows disk storage.
- val data = if (deserialized) {
- Left(value.asInstanceOf[ArrayBuffer[Any]].iterator)
- } else {
- Right(value.asInstanceOf[ByteBuffer].duplicate())
- }
- blockManager.dropFromMemory(blockId, data)
- false
- }
- }
- }
-
- /**
- * Tries to free up a given amount of space to store a particular block, but can fail and return
- * false if either the block is bigger than our memory or it would require replacing another
- * block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
- * don't fit into memory that we want to avoid).
- *
- * Assumes that a lock on entries is held by the caller.
- */
- private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
+ // TODO: This should be able to return false if the space is larger than our total memory,
+ // or if adding this block would require evicting another one from the same RDD
+ private def ensureFreeSpace(space: Long) {
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
space, currentMemory, maxMemory))
- if (space > maxMemory) {
- logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit")
- return false
- }
-
if (maxMemory - currentMemory < space) {
- val rddToAdd = getRddId(blockIdToAdd)
+
val selectedBlocks = new ArrayBuffer[String]()
var selectedMemory = 0L
- val iterator = entries.entrySet().iterator()
- while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
- val pair = iterator.next()
- val blockId = pair.getKey
- if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
- logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
- "block from the same RDD")
- return false
+ entries.synchronized {
+ val iter = entries.entrySet().iterator()
+ while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) {
+ val pair = iter.next()
+ val blockId = pair.getKey
+ val entry = pair.getValue
+ if (!entry.dropPending) {
+ selectedBlocks += blockId
+ entry.dropPending = true
+ }
+ selectedMemory += pair.getValue.size
+ logInfo("Block " + blockId + " selected for dropping")
}
- selectedBlocks += blockId
- selectedMemory += pair.getValue.size
}
- if (maxMemory - (currentMemory - selectedMemory) >= space) {
- logInfo(selectedBlocks.size + " blocks selected for dropping")
- for (blockId <- selectedBlocks) {
- val entry = entries.get(blockId)
- val data = if (entry.deserialized) {
- Left(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
- } else {
- Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
- }
- blockManager.dropFromMemory(blockId, data)
- }
- return true
- } else {
- return false
+ logInfo("" + selectedBlocks.size + " new blocks selected for dropping, " +
+ blocksToDrop.size + " blocks pending")
+ var i = 0
+ while (i < selectedBlocks.size) {
+ blocksToDrop.add(selectedBlocks(i))
+ i += 1
}
+ selectedBlocks.clear()
}
- return true
- }
-
- override def contains(blockId: String): Boolean = {
- synchronized { entries.containsKey(blockId) }
}
}