From 6098f7e87a88d0b847c402b95510cb07352db643 Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Tue, 2 Oct 2012 17:25:38 -0700
Subject: Fixed cache replacement behavior of BlockManager:

- Partitions that get dropped to disk will now be loaded back into RAM
  after they're accessed again
- Same-RDD rule for cache replacement is now implemented (don't drop
  partitions from an RDD to make room for other partitions from itself)
- Items stored as MEMORY_AND_DISK go into memory only first, instead of
  being eagerly written out to disk
- MemoryStore.ensureFreeSpace is called within a lock on the writer thread
  to prevent race conditions (this can still be optimized to allow multiple
  concurrent calls to it but it's a start)
- MemoryStore does not accept blocks larger than its limit
---
 .../main/scala/spark/storage/BlockManager.scala    | 290 ++++++++++++---------
 core/src/main/scala/spark/storage/BlockStore.scala |   2 +
 core/src/main/scala/spark/storage/DiskStore.scala  |   6 +-
 .../src/main/scala/spark/storage/MemoryStore.scala | 170 +++++++-----
 .../scala/spark/storage/BlockManagerSuite.scala    |  89 ++++++-
 5 files changed, 361 insertions(+), 196 deletions(-)

diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 37d5862575..52f5c826d9 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -59,15 +59,31 @@ class BlockLocker(numLockers: Int) {
 class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
   extends Logging {
 
-  case class BlockInfo(level: StorageLevel, tellMaster: Boolean)
+  class BlockInfo(val level: StorageLevel, val tellMaster: Boolean, var pending: Boolean = true) {
+    def waitForReady() {
+      if (pending) {
+        synchronized {
+          while (pending) this.wait()
+        }
+      }
+    }
+
+    def markReady() {
+      pending = false
+      synchronized {
+        this.notifyAll()
+      }
+    }
+  }
 
   private val NUM_LOCKS = 337
   private val locker = new BlockLocker(NUM_LOCKS)
 
   private val blockInfo = new ConcurrentHashMap[String, BlockInfo]()
+
   private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
-  private[storage] val diskStore: BlockStore = new DiskStore(this,
-    System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
+  private[storage] val diskStore: BlockStore =
+    new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
 
   val connectionManager = new ConnectionManager(0)
   implicit val futureExecContext = connectionManager.futureExecContext
@@ -79,7 +95,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   var cacheTracker: CacheTracker = null
 
   val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties
-
   val compress = System.getProperty("spark.blockManager.compress", "false").toBoolean
 
   initialize()
@@ -110,45 +125,32 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   }
 
   /**
-   * Change the storage level for a local block in the block info meta data, and
-   * tell the master if necessary. Note that this is only a meta data change and
-   * does NOT actually change the storage of the block. If the new level is
-   * invalid, then block info (if exists) will be silently removed.
+   * Tell the master about the current storage status of a block. This will send a heartbeat
+   * message reflecting the current status, *not* the desired storage level in its block info.
+   * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
    */
-  private[spark] def setLevelAndTellMaster(
-    blockId: String, level: StorageLevel, tellMaster: Boolean = true) {
-
-    if (level == null) {
-      throw new IllegalArgumentException("Storage level is null")
-    }
-
-    // If there was earlier info about the block, then use earlier tellMaster
-    val oldInfo = blockInfo.get(blockId)
-    val newTellMaster = if (oldInfo != null) oldInfo.tellMaster else tellMaster
-    if (oldInfo != null && oldInfo.tellMaster != tellMaster) {
-      logWarning("Ignoring tellMaster setting as it is different from earlier setting")
-    }
-
-    // If level is valid, store the block info, else remove the block info
-    if (level.isValid) {
-      blockInfo.put(blockId, new BlockInfo(level, newTellMaster))
-      logDebug("Info for block " + blockId + " updated with new level as " + level)
-    } else {
-      blockInfo.remove(blockId)
-      logDebug("Info for block " + blockId + " removed as new level is null or invalid")
-    }
-
-    // Tell master if necessary
-    if (newTellMaster) {
+  def reportBlockStatus(blockId: String) {
+    locker.getLock(blockId).synchronized {
+      val curLevel = blockInfo.get(blockId) match {
+        case null =>
+          StorageLevel.NONE
+        case info =>
+          info.level match {
+            case null =>
+              StorageLevel.NONE
+            case level =>
+              val inMem = level.useMemory && memoryStore.contains(blockId)
+              val onDisk = level.useDisk && diskStore.contains(blockId)
+              new StorageLevel(onDisk, inMem, level.deserialized, level.replication)
+          }
+      }
       master.mustHeartBeat(HeartBeat(
         blockManagerId,
         blockId,
-        level,
-        if (level.isValid && level.useMemory) memoryStore.getSize(blockId) else 0,
-        if (level.isValid && level.useDisk) diskStore.getSize(blockId) else 0))
+        curLevel,
+        if (curLevel.useMemory) memoryStore.getSize(blockId) else 0L,
+        if (curLevel.useDisk) diskStore.getSize(blockId) else 0L))
       logDebug("Told master about block " + blockId)
-    } else {
-      logDebug("Did not tell master about block " + blockId)
     }
   }
 
@@ -180,36 +182,59 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   def getLocal(blockId: String): Option[Iterator[Any]] = {
     logDebug("Getting local block " + blockId)
     locker.getLock(blockId).synchronized {
-      // Check storage level of block
-      val level = getLevel(blockId)
-      if (level != null) {
-        logDebug("Level for block " + blockId + " is " + level + " on local machine")
+      val info = blockInfo.get(blockId)
+      if (info != null) {
+        info.waitForReady() // In case the block is still being put() by another thread
+        val level = info.level
+        logDebug("Level for block " + blockId + " is " + level)
 
         // Look for the block in memory
         if (level.useMemory) {
           logDebug("Getting block " + blockId + " from memory")
           memoryStore.getValues(blockId) match {
-            case Some(iterator) => {
-              logDebug("Block " + blockId + " found in memory")
+            case Some(iterator) =>
               return Some(iterator)
-            }
-            case None => {
+            case None =>
               logDebug("Block " + blockId + " not found in memory")
-            }
           }
         }
 
-        // Look for block on disk
+        // Look for block on disk, potentially loading it back into memory if required
        if (level.useDisk) {
           logDebug("Getting block " + blockId + " from disk")
-          diskStore.getValues(blockId) match {
-            case Some(iterator) => {
-              logDebug("Block " + blockId + " found in disk")
-              return Some(iterator)
+          if (level.useMemory && level.deserialized) {
+            diskStore.getValues(blockId) match {
+              case Some(iterator) =>
+                // Put the block back in memory before returning it
+                memoryStore.putValues(blockId, iterator, level, true) match {
+                  case Left(iterator2) =>
+                    return Some(iterator2)
+                  case _ =>
+                    throw new Exception("Memory store did not return back an iterator")
+                }
+              case None =>
+                throw new Exception("Block " + blockId + " not found on disk, though it should be")
             }
-            case None => {
-              throw new Exception("Block " + blockId + " not found on disk, though it should be")
-              return None
+          } else if (level.useMemory && !level.deserialized) {
+            // Read it as a byte buffer into memory first, then return it
+            diskStore.getBytes(blockId) match {
+              case Some(bytes) =>
+                // Put a copy of the block back in memory before returning it. Note that we can't
+                // put the ByteBuffer returned by the disk store as that's a memory-mapped file.
+                val copyForMemory = ByteBuffer.allocate(bytes.limit)
+                copyForMemory.put(bytes)
+                memoryStore.putBytes(blockId, copyForMemory, level)
+                bytes.rewind()
+                return Some(dataDeserialize(bytes))
+              case None =>
+                throw new Exception("Block " + blockId + " not found on disk, though it should be")
+            }
+          } else {
+            diskStore.getValues(blockId) match {
+              case Some(iterator) =>
+                return Some(iterator)
+              case None =>
+                throw new Exception("Block " + blockId + " not found on disk, though it should be")
+            }
           }
         }
       }
@@ -224,39 +249,46 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
    * Get block from the local block manager as serialized bytes.
    */
   def getLocalBytes(blockId: String): Option[ByteBuffer] = {
+    // TODO: This whole thing is very similar to getLocal; we need to refactor it somehow
     logDebug("Getting local block " + blockId + " as bytes")
     locker.getLock(blockId).synchronized {
-      // Check storage level of block
-      val level = getLevel(blockId)
-      if (level != null) {
-        logDebug("Level for block " + blockId + " is " + level + " on local machine")
+      val info = blockInfo.get(blockId)
+      if (info != null) {
+        info.waitForReady() // In case the block is still being put() by another thread
+        val level = info.level
+        logDebug("Level for block " + blockId + " is " + level)
 
         // Look for the block in memory
         if (level.useMemory) {
           logDebug("Getting block " + blockId + " from memory")
           memoryStore.getBytes(blockId) match {
-            case Some(bytes) => {
-              logDebug("Block " + blockId + " found in memory")
+            case Some(bytes) =>
               return Some(bytes)
-            }
-            case None => {
+            case None =>
               logDebug("Block " + blockId + " not found in memory")
-            }
           }
         }
 
         // Look for block on disk
         if (level.useDisk) {
-          logDebug("Getting block " + blockId + " from disk")
+          // Read it as a byte buffer into memory first, then return it
           diskStore.getBytes(blockId) match {
-            case Some(bytes) => {
-              logDebug("Block " + blockId + " found in disk")
+            case Some(bytes) =>
+              if (level.useMemory) {
+                if (level.deserialized) {
+                  memoryStore.putBytes(blockId, bytes, level)
+                } else {
+                  // The memory store will hang onto the ByteBuffer, so give it a copy instead of
+                  // the memory-mapped file buffer we got from the disk store
+                  val copyForMemory = ByteBuffer.allocate(bytes.limit)
+                  copyForMemory.put(bytes)
+                  memoryStore.putBytes(blockId, copyForMemory, level)
+                }
+              }
+              bytes.rewind()
               return Some(bytes)
-            }
-            case None => {
+            case None =>
               throw new Exception("Block " + blockId + " not found on disk, though it should be")
-              return None
-            }
           }
         }
       } else {
@@ -431,6 +463,17 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
       throw new IllegalArgumentException("Storage level is null or invalid")
     }
 
+    if (blockInfo.containsKey(blockId)) {
+      logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
+      return
+    }
+
+    // Remember the block's storage level so that we can correctly drop it to disk if it needs
+    // to be dropped right after it got put into memory. Note, however, that other threads will
+    // not be able to get() this block until we call markReady on its BlockInfo.
+    val myInfo = new BlockInfo(level, tellMaster)
+    blockInfo.put(blockId, myInfo)
+
     val startTimeMs = System.currentTimeMillis
     var bytes: ByteBuffer = null
 
@@ -444,32 +487,15 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
       logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
        + " to get into synchronized block")
 
-      // Check and warn if block with same id already exists
-      if (getLevel(blockId) != null) {
-        logWarning("Block " + blockId + " already exists in local machine")
-        return
-      }
-
-      if (level.useMemory && level.useDisk) {
-        // If saving to both memory and disk, then serialize only once
-        memoryStore.putValues(blockId, values, level, true) match {
-          case Left(newValues) =>
-            diskStore.putValues(blockId, newValues, level, true) match {
-              case Right(newBytes) => bytes = newBytes
-              case _ => throw new Exception("Unexpected return value")
-            }
-          case Right(newBytes) =>
-            bytes = newBytes
-            diskStore.putBytes(blockId, newBytes, level)
-        }
-      } else if (level.useMemory) {
-        // If only save to memory
+      if (level.useMemory) {
+        // Save it just to memory first, even if it also has useDisk set to true; we will later
+        // drop it to disk if the memory store can't hold it.
         memoryStore.putValues(blockId, values, level, true) match {
           case Right(newBytes) => bytes = newBytes
           case Left(newIterator) => valuesAfterPut = newIterator
         }
       } else {
-        // If only save to disk
+        // Save directly to disk.
         val askForBytes = level.replication > 1 // Don't get back the bytes unless we replicate them
         diskStore.putValues(blockId, values, level, askForBytes) match {
           case Right(newBytes) => bytes = newBytes
         }
       }
 
-      // Store the storage level
-      setLevelAndTellMaster(blockId, level, tellMaster)
+      // Now that the block is in either the memory or disk store, let other threads read it,
+      // and tell the master about it.
+      myInfo.markReady()
+      if (tellMaster) {
+        reportBlockStatus(blockId)
+      }
     }
     logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
 
@@ -521,6 +551,17 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
       throw new IllegalArgumentException("Storage level is null or invalid")
     }
 
+    if (blockInfo.containsKey(blockId)) {
+      logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
+      return
+    }
+
+    // Remember the block's storage level so that we can correctly drop it to disk if it needs
+    // to be dropped right after it got put into memory. Note, however, that other threads will
+    // not be able to get() this block until we call markReady on its BlockInfo.
+    val myInfo = new BlockInfo(level, tellMaster)
+    blockInfo.put(blockId, myInfo)
+
     val startTimeMs = System.currentTimeMillis
 
     // Initiate the replication before storing it locally. This is faster as
@@ -537,22 +578,22 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     locker.getLock(blockId).synchronized {
       logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
        + " to get into synchronized block")
-      if (getLevel(blockId) != null) {
-        logWarning("Block " + blockId + " already exists")
-        return
-      }
 
       if (level.useMemory) {
+        // Store it only in memory at first, even if useDisk is also set to true
         bytes.rewind()
         memoryStore.putBytes(blockId, bytes, level)
-      }
-      if (level.useDisk) {
+      } else {
         bytes.rewind()
         diskStore.putBytes(blockId, bytes, level)
       }
 
-      // Store the storage level
-      setLevelAndTellMaster(blockId, level, tellMaster)
+      // Now that the block is in either the memory or disk store, let other threads read it,
+      // and tell the master about it.
+      myInfo.markReady()
+      if (tellMaster) {
+        reportBlockStatus(blockId)
+      }
     }
 
     // TODO: This code will be removed when CacheTracker is gone.
@@ -604,11 +645,13 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
 
   // TODO: This code will be removed when CacheTracker is gone.
   private def notifyTheCacheTracker(key: String) {
-    val rddInfo = key.split("_")
-    val rddId: Int = rddInfo(1).toInt
-    val splitIndex: Int = rddInfo(2).toInt
-    val host = System.getProperty("spark.hostname", Utils.localHostName())
-    cacheTracker.notifyTheCacheTrackerFromBlockManager(spark.AddedToCache(rddId, splitIndex, host))
+    if (cacheTracker != null) {
+      val rddInfo = key.split("_")
+      val rddId: Int = rddInfo(1).toInt
+      val partition: Int = rddInfo(2).toInt
+      val host = System.getProperty("spark.hostname", Utils.localHostName())
+      cacheTracker.notifyTheCacheTrackerFromBlockManager(spark.AddedToCache(rddId, partition, host))
+    }
   }
 
   /**
@@ -626,22 +669,31 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   }
 
   /**
-   * Drop block from memory (called when memory store has reached it limit)
+   * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
+   * store reaches its limit and needs to free up space.
    */
-  def dropFromMemory(blockId: String) {
+  def dropFromMemory(blockId: String, data: Either[Iterator[_], ByteBuffer]) {
+    logInfo("Dropping block " + blockId + " from memory")
     locker.getLock(blockId).synchronized {
-      val level = getLevel(blockId)
-      if (level == null) {
-        logWarning("Block " + blockId + " cannot be removed from memory as it does not exist")
-        return
-      }
-      if (!level.useMemory) {
-        logWarning("Block " + blockId + " cannot be removed from memory as it is not in memory")
-        return
+      val info = blockInfo.get(blockId)
+      val level = info.level
+      if (level.useDisk && !diskStore.contains(blockId)) {
+        logInfo("Writing block " + blockId + " to disk")
+        data match {
+          case Left(iterator) =>
+            diskStore.putValues(blockId, iterator, level, false)
+          case Right(bytes) =>
+            diskStore.putBytes(blockId, bytes, level)
+        }
       }
       memoryStore.remove(blockId)
-      val newLevel = new StorageLevel(level.useDisk, false, level.deserialized, level.replication)
-      setLevelAndTellMaster(blockId, newLevel)
+      if (info.tellMaster) {
+        reportBlockStatus(blockId)
+      }
+      if (!level.useDisk) {
+        // The block is completely gone from this node; forget it so we can put() it again later.
+        blockInfo.remove(blockId)
+      }
     }
   }
 
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 5f123aca78..ff482ff66b 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -31,5 +31,7 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
 
   def remove(blockId: String)
 
+  def contains(blockId: String): Boolean
+
   def clear() { }
 }
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index d9965f4306..d0c592ccb1 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -26,7 +26,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
   addShutdownHook()
 
   override def getSize(blockId: String): Long = {
-    getFile(blockId).length
+    getFile(blockId).length()
   }
 
   override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
@@ -93,6 +93,10 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
     }
   }
 
+  override def contains(blockId: String): Boolean = {
+    getFile(blockId).exists()
+  }
+
   private def createFile(blockId: String): File = {
     val file = getFile(blockId)
     if (file.exists()) {
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index ea6f3c4fcc..74ef326038 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -18,29 +18,12 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
   private var currentMemory = 0L
 
-  //private val blockDropper = Executors.newSingleThreadExecutor()
-  private val blocksToDrop = new ArrayBlockingQueue[String](10000, true)
-  private val blockDropper = new Thread("memory store - block dropper") {
-    override def run() {
-      try {
-        while (true) {
-          val blockId = blocksToDrop.take()
-          logDebug("Block " + blockId + " ready to be dropped")
-          blockManager.dropFromMemory(blockId)
-        }
-      } catch {
-        case ie: InterruptedException =>
-          logInfo("Shutting down block dropper")
-      }
-    }
-  }
-  blockDropper.start()
   logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
 
   def freeMemory: Long = maxMemory - currentMemory
 
   override def getSize(blockId: String): Long = {
-    entries.synchronized {
+    synchronized {
       entries.get(blockId).size
     }
   }
@@ -52,19 +35,12 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       val elements = new ArrayBuffer[Any]
       elements ++= values
       val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
-      ensureFreeSpace(sizeEstimate)
-      val entry = new Entry(elements, sizeEstimate, true)
-      entries.synchronized { entries.put(blockId, entry) }
-      currentMemory += sizeEstimate
-      logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
-        blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory)))
+      tryToPut(blockId, elements, sizeEstimate, true)
     } else {
-      val entry = new Entry(bytes, bytes.limit, false)
-      ensureFreeSpace(bytes.limit)
-      entries.synchronized { entries.put(blockId, entry) }
-      currentMemory += bytes.limit
-      logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format(
-        blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory)))
+      tryToPut(blockId, bytes, bytes.limit, false)
     }
   }
 
@@ -79,27 +55,17 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       val elements = new ArrayBuffer[Any]
       elements ++= values
       val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
-      ensureFreeSpace(sizeEstimate)
-      val entry = new Entry(elements, sizeEstimate, true)
-      entries.synchronized { entries.put(blockId, entry) }
-      currentMemory += sizeEstimate
-      logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
-        blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory)))
+      tryToPut(blockId, elements, sizeEstimate, true)
       Left(elements.iterator)
     } else {
       val bytes = blockManager.dataSerialize(values)
-      ensureFreeSpace(bytes.limit)
-      val entry = new Entry(bytes, bytes.limit, false)
-      entries.synchronized { entries.put(blockId, entry) }
-      currentMemory += bytes.limit
-      logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format(
-        blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory)))
+      tryToPut(blockId, bytes, bytes.limit, false)
       Right(bytes)
     }
   }
 
   override def getBytes(blockId: String): Option[ByteBuffer] = {
-    val entry = entries.synchronized {
+    val entry = synchronized {
       entries.get(blockId)
     }
     if (entry == null) {
@@ -112,7 +78,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   }
 
   override def getValues(blockId: String): Option[Iterator[Any]] = {
-    val entry = entries.synchronized {
+    val entry = synchronized {
       entries.get(blockId)
     }
     if (entry == null) {
@@ -126,7 +92,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   }
 
   override def remove(blockId: String) {
-    entries.synchronized {
+    synchronized {
       val entry = entries.get(blockId)
       if (entry != null) {
         entries.remove(blockId)
@@ -134,54 +100,118 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         logInfo("Block %s of size %d dropped from memory (free %d)".format(
           blockId, entry.size, freeMemory))
       } else {
-        logWarning("Block " + blockId + " could not be removed as it doesnt exist")
+        logWarning("Block " + blockId + " could not be removed as it does not exist")
       }
     }
   }
 
   override def clear() {
-    entries.synchronized {
+    synchronized {
       entries.clear()
     }
-    blockDropper.interrupt()
     logInfo("MemoryStore cleared")
   }
 
-  // TODO: This should be able to return false if the space is larger than our total memory,
-  // or if adding this block would require evicting another one from the same RDD
-  private def ensureFreeSpace(space: Long) {
+  /**
+   * Return the RDD ID that a given block ID is from, or null if it is not an RDD block.
+   */
+  private def getRddId(blockId: String): String = {
+    if (blockId.startsWith("rdd_")) {
+      blockId.split('_')(1)
+    } else {
+      null
+    }
+  }
+
+  /**
+   * Try to put in a set of values, if we can free up enough space. The value should either be
+   * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
+   * size must also be passed by the caller.
+ */ + private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = { + synchronized { + if (ensureFreeSpace(blockId, size)) { + val entry = new Entry(value, size, deserialized) + entries.put(blockId, entry) + currentMemory += size + if (deserialized) { + logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format( + blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory))) + } else { + logInfo("Block %s stored as bytes to memory (size %s, free %s)".format( + blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory))) + } + true + } else { + // Tell the block manager that we couldn't put it in memory so that it can drop it to + // disk if the block allows disk storage. + val data = if (deserialized) { + Left(value.asInstanceOf[ArrayBuffer[Any]].iterator) + } else { + Right(value.asInstanceOf[ByteBuffer].duplicate()) + } + blockManager.dropFromMemory(blockId, data) + false + } + } + } + + /** + * Tries to free up a given amount of space to store a particular block, but can fail and return + * false if either the block is bigger than our memory or it would require replacing another + * block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that + * don't fit into memory that we want to avoid). + * + * Assumes that a lock on entries is held by the caller. + */ + private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = { logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format( space, currentMemory, maxMemory)) + if (space > maxMemory) { + logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit") + return false + } + if (maxMemory - currentMemory < space) { - + val rddToAdd = getRddId(blockIdToAdd) val selectedBlocks = new ArrayBuffer[String]() var selectedMemory = 0L - entries.synchronized { - val iter = entries.entrySet().iterator() - while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) { - val pair = iter.next() - val blockId = pair.getKey - val entry = pair.getValue - if (!entry.dropPending) { - selectedBlocks += blockId - entry.dropPending = true - } - selectedMemory += pair.getValue.size - logInfo("Block " + blockId + " selected for dropping") + val iterator = entries.entrySet().iterator() + while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) { + val pair = iterator.next() + val blockId = pair.getKey + if (rddToAdd != null && rddToAdd == getRddId(blockId)) { + logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " + + "block from the same RDD") + return false } + selectedBlocks += blockId + selectedMemory += pair.getValue.size } - logInfo("" + selectedBlocks.size + " new blocks selected for dropping, " + - blocksToDrop.size + " blocks pending") - var i = 0 - while (i < selectedBlocks.size) { - blocksToDrop.add(selectedBlocks(i)) - i += 1 + if (maxMemory - (currentMemory - selectedMemory) >= space) { + logInfo(selectedBlocks.size + " blocks selected for dropping") + for (blockId <- selectedBlocks) { + val entry = entries.get(blockId) + val data = if (entry.deserialized) { + Left(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator) + } else { + Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) + } + blockManager.dropFromMemory(blockId, data) + } + return true + } else { + return false } - selectedBlocks.clear() } + return true + } + + override def contains(blockId: String): Boolean = { + synchronized { 
+      entries.containsKey(blockId)
+    }
+  }
 }
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index d15d7285a7..f6b8b49bff 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -69,9 +69,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
     assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
 
-    // Setting storage level of a1 and a2 to invalid; they should be removed from store and master
-    store.setLevelAndTellMaster("a1", new StorageLevel(false, false, false, 1))
-    store.setLevelAndTellMaster("a2", new StorageLevel(true, false, false, 0))
+    // Drop a1 and a2 from memory; this should be reported back to the master
+    store.dropFromMemory("a1", null)
+    store.dropFromMemory("a2", null)
     assert(store.getSingle("a1") === None, "a1 not removed from store")
     assert(store.getSingle("a2") === None, "a2 not removed from store")
     assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
@@ -113,13 +113,58 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(store.getSingle("a1") === None, "a1 was in store")
     assert(store.getSingle("a2") != None, "a2 was not in store")
     // At this point a2 was gotten last, so LRU will getSingle rid of a3
-    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
     Thread.sleep(100)
     assert(store.getSingle("a1") != None, "a1 was not in store")
     assert(store.getSingle("a2") != None, "a2 was not in store")
-    assert(store.getSingle("a3") === None, "a1 was in store")
+    assert(store.getSingle("a3") === None, "a3 was in store")
   }
-
+
+  test("in-memory LRU for partitions of same RDD") {
+    val store = new BlockManager(master, new KryoSerializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY)
+    Thread.sleep(100)
+    // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
+    // from the same RDD
+    assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+    assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
+    assert(store.getSingle("rdd_0_1") != None, "rdd_0_1 was not in store")
+    // Check that rdd_0_3 doesn't replace them even after further accesses
+    assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+    assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+    assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+  }
+
+  test("in-memory LRU for partitions of multiple RDDs") {
+    val store = new BlockManager(master, new KryoSerializer, 1200)
+    store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    Thread.sleep(100)
+    // At this point rdd_1_1 should've replaced rdd_0_1
+    assert(store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was not in store")
+    assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store")
+    assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store")
+    // Do a get() on rdd_0_2 so that it is the most recently used item
+    assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
+    // Put in more partitions from RDD 0; they should replace rdd_1_1
+    store.putSingle("rdd_0_3", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_0_4", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    Thread.sleep(100)
+    // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
+    // when we try to add rdd_0_4.
+    assert(!store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was in store")
+    assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store")
+    assert(!store.memoryStore.contains("rdd_0_4"), "rdd_0_4 was in store")
+    assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store")
+    assert(store.memoryStore.contains("rdd_0_3"), "rdd_0_3 was not in store")
+  }
+
   test("on-disk storage") {
     val store = new BlockManager(master, new KryoSerializer, 1200)
     val a1 = new Array[Byte](400)
@@ -149,6 +194,22 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
   }
 
+  test("disk and memory storage with getLocalBytes") {
+    val store = new BlockManager(master, new KryoSerializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
+    Thread.sleep(100)
+    assert(store.getLocalBytes("a2") != None, "a2 was not in store")
+    assert(store.getLocalBytes("a3") != None, "a3 was not in store")
+    assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
+    assert(store.getLocalBytes("a1") != None, "a1 was not in store")
+    assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
+  }
+
   test("disk and memory storage with serialization") {
     val store = new BlockManager(master, new KryoSerializer, 1200)
     val a1 = new Array[Byte](400)
@@ -165,6 +226,22 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
   }
 
+  test("disk and memory storage with serialization and getLocalBytes") {
+    val store = new BlockManager(master, new KryoSerializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
+    Thread.sleep(100)
+    assert(store.getLocalBytes("a2") != None, "a2 was not in store")
+    assert(store.getLocalBytes("a3") != None, "a3 was not in store")
+    assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
+    assert(store.getLocalBytes("a1") != None, "a1 was not in store")
+    assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
+  }
+
   test("LRU with mixed storage levels") {
     val store = new BlockManager(master, new KryoSerializer, 1200)
     val a1 = new Array[Byte](400)
--
cgit v1.2.3
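The core of the new replacement policy is the same-RDD rule in ensureFreeSpace above: while scanning the access-ordered entries map for LRU eviction candidates, hitting a block from the same RDD as the one being inserted aborts the whole put instead of evicting it. Below is a minimal, self-contained sketch of just that decision (plain Scala; SimpleMemoryStore, planEviction, put, and the byte sizes are illustrative stand-ins, not the classes from this patch):

  import java.util.LinkedHashMap
  import scala.collection.mutable.ArrayBuffer

  // Access-ordered map like MemoryStore's `entries`: iteration visits
  // least-recently-used blocks first.
  class SimpleMemoryStore(maxMemory: Long) {
    private case class Entry(size: Long)
    private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
    private var currentMemory = 0L

    // "rdd_<rddId>_<split>" -> rddId, or null for non-RDD blocks (same scheme as getRddId)
    private def rddId(blockId: String): String =
      if (blockId.startsWith("rdd_")) blockId.split('_')(1) else null

    // Decide which blocks to evict to fit `space` bytes. None means "refuse the put":
    // either the block can never fit, or the only candidates for eviction are
    // partitions of the same RDD as the block being inserted.
    private def planEviction(blockIdToAdd: String, space: Long): Option[Seq[String]] = {
      if (space > maxMemory) return None
      if (maxMemory - currentMemory >= space) return Some(Nil)
      val toAdd = rddId(blockIdToAdd)
      val selected = new ArrayBuffer[String]
      var freed = 0L
      val it = entries.entrySet().iterator() // LRU-first
      while (maxMemory - (currentMemory - freed) < space && it.hasNext) {
        val pair = it.next()
        if (toAdd != null && toAdd == rddId(pair.getKey)) return None // same-RDD rule
        selected += pair.getKey
        freed += pair.getValue.size
      }
      if (maxMemory - (currentMemory - freed) >= space) Some(selected.toList) else None
    }

    def put(blockId: String, size: Long): Boolean = synchronized {
      planEviction(blockId, size) match {
        case Some(victims) =>
          for (victim <- victims) currentMemory -= entries.remove(victim).size
          entries.put(blockId, Entry(size))
          currentMemory += size
          true
        case None =>
          false
      }
    }
  }

  object SameRddRuleDemo {
    def main(args: Array[String]): Unit = {
      val store = new SimpleMemoryStore(1200)
      store.put("rdd_0_1", 500)
      store.put("rdd_0_2", 500)
      // Would have to evict rdd_0_1 from the same RDD, so the put is refused:
      println(store.put("rdd_0_3", 500)) // false
      // A different RDD may evict rdd_0_1, the least recently used block:
      println(store.put("rdd_1_1", 500)) // true
    }
  }

The sizes are chosen so the third block does not fit without eviction: the third partition of RDD 0 is refused because its only victims would be partitions of RDD 0 itself, while a partition of RDD 1 succeeds by evicting the LRU block of the other RDD. This is exactly the wasteful cyclic replacement pattern the commit message and the ensureFreeSpace comment describe avoiding.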
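The new BlockInfo class also serves as the synchronization point between a thread running put() and concurrent readers: getLocal calls info.waitForReady() before touching the stores, and the writer calls markReady() only after the block is in the memory or disk store. Stripped of the storage details, this is a plain monitor handshake; a compact runnable sketch follows (standalone, with illustrative names, not the patch's classes):

  // Reduced version of the BlockInfo ready flag: readers block in
  // waitForReady() until the writer calls markReady().
  class PendingFlag {
    private var pending = true

    def waitForReady(): Unit = {
      if (pending) {             // cheap first check outside the monitor
        synchronized {
          while (pending) wait() // loop to survive spurious wakeups
        }
      }
    }

    def markReady(): Unit = {
      pending = false            // published to waiters by the monitor handoff below
      synchronized {
        notifyAll()              // wake every thread parked in waitForReady()
      }
    }
  }

  object HandshakeDemo {
    def main(args: Array[String]): Unit = {
      val flag = new PendingFlag
      val reader = new Thread(new Runnable {
        def run(): Unit = {
          flag.waitForReady()    // blocks until the "put" finishes
          println("reader: block is ready")
        }
      })
      reader.start()
      Thread.sleep(50)           // simulate the block still being written
      flag.markReady()
      reader.join()
    }
  }

The unsynchronized first read of pending is only a fast path; correctness comes from re-checking the flag in a while loop under the monitor, which is also what makes the wait() safe against spurious wakeups.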
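One easily missed detail in the new getLocal/getLocalBytes read path: when a block comes back from disk, the bytes are copied into a fresh buffer before being handed to the memory store, because DiskStore.getBytes returns a memory-mapped ByteBuffer whose contents are backed by the file. A small sketch of that copy-and-rewind idiom (standalone; readMapped is a hypothetical stand-in for the disk store, and the flip() exists so this demo's copy is readable, it is not taken from the patch):

  import java.io.RandomAccessFile
  import java.nio.ByteBuffer
  import java.nio.channels.FileChannel

  object CopyMappedBuffer {
    // Hypothetical stand-in for DiskStore.getBytes: map the whole file read-only.
    // The mapping stays valid after the channel is closed.
    def readMapped(path: String): ByteBuffer = {
      val channel = new RandomAccessFile(path, "r").getChannel
      try channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size)
      finally channel.close()
    }

    def main(args: Array[String]): Unit = {
      val mapped = readMapped(args(0))
      // Copy into a heap buffer before caching it: a cache holding the mapped
      // buffer would really be holding (and pinning) the file region on disk.
      val copyForMemory = ByteBuffer.allocate(mapped.limit)
      copyForMemory.put(mapped) // advances both buffers' positions to the end
      copyForMemory.flip()      // make the copy readable from position 0
      mapped.rewind()           // rewind the original before handing it back out
      println("copied " + copyForMemory.remaining + " bytes; mapped buffer rewound")
    }
  }

This mirrors why the patch allocates copyForMemory and calls bytes.rewind() after the copy: the memory store hangs onto whatever buffer it is given, so it must be given bytes it owns outright rather than a view into a mapped file.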