From a842c63044b5b1105228dc7aca21a1c31da90338 Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Mon, 3 Sep 2012 16:24:00 -0700
Subject: Minor formatting fixes

---
 .../main/scala/spark/storage/BlockManager.scala    | 27 ++++++++++++----------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 3aac8e50b4..1211f0f2c2 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -83,6 +83,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   // TODO: This will be removed after cacheTracker is removed from the code base.
   var cacheTracker: CacheTracker = null
 
+  val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties()
+
   initLogging()
 
   initialize()
@@ -279,7 +281,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     val results = new LinkedBlockingQueue[(String, Option[() => Iterator[Any]])]
 
     // Bound the number and memory usage of fetched remote blocks.
-    val parallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties
     val blocksToRequest = new Queue[(BlockManagerId, BlockMessage)]
 
     def sendRequest(bmId: BlockManagerId, blockMessages: Seq[BlockMessage]) {
@@ -290,7 +291,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
         case Some(message) => {
           val bufferMessage = message.asInstanceOf[BufferMessage]
           val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
-          blockMessageArray.foreach(blockMessage => {
+          for (blockMessage <- blockMessageArray) {
             if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) {
               throw new SparkException(
                 "Unexpected message " + blockMessage.getType + " received from " + cmId)
@@ -298,7 +299,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
             val blockId = blockMessage.getId
             results.put((blockId, Some(() => dataDeserialize(blockMessage.getData))))
             logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
-          })
+          }
         }
         case None => {
           logError("Could not get block(s) from " + cmId)
@@ -318,9 +319,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
         localBlockIds ++= blockIds
       } else {
         remoteBlockIds ++= blockIds
-        blockIds.foreach{blockId =>
+        for (blockId <- blockIds) {
           val blockMessage = BlockMessage.fromGetBlock(GetBlock(blockId))
-          if (initialRequests < parallelFetches) {
+          if (initialRequests < numParallelFetches) {
             initialRequestBlocks.getOrElseUpdate(address, new ArrayBuffer[BlockMessage])
               .append(blockMessage)
             initialRequests += 1
@@ -331,15 +332,17 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
       }
     }
 
-    // Send out initial request(s) for 'parallelFetches' blocks.
-    for ((bmId, blockMessages) <- initialRequestBlocks) { sendRequest(bmId, blockMessages) }
+    // Send out initial request(s) for 'numParallelFetches' blocks.
+    for ((bmId, blockMessages) <- initialRequestBlocks) {
+      sendRequest(bmId, blockMessages)
+    }
 
-    logDebug("Started remote gets for " + parallelFetches + " blocks in " +
+    logDebug("Started remote gets for " + numParallelFetches + " blocks in " +
       Utils.getUsedTimeMs(startTime) + " ms")
 
     // Get the local blocks while remote blocks are being fetched.
     startTime = System.currentTimeMillis
-    localBlockIds.foreach(id => {
+    for (id <- localBlockIds) {
       getLocal(id) match {
         case Some(block) => {
           results.put((id, Some(() => block)))
@@ -349,7 +352,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
           throw new BlockException(id, "Could not get block " + id + " from local machine")
         }
       }
-    })
+    }
     logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
 
     // Return an iterator that will read fetched blocks off the queue as they arrive.
@@ -362,8 +365,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
         resultsGotten += 1
         val (blockId, functionOption) = results.take()
         if (remoteBlockIds.contains(blockId) && !blocksToRequest.isEmpty) {
-          val (bmId, blockMessage) = blocksToRequest.dequeue
-          sendRequest(bmId, Seq(blockMessage))
+          val (bmId, blockMessage) = blocksToRequest.dequeue()
+          sendRequest(bmId, Seq(blockMessage))
         }
         (blockId, functionOption.map(_.apply()))
       }
-- 
cgit v1.2.3


From c308fbcb793b16b3b9accdf779def2776de80f9f Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Wed, 5 Sep 2012 15:59:48 -0700
Subject: Removed cache add/remove log messages from CacheTracker. Added log
 messages on BlockManagerMaster to reflect block add/remove. Also did some
 minor cleanup of storage package code.

---
 core/src/main/scala/spark/CacheTracker.scala       | 14 +---
 .../main/scala/spark/storage/BlockManager.scala    | 68 +++++++--------
 .../scala/spark/storage/BlockManagerMaster.scala   | 97 ++++++++++++----------
 .../scala/spark/storage/BlockManagerWorker.scala   | 18 ++--
 .../main/scala/spark/storage/BlockMessage.scala    |  2 +-
 .../scala/spark/storage/BlockMessageArray.scala    |  3 +-
 core/src/main/scala/spark/storage/BlockStore.scala | 60 ++++++++-----
 .../main/scala/spark/storage/StorageLevel.scala    |  2 +-
 .../scala/spark/storage/BlockManagerSuite.scala    |  4 +-
 9 files changed, 143 insertions(+), 125 deletions(-)

diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 22110832f8..356637825e 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -43,8 +43,6 @@ class CacheTrackerActor extends Actor with Logging {
 
   def receive = {
     case SlaveCacheStarted(host: String, size: Long) =>
-      logInfo("Started slave cache (size %s) on %s".format(
-        Utils.memoryBytesToString(size), host))
       slaveCapacity.put(host, size)
       slaveUsage.put(host, 0)
       sender ! true
@@ -56,22 +54,12 @@ class CacheTrackerActor extends Actor with Logging {
 
     case AddedToCache(rddId, partition, host, size) =>
       slaveUsage.put(host, getCacheUsage(host) + size)
-      logInfo("Cache entry added: (%s, %s) on %s (size added: %s, available: %s)".format(
-        rddId, partition, host, Utils.memoryBytesToString(size),
-        Utils.memoryBytesToString(getCacheAvailable(host))))
      locs(rddId)(partition) = host :: locs(rddId)(partition)
       sender ! true
 
     case DroppedFromCache(rddId, partition, host, size) =>
-      logInfo("Cache entry removed: (%s, %s) on %s (size dropped: %s, available: %s)".format(
-        rddId, partition, host, Utils.memoryBytesToString(size),
-        Utils.memoryBytesToString(getCacheAvailable(host))))
       slaveUsage.put(host, getCacheUsage(host) - size)
       // Do a sanity check to make sure usage is greater than 0.
-      val usage = getCacheUsage(host)
-      if (usage < 0) {
-        logError("Cache usage on %s is negative (%d)".format(host, usage))
-      }
       locs(rddId)(partition) = locs(rddId)(partition).filterNot(_ == host)
       sender ! true
 
@@ -223,7 +211,7 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
       logInfo("Computing partition " + split)
       try {
         // BlockManager will iterate over results from compute to create RDD
-        blockManager.put(key, rdd.compute(split), storageLevel, false)
+        blockManager.put(key, rdd.compute(split), storageLevel, true)
         //future.apply() // Wait for the reply from the cache tracker
         blockManager.get(key) match {
           case Some(values) =>
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 1211f0f2c2..11a1cbb73e 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -1,29 +1,21 @@
 package spark.storage
 
-import java.io._
-import java.nio._
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.LinkedBlockingQueue
-import java.util.Collections
-
 import akka.dispatch.{Await, Future}
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.{HashMap, HashSet}
-import scala.collection.mutable.Queue
-import scala.collection.JavaConversions._
+import akka.util.Duration
 
-import it.unimi.dsi.fastutil.io._
+import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
 
-import spark.CacheTracker
-import spark.Logging
-import spark.Serializer
-import spark.SizeEstimator
-import spark.SparkEnv
-import spark.SparkException
-import spark.Utils
-import spark.util.ByteBufferInputStream
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import java.nio.ByteBuffer
+import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
+import scala.collection.JavaConversions._
+
+import spark.{CacheTracker, Logging, Serializer, SizeEstimator, SparkException, Utils}
 import spark.network._
-import akka.util.Duration
+import spark.util.ByteBufferInputStream
+
 
 class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
   def this() = this(null, 0)
@@ -49,7 +41,8 @@ class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
 }
 
 
-case class BlockException(blockId: String, message: String, ex: Exception = null) extends Exception(message)
+case class BlockException(blockId: String, message: String, ex: Exception = null)
+extends Exception(message)
 
 
 class BlockLocker(numLockers: Int) {
@@ -115,10 +108,14 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   }
 
   /**
-   * Change storage level for a local block and tell master is necesary.
-   * If new level is invalid, then block info (if it exists) will be silently removed.
+   * Change the storage level for a local block in the block info meta data, and
+   * tell the master if necessary. Note that this is only a meta data change and
+   * does NOT actually change the storage of the block. If the new level is
+   * invalid, then block info (if exists) will be silently removed.
    */
-  def setLevel(blockId: String, level: StorageLevel, tellMaster: Boolean = true) {
+  private[spark] def setLevelAndTellMaster(
+    blockId: String, level: StorageLevel, tellMaster: Boolean = true) {
+
     if (level == null) {
       throw new IllegalArgumentException("Storage level is null")
     }
@@ -141,8 +138,13 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
 
     // Tell master if necessary
     if (newTellMaster) {
+      master.mustHeartBeat(HeartBeat(
+        blockManagerId,
+        blockId,
+        level,
+        if (level.isValid && level.useMemory) memoryStore.getSize(blockId) else 0,
+        if (level.isValid && level.useDisk) diskStore.getSize(blockId) else 0))
       logDebug("Told master about block " + blockId)
-      notifyMaster(HeartBeat(blockManagerId, blockId, level, 0, 0))
     } else {
       logDebug("Did not tell master about block " + blockId)
     }
@@ -431,9 +433,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
         case _ => throw new Exception("Unexpected return value")
       }
     }
-    
+
     // Store the storage level
-    setLevel(blockId, level, tellMaster)
+    setLevelAndTellMaster(blockId, level, tellMaster)
   }
 
   logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
@@ -461,7 +463,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   /**
    * Put a new block of serialized bytes to the block manager.
    */
-  def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
+  def putBytes(
+    blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
+
     if (blockId == null) {
       throw new IllegalArgumentException("Block Id is null")
     }
@@ -500,7 +504,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     }
 
     // Store the storage level
-    setLevel(blockId, level, tellMaster)
+    setLevelAndTellMaster(blockId, level, tellMaster)
   }
 
   // TODO: This code will be removed when CacheTracker is gone.
@@ -587,7 +591,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
       }
       memoryStore.remove(blockId)
       val newLevel = new StorageLevel(level.useDisk, false, level.deserialized, level.replication)
-      setLevel(blockId, newLevel)
+      setLevelAndTellMaster(blockId, newLevel)
     }
   }
 
@@ -606,10 +610,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     return ser.deserializeStream(new ByteBufferInputStream(bytes)).toIterator
   }
 
-  private def notifyMaster(heartBeat: HeartBeat) {
-    master.mustHeartBeat(heartBeat)
-  }
-
   def stop() {
     connectionManager.stop()
     blockInfo.clear()
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 9f03c5a32c..2f14db4e28 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -3,22 +3,18 @@ package spark.storage
 import java.io._
 import java.util.{HashMap => JHashMap}
 
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.util.Random
 
 import akka.actor._
 import akka.dispatch._
 import akka.pattern.ask
 import akka.remote._
-import akka.util.Duration
-import akka.util.Timeout
+import akka.util.{Duration, Timeout}
 import akka.util.duration._
 
-import spark.Logging
-import spark.SparkException
-import spark.Utils
+import spark.{Logging, SparkException, Utils}
+
 
 sealed trait ToBlockManagerMaster
 
 case class RegisterBlockManager(
     maxMemSize: Long,
     maxDiskSize: Long)
   extends ToBlockManagerMaster
-  
+
 class HeartBeat(
     var blockManagerId: BlockManagerId,
     var blockId: String,
     var storageLevel: StorageLevel,
-    var deserializedSize: Long,
-    var size: Long)
+    var memSize: Long,
+    var diskSize: Long)
   extends ToBlockManagerMaster
   with Externalizable {
@@ -43,8 +39,8 @@ class HeartBeat(
     blockManagerId.writeExternal(out)
     out.writeUTF(blockId)
     storageLevel.writeExternal(out)
-    out.writeInt(deserializedSize.toInt)
-    out.writeInt(size.toInt)
+    out.writeInt(memSize.toInt)
+    out.writeInt(diskSize.toInt)
   }
 
   override def readExternal(in: ObjectInput) {
@@ -53,8 +49,8 @@ class HeartBeat(
     blockId = in.readUTF()
     storageLevel = new StorageLevel()
     storageLevel.readExternal(in)
-    deserializedSize = in.readInt()
-    size = in.readInt()
+    memSize = in.readInt()
+    diskSize = in.readInt()
   }
 }
@@ -62,15 +58,14 @@ object HeartBeat {
   def apply(blockManagerId: BlockManagerId,
       blockId: String,
       storageLevel: StorageLevel,
-      deserializedSize: Long,
-      size: Long): HeartBeat = {
-    new HeartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size)
+      memSize: Long,
+      diskSize: Long): HeartBeat = {
+    new HeartBeat(blockManagerId, blockId, storageLevel, memSize, diskSize)
   }
 
-  // For pattern-matching
   def unapply(h: HeartBeat): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
-    Some((h.blockManagerId, h.blockId, h.storageLevel, h.deserializedSize, h.size))
+    Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
  }
 }
@@ -88,49 +83,64 @@ case object StopBlockManagerMaster extends ToBlockManagerMaster
 
 class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
 
   class BlockManagerInfo(
+      val blockManagerId: BlockManagerId,
       timeMs: Long,
-      maxMem: Long,
-      maxDisk: Long) {
+      val maxMem: Long,
+      val maxDisk: Long) {
+
     private var lastSeenMs = timeMs
     private var remainedMem = maxMem
     private var remainedDisk = maxDisk
     private val blocks = new JHashMap[String, StorageLevel]
+
+    logInfo("Registering block manager (%s:%d, ram: %d, disk: %d)".format(
+      blockManagerId.ip, blockManagerId.port, maxMem, maxDisk))
 
     def updateLastSeenMs() {
       lastSeenMs = System.currentTimeMillis() / 1000
     }
 
-    def addBlock(blockId: String, storageLevel: StorageLevel, deserializedSize: Long, size: Long) =
-      synchronized {
+    def updateBlockInfo(
+        blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long) = synchronized {
+
       updateLastSeenMs()
+
       if (blocks.containsKey(blockId)) {
-        val oriLevel: StorageLevel = blocks.get(blockId)
+        // The block exists on the slave already.
+        val originalLevel: StorageLevel = blocks.get(blockId)
 
-        if (oriLevel.deserialized) {
-          remainedMem += deserializedSize
+        if (originalLevel.useMemory) {
+          remainedMem += memSize
         }
-        if (oriLevel.useMemory) {
-          remainedMem += size
-        }
-        if (oriLevel.useDisk) {
-          remainedDisk += size
+        if (originalLevel.useDisk) {
+          remainedDisk += diskSize
         }
       }
 
-      if (storageLevel.isValid) { 
+      if (storageLevel.isValid) {
+        // isValid means it is either stored in-memory or on-disk.
        blocks.put(blockId, storageLevel)
-        if (storageLevel.deserialized) {
-          remainedMem -= deserializedSize
-        }
         if (storageLevel.useMemory) {
-          remainedMem -= size
+          remainedMem -= memSize
+          logInfo("Added %s in memory on %s:%d (size: %d, free: %d)".format(
+            blockId, blockManagerId.ip, blockManagerId.port, memSize, remainedMem))
         }
         if (storageLevel.useDisk) {
-          remainedDisk -= size
+          remainedDisk -= diskSize
+          logInfo("Added %s on disk on %s:%d (size: %d, free: %d)".format(
+            blockId, blockManagerId.ip, blockManagerId.port, diskSize, remainedDisk))
        }
-      } else {
+      } else if (blocks.containsKey(blockId)) {
+        // If isValid is not true, drop the block.
+        val originalLevel: StorageLevel = blocks.get(blockId)
         blocks.remove(blockId)
+        if (originalLevel.useMemory) {
+          logInfo("Removed %s on %s:%d in memory (size: %d, free: %d)".format(
+            blockId, blockManagerId.ip, blockManagerId.port, memSize, remainedDisk))
+        }
+        if (originalLevel.useDisk) {
+          logInfo("Removed %s on %s:%d on disk (size: %d, free: %d)".format(
+            blockId, blockManagerId.ip, blockManagerId.port, diskSize, remainedDisk))
+        }
       }
     }
@@ -204,12 +214,11 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     val startTimeMs = System.currentTimeMillis()
     val tmp = " " + blockManagerId + " "
     logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
-    logInfo("Got Register Msg from " + blockManagerId)
     if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
       logInfo("Got Register Msg from master node, don't register it")
     } else {
       blockManagerInfo += (blockManagerId -> new BlockManagerInfo(
-        System.currentTimeMillis() / 1000, maxMemSize, maxDiskSize))
+        blockManagerId, System.currentTimeMillis() / 1000, maxMemSize, maxDiskSize))
     }
     logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
     sender ! true
@@ -219,8 +228,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
       blockManagerId: BlockManagerId,
       blockId: String,
       storageLevel: StorageLevel,
-      deserializedSize: Long,
-      size: Long) {
+      memSize: Long,
+      diskSize: Long) {
 
     val startTimeMs = System.currentTimeMillis()
     val tmp = " " + blockManagerId + " " + blockId + " "
@@ -231,7 +240,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
       sender ! true
     }
 
-    blockManagerInfo(blockManagerId).addBlock(blockId, storageLevel, deserializedSize, size)
+    blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
 
     var locations: HashSet[BlockManagerId] = null
     if (blockInfo.containsKey(blockId)) {
diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
index d74cdb38a8..e317ad3642 100644
--- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
@@ -1,23 +1,21 @@
 package spark.storage
 
-import java.nio._
+import java.nio.ByteBuffer
 
 import scala.actors._
 import scala.actors.Actor._
 import scala.actors.remote._
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.util.Random
 
-import spark.Logging
-import spark.Utils
-import spark.SparkEnv
+import spark.{Logging, Utils, SparkEnv}
 import spark.network._
 
 /**
- * This should be changed to use event model late.
+ * A network interface for BlockManager. Each slave should have one
+ * BlockManagerWorker.
+ *
+ * TODO: Use event model.
  */
 class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
   initLogging()
@@ -32,7 +30,7 @@ class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
       logDebug("Handling as a buffer message " + bufferMessage)
       val blockMessages = BlockMessageArray.fromBufferMessage(bufferMessage)
       logDebug("Parsed as a block message array")
-      val responseMessages = blockMessages.map(processBlockMessage _).filter(_ != None).map(_.get)
+      val responseMessages = blockMessages.map(processBlockMessage).filter(_ != None).map(_.get)
       /*logDebug("Processed block messages")*/
       return Some(new BlockMessageArray(responseMessages).toBufferMessage)
     } catch {
diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala
index 0b2ed69e07..b9833273e5 100644
--- a/core/src/main/scala/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/spark/storage/BlockMessage.scala
@@ -1,6 +1,6 @@
 package spark.storage
 
-import java.nio._
+import java.nio.ByteBuffer
 
 import scala.collection.mutable.StringBuilder
 import scala.collection.mutable.ArrayBuffer
diff --git a/core/src/main/scala/spark/storage/BlockMessageArray.scala b/core/src/main/scala/spark/storage/BlockMessageArray.scala
index 497a19856e..928857056f 100644
--- a/core/src/main/scala/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/spark/storage/BlockMessageArray.scala
@@ -1,5 +1,6 @@
 package spark.storage
-import java.nio._
+
+import java.nio.ByteBuffer
 
 import scala.collection.mutable.StringBuilder
 import scala.collection.mutable.ArrayBuffer
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 17f4f51aa8..f66b5bc897 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -1,15 +1,14 @@
 package spark.storage
-import spark.{Utils, Logging, Serializer, SizeEstimator}
-import scala.collection.mutable.ArrayBuffer
 import java.io.{File, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
-import java.util.{UUID, LinkedHashMap}
-import java.util.concurrent.Executors
-import java.util.concurrent.ConcurrentHashMap
-import it.unimi.dsi.fastutil.io._
-import java.util.concurrent.ArrayBlockingQueue
+import java.util.{LinkedHashMap, UUID}
+import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
+
+import scala.collection.mutable.ArrayBuffer
+
+import spark.{Utils, Logging, Serializer, SizeEstimator}
 
 /**
  * Abstract class to store blocks
@@ -19,7 +18,13 @@ abstract class BlockStore(blockManager: BlockManager) extends Logging {
 
   def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel)
 
-  def putValues(blockId: String, values: Iterator[Any], level: StorageLevel): Either[Iterator[Any], ByteBuffer]
+  def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
+    : Either[Iterator[Any], ByteBuffer]
+
+  /**
+   * Return the size of a block.
+   */
+  def getSize(blockId: String): Long
 
   def getBytes(blockId: String): Option[ByteBuffer]
@@ -62,6 +67,11 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       }
     }
   }
   blockDropper.start()
+  logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
+
+  def freeMemory: Long = maxMemory - currentMemory
+
+  def getSize(blockId: String): Long = memoryStore.synchronized { memoryStore.get(blockId).size }
 
   def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
     if (level.deserialized) {
@@ -74,17 +84,20 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       val entry = new Entry(elements, sizeEstimate, true)
       memoryStore.synchronized { memoryStore.put(blockId, entry) }
       currentMemory += sizeEstimate
-      logDebug("Block " + blockId + " stored as values to memory")
+      logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
+        blockId, sizeEstimate, freeMemory))
     } else {
       val entry = new Entry(bytes, bytes.array().length, false)
       ensureFreeSpace(bytes.array.length)
       memoryStore.synchronized { memoryStore.put(blockId, entry) }
       currentMemory += bytes.array().length
-      logDebug("Block " + blockId + " stored as " + bytes.array().length + " bytes to memory")
+      logInfo("Block %s stored as %d bytes to memory (free %d)".format(
+        blockId, bytes.array().length, freeMemory))
     }
   }
 
