aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@cs.berkeley.edu>2012-09-06 01:46:48 -0700
committerMosharaf Chowdhury <mosharaf@cs.berkeley.edu>2012-09-06 01:46:48 -0700
commit7c6936f1bc91642d8ca5cba97af7bb09368c96b1 (patch)
tree7a8bae5ea635231df90d9f80986b60f75eccd1eb
parent5e8e8e4c9df5e3a0cdef99796fef403e5fe82d02 (diff)
parent53a5681c8a8123e203d830ec97a62b5e638d93fe (diff)
downloadspark-7c6936f1bc91642d8ca5cba97af7bb09368c96b1.tar.gz
spark-7c6936f1bc91642d8ca5cba97af7bb09368c96b1.tar.bz2
spark-7c6936f1bc91642d8ca5cba97af7bb09368c96b1.zip
Merge remote-tracking branch 'upstream/dev' into dev
-rw-r--r--core/src/main/scala/spark/CacheTracker.scala14
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala95
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMaster.scala97
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerWorker.scala18
-rw-r--r--core/src/main/scala/spark/storage/BlockMessage.scala2
-rw-r--r--core/src/main/scala/spark/storage/BlockMessageArray.scala3
-rw-r--r--core/src/main/scala/spark/storage/BlockStore.scala60
-rw-r--r--core/src/main/scala/spark/storage/StorageLevel.scala2
-rw-r--r--core/src/test/scala/spark/storage/BlockManagerSuite.scala4
9 files changed, 158 insertions, 137 deletions
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 22110832f8..356637825e 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -43,8 +43,6 @@ class CacheTrackerActor extends Actor with Logging {
def receive = {
case SlaveCacheStarted(host: String, size: Long) =>
- logInfo("Started slave cache (size %s) on %s".format(
- Utils.memoryBytesToString(size), host))
slaveCapacity.put(host, size)
slaveUsage.put(host, 0)
sender ! true
@@ -56,22 +54,12 @@ class CacheTrackerActor extends Actor with Logging {
case AddedToCache(rddId, partition, host, size) =>
slaveUsage.put(host, getCacheUsage(host) + size)
- logInfo("Cache entry added: (%s, %s) on %s (size added: %s, available: %s)".format(
- rddId, partition, host, Utils.memoryBytesToString(size),
- Utils.memoryBytesToString(getCacheAvailable(host))))
locs(rddId)(partition) = host :: locs(rddId)(partition)
sender ! true
case DroppedFromCache(rddId, partition, host, size) =>
- logInfo("Cache entry removed: (%s, %s) on %s (size dropped: %s, available: %s)".format(
- rddId, partition, host, Utils.memoryBytesToString(size),
- Utils.memoryBytesToString(getCacheAvailable(host))))
slaveUsage.put(host, getCacheUsage(host) - size)
// Do a sanity check to make sure usage is greater than 0.
- val usage = getCacheUsage(host)
- if (usage < 0) {
- logError("Cache usage on %s is negative (%d)".format(host, usage))
- }
locs(rddId)(partition) = locs(rddId)(partition).filterNot(_ == host)
sender ! true
@@ -223,7 +211,7 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
logInfo("Computing partition " + split)
try {
// BlockManager will iterate over results from compute to create RDD
- blockManager.put(key, rdd.compute(split), storageLevel, false)
+ blockManager.put(key, rdd.compute(split), storageLevel, true)
//future.apply() // Wait for the reply from the cache tracker
blockManager.get(key) match {
case Some(values) =>
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 3aac8e50b4..11a1cbb73e 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -1,29 +1,21 @@
package spark.storage
-import java.io._
-import java.nio._
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.LinkedBlockingQueue
-import java.util.Collections
-
import akka.dispatch.{Await, Future}
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.{HashMap, HashSet}
-import scala.collection.mutable.Queue
-import scala.collection.JavaConversions._
+import akka.util.Duration
-import it.unimi.dsi.fastutil.io._
+import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-import spark.CacheTracker
-import spark.Logging
-import spark.Serializer
-import spark.SizeEstimator
-import spark.SparkEnv
-import spark.SparkException
-import spark.Utils
-import spark.util.ByteBufferInputStream
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import java.nio.ByteBuffer
+import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
+import scala.collection.JavaConversions._
+
+import spark.{CacheTracker, Logging, Serializer, SizeEstimator, SparkException, Utils}
import spark.network._
-import akka.util.Duration
+import spark.util.ByteBufferInputStream
+
class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
def this() = this(null, 0)
@@ -49,7 +41,8 @@ class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
}
-case class BlockException(blockId: String, message: String, ex: Exception = null) extends Exception(message)
+case class BlockException(blockId: String, message: String, ex: Exception = null)
+extends Exception(message)
class BlockLocker(numLockers: Int) {
@@ -83,6 +76,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// TODO: This will be removed after cacheTracker is removed from the code base.
var cacheTracker: CacheTracker = null
+ val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties()
+
initLogging()
initialize()
@@ -113,10 +108,14 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
/**
- * Change storage level for a local block and tell master is necesary.
- * If new level is invalid, then block info (if it exists) will be silently removed.
+ * Change the storage level for a local block in the block info meta data, and
+ * tell the master if necessary. Note that this is only a meta data change and
+ * does NOT actually change the storage of the block. If the new level is
+ * invalid, then block info (if exists) will be silently removed.
*/
- def setLevel(blockId: String, level: StorageLevel, tellMaster: Boolean = true) {
+ private[spark] def setLevelAndTellMaster(
+ blockId: String, level: StorageLevel, tellMaster: Boolean = true) {
+
if (level == null) {
throw new IllegalArgumentException("Storage level is null")
}
@@ -139,8 +138,13 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// Tell master if necessary
if (newTellMaster) {
+ master.mustHeartBeat(HeartBeat(
+ blockManagerId,
+ blockId,
+ level,
+ if (level.isValid && level.useMemory) memoryStore.getSize(blockId) else 0,
+ if (level.isValid && level.useDisk) diskStore.getSize(blockId) else 0))
logDebug("Told master about block " + blockId)
- notifyMaster(HeartBeat(blockManagerId, blockId, level, 0, 0))
} else {
logDebug("Did not tell master about block " + blockId)
}
@@ -279,7 +283,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
val results = new LinkedBlockingQueue[(String, Option[() => Iterator[Any]])]
// Bound the number and memory usage of fetched remote blocks.
- val parallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties
val blocksToRequest = new Queue[(BlockManagerId, BlockMessage)]
def sendRequest(bmId: BlockManagerId, blockMessages: Seq[BlockMessage]) {
@@ -290,7 +293,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
case Some(message) => {
val bufferMessage = message.asInstanceOf[BufferMessage]
val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
- blockMessageArray.foreach(blockMessage => {
+ for (blockMessage <- blockMessageArray) {
if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) {
throw new SparkException(
"Unexpected message " + blockMessage.getType + " received from " + cmId)
@@ -298,7 +301,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
val blockId = blockMessage.getId
results.put((blockId, Some(() => dataDeserialize(blockMessage.getData))))
logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
- })
+ }
}
case None => {
logError("Could not get block(s) from " + cmId)
@@ -318,9 +321,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
localBlockIds ++= blockIds
} else {
remoteBlockIds ++= blockIds
- blockIds.foreach{blockId =>
+ for (blockId <- blockIds) {
val blockMessage = BlockMessage.fromGetBlock(GetBlock(blockId))
- if (initialRequests < parallelFetches) {
+ if (initialRequests < numParallelFetches) {
initialRequestBlocks.getOrElseUpdate(address, new ArrayBuffer[BlockMessage])
.append(blockMessage)
initialRequests += 1
@@ -331,15 +334,17 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
- // Send out initial request(s) for 'parallelFetches' blocks.
- for ((bmId, blockMessages) <- initialRequestBlocks) { sendRequest(bmId, blockMessages) }
+ // Send out initial request(s) for 'numParallelFetches' blocks.
+ for ((bmId, blockMessages) <- initialRequestBlocks) {
+ sendRequest(bmId, blockMessages)
+ }
- logDebug("Started remote gets for " + parallelFetches + " blocks in " +
+ logDebug("Started remote gets for " + numParallelFetches + " blocks in " +
Utils.getUsedTimeMs(startTime) + " ms")
// Get the local blocks while remote blocks are being fetched.
startTime = System.currentTimeMillis
- localBlockIds.foreach(id => {
+ for (id <- localBlockIds) {
getLocal(id) match {
case Some(block) => {
results.put((id, Some(() => block)))
@@ -349,7 +354,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new BlockException(id, "Could not get block " + id + " from local machine")
}
}
- })
+ }
logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
// Return an iterator that will read fetched blocks off the queue as they arrive.
@@ -362,8 +367,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
resultsGotten += 1
val (blockId, functionOption) = results.take()
if (remoteBlockIds.contains(blockId) && !blocksToRequest.isEmpty) {
- val (bmId, blockMessage) = blocksToRequest.dequeue
- sendRequest(bmId, Seq(blockMessage))
+ val (bmId, blockMessage) = blocksToRequest.dequeue()
+ sendRequest(bmId, Seq(blockMessage))
}
(blockId, functionOption.map(_.apply()))
}
@@ -428,9 +433,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
case _ => throw new Exception("Unexpected return value")
}
}
-
+
// Store the storage level
- setLevel(blockId, level, tellMaster)
+ setLevelAndTellMaster(blockId, level, tellMaster)
}
logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
@@ -458,7 +463,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
/**
* Put a new block of serialized bytes to the block manager.
*/
- def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
+ def putBytes(
+ blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
+
if (blockId == null) {
throw new IllegalArgumentException("Block Id is null")
}
@@ -497,7 +504,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
// Store the storage level
- setLevel(blockId, level, tellMaster)
+ setLevelAndTellMaster(blockId, level, tellMaster)
}
// TODO: This code will be removed when CacheTracker is gone.
@@ -584,7 +591,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
memoryStore.remove(blockId)
val newLevel = new StorageLevel(level.useDisk, false, level.deserialized, level.replication)
- setLevel(blockId, newLevel)
+ setLevelAndTellMaster(blockId, newLevel)
}
}
@@ -603,10 +610,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
return ser.deserializeStream(new ByteBufferInputStream(bytes)).toIterator
}
- private def notifyMaster(heartBeat: HeartBeat) {
- master.mustHeartBeat(heartBeat)
- }
-
def stop() {
connectionManager.stop()
blockInfo.clear()
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 9f03c5a32c..2f14db4e28 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -3,22 +3,18 @@ package spark.storage
import java.io._
import java.util.{HashMap => JHashMap}
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Random
import akka.actor._
import akka.dispatch._
import akka.pattern.ask
import akka.remote._
-import akka.util.Duration
-import akka.util.Timeout
+import akka.util.{Duration, Timeout}
import akka.util.duration._
-import spark.Logging
-import spark.SparkException
-import spark.Utils
+import spark.{Logging, SparkException, Utils}
+
sealed trait ToBlockManagerMaster
@@ -27,13 +23,13 @@ case class RegisterBlockManager(
maxMemSize: Long,
maxDiskSize: Long)
extends ToBlockManagerMaster
-
+
class HeartBeat(
var blockManagerId: BlockManagerId,
var blockId: String,
var storageLevel: StorageLevel,
- var deserializedSize: Long,
- var size: Long)
+ var memSize: Long,
+ var diskSize: Long)
extends ToBlockManagerMaster
with Externalizable {
@@ -43,8 +39,8 @@ class HeartBeat(
blockManagerId.writeExternal(out)
out.writeUTF(blockId)
storageLevel.writeExternal(out)
- out.writeInt(deserializedSize.toInt)
- out.writeInt(size.toInt)
+ out.writeInt(memSize.toInt)
+ out.writeInt(diskSize.toInt)
}
override def readExternal(in: ObjectInput) {
@@ -53,8 +49,8 @@ class HeartBeat(
blockId = in.readUTF()
storageLevel = new StorageLevel()
storageLevel.readExternal(in)
- deserializedSize = in.readInt()
- size = in.readInt()
+ memSize = in.readInt()
+ diskSize = in.readInt()
}
}
@@ -62,15 +58,14 @@ object HeartBeat {
def apply(blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
- deserializedSize: Long,
- size: Long): HeartBeat = {
- new HeartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size)
+ memSize: Long,
+ diskSize: Long): HeartBeat = {
+ new HeartBeat(blockManagerId, blockId, storageLevel, memSize, diskSize)
}
-
// For pattern-matching
def unapply(h: HeartBeat): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
- Some((h.blockManagerId, h.blockId, h.storageLevel, h.deserializedSize, h.size))
+ Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
}
}
@@ -88,49 +83,64 @@ case object StopBlockManagerMaster extends ToBlockManagerMaster
class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
class BlockManagerInfo(
+ val blockManagerId: BlockManagerId,
timeMs: Long,
- maxMem: Long,
- maxDisk: Long) {
+ val maxMem: Long,
+ val maxDisk: Long) {
private var lastSeenMs = timeMs
private var remainedMem = maxMem
private var remainedDisk = maxDisk
private val blocks = new JHashMap[String, StorageLevel]
+
+ logInfo("Registering block manager (%s:%d, ram: %d, disk: %d)".format(
+ blockManagerId.ip, blockManagerId.port, maxMem, maxDisk))
def updateLastSeenMs() {
lastSeenMs = System.currentTimeMillis() / 1000
}
- def addBlock(blockId: String, storageLevel: StorageLevel, deserializedSize: Long, size: Long) =
- synchronized {
+ def updateBlockInfo(
+ blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long) = synchronized {
+
updateLastSeenMs()
if (blocks.containsKey(blockId)) {
- val oriLevel: StorageLevel = blocks.get(blockId)
+ // The block exists on the slave already.
+ val originalLevel: StorageLevel = blocks.get(blockId)
- if (oriLevel.deserialized) {
- remainedMem += deserializedSize
+ if (originalLevel.useMemory) {
+ remainedMem += memSize
}
- if (oriLevel.useMemory) {
- remainedMem += size
- }
- if (oriLevel.useDisk) {
- remainedDisk += size
+ if (originalLevel.useDisk) {
+ remainedDisk += diskSize
}
}
- if (storageLevel.isValid) {
+ if (storageLevel.isValid) {
+ // isValid means it is either stored in-memory or on-disk.
blocks.put(blockId, storageLevel)
- if (storageLevel.deserialized) {
- remainedMem -= deserializedSize
- }
if (storageLevel.useMemory) {
- remainedMem -= size
+ remainedMem -= memSize
+ logInfo("Added %s in memory on %s:%d (size: %d, free: %d)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, memSize, remainedMem))
}
if (storageLevel.useDisk) {
- remainedDisk -= size
+ remainedDisk -= diskSize
+ logInfo("Added %s on disk on %s:%d (size: %d, free: %d)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, diskSize, remainedDisk))
}
- } else {
+ } else if (blocks.containsKey(blockId)) {
+ // If isValid is not true, drop the block.
+ val originalLevel: StorageLevel = blocks.get(blockId)
blocks.remove(blockId)
+ if (originalLevel.useMemory) {
+ logInfo("Removed %s on %s:%d in memory (size: %d, free: %d)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, memSize, remainedDisk))
+ }
+ if (originalLevel.useDisk) {
+ logInfo("Removed %s on %s:%d on disk (size: %d, free: %d)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, diskSize, remainedDisk))
+ }
}
}
@@ -204,12 +214,11 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " "
logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
- logInfo("Got Register Msg from " + blockManagerId)
if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
logInfo("Got Register Msg from master node, don't register it")
} else {
blockManagerInfo += (blockManagerId -> new BlockManagerInfo(
- System.currentTimeMillis() / 1000, maxMemSize, maxDiskSize))
+ blockManagerId, System.currentTimeMillis() / 1000, maxMemSize, maxDiskSize))
}
logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
sender ! true
@@ -219,8 +228,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
- deserializedSize: Long,
- size: Long) {
+ memSize: Long,
+ diskSize: Long) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " " + blockId + " "
@@ -231,7 +240,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
sender ! true
}
- blockManagerInfo(blockManagerId).addBlock(blockId, storageLevel, deserializedSize, size)
+ blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
var locations: HashSet[BlockManagerId] = null
if (blockInfo.containsKey(blockId)) {
diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
index d74cdb38a8..e317ad3642 100644
--- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
@@ -1,23 +1,21 @@
package spark.storage
-import java.nio._
+import java.nio.ByteBuffer
import scala.actors._
import scala.actors.Actor._
import scala.actors.remote._
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Random
-import spark.Logging
-import spark.Utils
-import spark.SparkEnv
+import spark.{Logging, Utils, SparkEnv}
import spark.network._
/**
- * This should be changed to use event model late.
+ * A network interface for BlockManager. Each slave should have one
+ * BlockManagerWorker.
+ *
+ * TODO: Use event model.
*/
class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
initLogging()
@@ -32,7 +30,7 @@ class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
logDebug("Handling as a buffer message " + bufferMessage)
val blockMessages = BlockMessageArray.fromBufferMessage(bufferMessage)
logDebug("Parsed as a block message array")
- val responseMessages = blockMessages.map(processBlockMessage _).filter(_ != None).map(_.get)
+ val responseMessages = blockMessages.map(processBlockMessage).filter(_ != None).map(_.get)
/*logDebug("Processed block messages")*/
return Some(new BlockMessageArray(responseMessages).toBufferMessage)
} catch {
diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala
index 0b2ed69e07..b9833273e5 100644
--- a/core/src/main/scala/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/spark/storage/BlockMessage.scala
@@ -1,6 +1,6 @@
package spark.storage
-import java.nio._
+import java.nio.ByteBuffer
import scala.collection.mutable.StringBuilder
import scala.collection.mutable.ArrayBuffer
diff --git a/core/src/main/scala/spark/storage/BlockMessageArray.scala b/core/src/main/scala/spark/storage/BlockMessageArray.scala
index 497a19856e..928857056f 100644
--- a/core/src/main/scala/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/spark/storage/BlockMessageArray.scala
@@ -1,5 +1,6 @@
package spark.storage
-import java.nio._
+
+import java.nio.ByteBuffer
import scala.collection.mutable.StringBuilder
import scala.collection.mutable.ArrayBuffer
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 17f4f51aa8..f66b5bc897 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -1,15 +1,14 @@
package spark.storage
-import spark.{Utils, Logging, Serializer, SizeEstimator}
-import scala.collection.mutable.ArrayBuffer
import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
-import java.util.{UUID, LinkedHashMap}
-import java.util.concurrent.Executors
-import java.util.concurrent.ConcurrentHashMap
-import it.unimi.dsi.fastutil.io._
-import java.util.concurrent.ArrayBlockingQueue
+import java.util.{LinkedHashMap, UUID}
+import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
+
+import scala.collection.mutable.ArrayBuffer
+
+import spark.{Utils, Logging, Serializer, SizeEstimator}
/**
* Abstract class to store blocks
@@ -19,7 +18,13 @@ abstract class BlockStore(blockManager: BlockManager) extends Logging {
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel)
- def putValues(blockId: String, values: Iterator[Any], level: StorageLevel): Either[Iterator[Any], ByteBuffer]
+ def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
+ : Either[Iterator[Any], ByteBuffer]
+
+ /**
+ * Return the size of a block.
+ */
+ def getSize(blockId: String): Long
def getBytes(blockId: String): Option[ByteBuffer]
@@ -62,6 +67,11 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}
blockDropper.start()
+ logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
+
+ def freeMemory: Long = maxMemory - currentMemory
+
+ def getSize(blockId: String): Long = memoryStore.synchronized { memoryStore.get(blockId).size }
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
if (level.deserialized) {
@@ -74,17 +84,20 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val entry = new Entry(elements, sizeEstimate, true)
memoryStore.synchronized { memoryStore.put(blockId, entry) }
currentMemory += sizeEstimate
- logDebug("Block " + blockId + " stored as values to memory")
+ logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
+ blockId, sizeEstimate, freeMemory))
} else {
val entry = new Entry(bytes, bytes.array().length, false)
ensureFreeSpace(bytes.array.length)
memoryStore.synchronized { memoryStore.put(blockId, entry) }
currentMemory += bytes.array().length
- logDebug("Block " + blockId + " stored as " + bytes.array().length + " bytes to memory")
+ logInfo("Block %s stored as %d bytes to memory (free %d)".format(
+ blockId, bytes.array().length, freeMemory))
}
}
- def putValues(blockId: String, values: Iterator[Any], level: StorageLevel): Either[Iterator[Any], ByteBuffer] = {
+ def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
+ : Either[Iterator[Any], ByteBuffer] = {
if (level.deserialized) {
val elements = new ArrayBuffer[Any]
elements ++= values
@@ -93,7 +106,8 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val entry = new Entry(elements, sizeEstimate, true)
memoryStore.synchronized { memoryStore.put(blockId, entry) }
currentMemory += sizeEstimate
- logDebug("Block " + blockId + " stored as values to memory")
+ logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
+ blockId, sizeEstimate, freeMemory))
return Left(elements.iterator)
} else {
val bytes = dataSerialize(values)
@@ -101,7 +115,8 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val entry = new Entry(bytes, bytes.array().length, false)
memoryStore.synchronized { memoryStore.put(blockId, entry) }
currentMemory += bytes.array().length
- logDebug("Block " + blockId + " stored as " + bytes.array.length + " bytes to memory")
+ logInfo("Block %s stored as %d bytes to memory (free %d)".format(
+ blockId, bytes.array.length, freeMemory))
return Right(bytes)
}
}
@@ -128,7 +143,8 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
if (entry != null) {
memoryStore.remove(blockId)
currentMemory -= entry.size
- logDebug("Block " + blockId + " of size " + entry.size + " dropped from memory")
+ logInfo("Block %s of size %d dropped from memory (free %d)".format(
+ blockId, entry.size, freeMemory))
} else {
logWarning("Block " + blockId + " could not be removed as it doesnt exist")
}
@@ -164,11 +180,11 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
entry.dropPending = true
}
selectedMemory += pair.getValue.size
- logDebug("Block " + blockId + " selected for dropping")
+ logInfo("Block " + blockId + " selected for dropping")
}
}
- logDebug("" + selectedBlocks.size + " new blocks selected for dropping, " +
+ logInfo("" + selectedBlocks.size + " new blocks selected for dropping, " +
blocksToDrop.size + " blocks pending")
var i = 0
while (i < selectedBlocks.size) {
@@ -192,7 +208,11 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
var lastLocalDirUsed = 0
addShutdownHook()
-
+
+ def getSize(blockId: String): Long = {
+ getFile(blockId).length
+ }
+
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
logDebug("Attempting to put block " + blockId)
val startTime = System.currentTimeMillis
@@ -203,13 +223,15 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
buffer.put(bytes.array)
channel.close()
val finishTime = System.currentTimeMillis
- logDebug("Block " + blockId + " stored to file of " + bytes.array.length + " bytes to disk in " + (finishTime - startTime) + " ms")
+ logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
+ blockId, bytes.array.length, (finishTime - startTime)))
} else {
logError("File not created for block " + blockId)
}
}
- def putValues(blockId: String, values: Iterator[Any], level: StorageLevel): Either[Iterator[Any], ByteBuffer] = {
+ def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
+ : Either[Iterator[Any], ByteBuffer] = {
val bytes = dataSerialize(values)
logDebug("Converted block " + blockId + " to " + bytes.array.length + " bytes")
putBytes(blockId, bytes, level)
diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala
index f067a2a6c5..1d38ca13cc 100644
--- a/core/src/main/scala/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/spark/storage/StorageLevel.scala
@@ -1,6 +1,6 @@
package spark.storage
-import java.io._
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
class StorageLevel(
var useDisk: Boolean,
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index f3f891e471..9e55647bd0 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -70,8 +70,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
// Setting storage level of a1 and a2 to invalid; they should be removed from store and master
- store.setLevel("a1", new StorageLevel(false, false, false, 1))
- store.setLevel("a2", new StorageLevel(true, false, false, 0))
+ store.setLevelAndTellMaster("a1", new StorageLevel(false, false, false, 1))
+ store.setLevelAndTellMaster("a2", new StorageLevel(true, false, false, 0))
assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store")
assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")