-  def putValues(blockId: String, values: Iterator[Any], level: StorageLevel): Either[Iterator[Any], ByteBuffer] = {
+  def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
+    : Either[Iterator[Any], ByteBuffer] = {
     if (level.deserialized) {
       val elements = new ArrayBuffer[Any]
       elements ++= values
@@ -93,7 +106,8 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       val entry = new Entry(elements, sizeEstimate, true)
       memoryStore.synchronized { memoryStore.put(blockId, entry) }
       currentMemory += sizeEstimate
-      logDebug("Block " + blockId + " stored as values to memory")
+      logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
+        blockId, sizeEstimate, freeMemory))
       return Left(elements.iterator)
     } else {
       val bytes = dataSerialize(values)
@@ -101,7 +115,8 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       val entry = new Entry(bytes, bytes.array().length, false)
       memoryStore.synchronized { memoryStore.put(blockId, entry) }
       currentMemory += bytes.array().length
-      logDebug("Block " + blockId + " stored as " + bytes.array.length + " bytes to memory")
+      logInfo("Block %s stored as %d bytes to memory (free %d)".format(
+        blockId, bytes.array.length, freeMemory))
       return Right(bytes)
     }
   }
@@ -128,7 +143,8 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     if (entry != null) {
       memoryStore.remove(blockId)
       currentMemory -= entry.size
-      logDebug("Block " + blockId + " of size " + entry.size + " dropped from memory")
+      logInfo("Block %s of size %d dropped from memory (free %d)".format(
+        blockId, entry.size, freeMemory))
     } else {
       logWarning("Block " + blockId + " could not be removed as it doesnt exist")
     }
@@ -164,11 +180,11 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
           entry.dropPending = true
         }
         selectedMemory += pair.getValue.size
-        logDebug("Block " + blockId + " selected for dropping")
+        logInfo("Block " + blockId + " selected for dropping")
       }
     }
 
-    logDebug("" + selectedBlocks.size + " new blocks selected for dropping, " +
+    logInfo("" + selectedBlocks.size + " new blocks selected for dropping, " +
       blocksToDrop.size + " blocks pending")
     var i = 0
     while (i < selectedBlocks.size) {
@@ -192,7 +208,11 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
   var lastLocalDirUsed = 0
 
   addShutdownHook()
-  
+
+  def getSize(blockId: String): Long = {
+    getFile(blockId).length
+  }
+
  def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
     logDebug("Attempting to put block " + blockId)
     val startTime = System.currentTimeMillis
@@ -203,13 +223,15 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
       buffer.put(bytes.array)
       channel.close()
       val finishTime = System.currentTimeMillis
-      logDebug("Block " + blockId + " stored to file of " + bytes.array.length + " bytes to disk in " + (finishTime - startTime) + " ms")
+      logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
+        blockId, bytes.array.length, (finishTime - startTime)))
     } else {
       logError("File not created for block " + blockId)
     }
   }
 
-  def putValues(blockId: String, values: Iterator[Any], level: StorageLevel): Either[Iterator[Any], ByteBuffer] = {
+  def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
+    : Either[Iterator[Any], ByteBuffer] = {
     val bytes = dataSerialize(values)
     logDebug("Converted block " + blockId + " to " + bytes.array.length + " bytes")
     putBytes(blockId, bytes, level)
diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala
index f067a2a6c5..1d38ca13cc 100644
--- a/core/src/main/scala/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/spark/storage/StorageLevel.scala
@@ -1,6 +1,6 @@
 package spark.storage
 
-import java.io._
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
 class StorageLevel(
     var useDisk: Boolean,
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index f3f891e471..9e55647bd0 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -70,8 +70,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
 
     // Setting storage level of a1 and a2 to invalid; they should be removed from store and master
-    store.setLevel("a1", new StorageLevel(false, false, false, 1))
-    store.setLevel("a2", new StorageLevel(true, false, false, 0))
+    store.setLevelAndTellMaster("a1", new StorageLevel(false, false, false, 1))
+    store.setLevelAndTellMaster("a2", new StorageLevel(true, false, false, 0))
     assert(store.getSingle("a1") === None, "a1 not removed from store")
     assert(store.getSingle("a2") === None, "a2 not removed from store")
    assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
-- 
cgit v1.2.3