From 1b7a0451ed7df78838ca7ea09dfa5ba0e236acfe Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 13 Dec 2012 00:04:42 -0800 Subject: Added the ability in block manager to remove blocks. --- core/src/main/scala/spark/SparkEnv.scala | 11 +- .../main/scala/spark/storage/BlockManager.scala | 83 ++++----- .../main/scala/spark/storage/BlockManagerId.scala | 29 +++ .../scala/spark/storage/BlockManagerMaster.scala | 199 +++++++++------------ .../scala/spark/storage/BlockManagerMessages.scala | 102 +++++++++++ .../spark/storage/BlockManagerSlaveActor.scala | 16 ++ .../main/scala/spark/storage/ThreadingTest.scala | 13 +- .../main/scala/spark/util/GenerationIdUtil.scala | 19 ++ .../scala/spark/storage/BlockManagerSuite.scala | 59 ++++-- 9 files changed, 361 insertions(+), 170 deletions(-) create mode 100644 core/src/main/scala/spark/storage/BlockManagerId.scala create mode 100644 core/src/main/scala/spark/storage/BlockManagerMessages.scala create mode 100644 core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala create mode 100644 core/src/main/scala/spark/util/GenerationIdUtil.scala diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index 272d7cdad3..41441720a7 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -86,10 +86,13 @@ object SparkEnv extends Logging { } val serializer = instantiateClass[Serializer]("spark.serializer", "spark.JavaSerializer") - - val blockManagerMaster = new BlockManagerMaster(actorSystem, isMaster, isLocal) + + val masterIp: String = System.getProperty("spark.master.host", "localhost") + val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt + val blockManagerMaster = new BlockManagerMaster( + actorSystem, isMaster, isLocal, masterIp, masterPort) val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer) - + val connectionManager = blockManager.connectionManager val broadcastManager = new BroadcastManager(isMaster) @@ -104,7 +107,7 @@ object SparkEnv extends Logging { val shuffleFetcher = instantiateClass[ShuffleFetcher]( "spark.shuffle.fetcher", "spark.BlockStoreShuffleFetcher") - + val httpFileServer = new HttpFileServer() httpFileServer.initialize() System.setProperty("spark.fileserver.uri", httpFileServer.serverUri) diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index df295b1820..b2c9e2cc40 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -1,59 +1,39 @@ package spark.storage -import akka.actor.{ActorSystem, Cancellable} +import java.io.{InputStream, OutputStream} +import java.nio.{ByteBuffer, MappedByteBuffer} +import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue} +import scala.collection.JavaConversions._ + +import akka.actor.{ActorSystem, Cancellable, Props} import akka.dispatch.{Await, Future} import akka.util.Duration import akka.util.duration._ -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream - -import java.io.{InputStream, OutputStream, Externalizable, ObjectInput, ObjectOutput} -import java.nio.{MappedByteBuffer, ByteBuffer} -import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} +import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue} -import 
scala.collection.JavaConversions._ +import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils} import spark.network._ import spark.serializer.Serializer -import spark.util.ByteBufferInputStream -import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} -import sun.nio.ch.DirectBuffer - - -private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable { - def this() = this(null, 0) // For deserialization only - - def this(in: ObjectInput) = this(in.readUTF(), in.readInt()) +import spark.util.{ByteBufferInputStream, GenerationIdUtil} - override def writeExternal(out: ObjectOutput) { - out.writeUTF(ip) - out.writeInt(port) - } - - override def readExternal(in: ObjectInput) { - ip = in.readUTF() - port = in.readInt() - } - - override def toString = "BlockManagerId(" + ip + ", " + port + ")" - - override def hashCode = ip.hashCode * 41 + port +import sun.nio.ch.DirectBuffer - override def equals(that: Any) = that match { - case id: BlockManagerId => port == id.port && ip == id.ip - case _ => false - } -} private[spark] case class BlockException(blockId: String, message: String, ex: Exception = null) extends Exception(message) private[spark] -class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster, - val serializer: Serializer, maxMemory: Long) +class BlockManager( + actorSystem: ActorSystem, + val master: BlockManagerMaster, + val serializer: Serializer, + maxMemory: Long) extends Logging { class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) { @@ -110,6 +90,9 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster, val host = System.getProperty("spark.hostname", Utils.localHostName()) + val slaveActor = master.actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)), + name = "BlockManagerActor" + GenerationIdUtil.BLOCK_MANAGER.next) + @volatile private var shuttingDown = false private def heartBeat() { @@ -134,8 +117,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster, * BlockManagerWorker actor. */ private def initialize() { - master.mustRegisterBlockManager( - RegisterBlockManager(blockManagerId, maxMemory)) + master.mustRegisterBlockManager(blockManagerId, maxMemory, slaveActor) BlockManagerWorker.startBlockManagerWorker(this) if (!BlockManager.getDisableHeartBeatsForTesting) { heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) { @@ -171,8 +153,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster, def reregister() { // TODO: We might need to rate limit reregistering. logInfo("BlockManager reregistering with master") - master.mustRegisterBlockManager( - RegisterBlockManager(blockManagerId, maxMemory)) + master.mustRegisterBlockManager(blockManagerId, maxMemory, slaveActor) reportAllBlocks() } @@ -865,6 +846,25 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster, } } + /** + * Remove a block from both memory and disk. This one doesn't report to the master + * because it expects the master to initiate the original block removal command, and + * then the master can update the block tracking itself. + */ + def removeBlock(blockId: String) { + logInfo("Removing block " + blockId) + val info = blockInfo.get(blockId) + if (info != null) info.synchronized { + // Removals are idempotent in disk store and memory store. At worst, we get a warning. 
+ memoryStore.remove(blockId) + diskStore.remove(blockId) + blockInfo.remove(blockId) + } else { + // The block has already been removed; do nothing. + logWarning("Block " + blockId + " does not exist.") + } + } + def shouldCompress(blockId: String): Boolean = { if (blockId.startsWith("shuffle_")) { compressShuffle @@ -914,6 +914,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster, heartBeatTask.cancel() } connectionManager.stop() + master.actorSystem.stop(slaveActor) blockInfo.clear() memoryStore.clear() diskStore.clear() diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala new file mode 100644 index 0000000000..03cd141805 --- /dev/null +++ b/core/src/main/scala/spark/storage/BlockManagerId.scala @@ -0,0 +1,29 @@ +package spark.storage + +import java.io.{Externalizable, ObjectInput, ObjectOutput} + + +private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable { + def this() = this(null, 0) // For deserialization only + + def this(in: ObjectInput) = this(in.readUTF(), in.readInt()) + + override def writeExternal(out: ObjectOutput) { + out.writeUTF(ip) + out.writeInt(port) + } + + override def readExternal(in: ObjectInput) { + ip = in.readUTF() + port = in.readInt() + } + + override def toString = "BlockManagerId(" + ip + ", " + port + ")" + + override def hashCode = ip.hashCode * 41 + port + + override def equals(that: Any) = that match { + case id: BlockManagerId => port == id.port && ip == id.ip + case _ => false + } +} \ No newline at end of file diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 0a4e68f437..64cdb86f8d 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -17,95 +17,24 @@ import spark.{Logging, SparkException, Utils} private[spark] -sealed trait ToBlockManagerMaster +case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) -private[spark] -case class RegisterBlockManager( - blockManagerId: BlockManagerId, - maxMemSize: Long) - extends ToBlockManagerMaster - -private[spark] -case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster - -private[spark] -class BlockUpdate( - var blockManagerId: BlockManagerId, - var blockId: String, - var storageLevel: StorageLevel, - var memSize: Long, - var diskSize: Long) - extends ToBlockManagerMaster - with Externalizable { - - def this() = this(null, null, null, 0, 0) // For deserialization only - - override def writeExternal(out: ObjectOutput) { - blockManagerId.writeExternal(out) - out.writeUTF(blockId) - storageLevel.writeExternal(out) - out.writeInt(memSize.toInt) - out.writeInt(diskSize.toInt) - } - - override def readExternal(in: ObjectInput) { - blockManagerId = new BlockManagerId() - blockManagerId.readExternal(in) - blockId = in.readUTF() - storageLevel = new StorageLevel() - storageLevel.readExternal(in) - memSize = in.readInt() - diskSize = in.readInt() - } -} - -private[spark] -object BlockUpdate { - def apply(blockManagerId: BlockManagerId, - blockId: String, - storageLevel: StorageLevel, - memSize: Long, - diskSize: Long): BlockUpdate = { - new BlockUpdate(blockManagerId, blockId, storageLevel, memSize, diskSize) - } - - // For pattern-matching - def unapply(h: BlockUpdate): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = { - Some((h.blockManagerId, h.blockId, 
h.storageLevel, h.memSize, h.diskSize)) - } -} - -private[spark] -case class GetLocations(blockId: String) extends ToBlockManagerMaster - -private[spark] -case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster - -private[spark] -case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster - -private[spark] -case class RemoveHost(host: String) extends ToBlockManagerMaster - -private[spark] -case object StopBlockManagerMaster extends ToBlockManagerMaster - -private[spark] -case object GetMemoryStatus extends ToBlockManagerMaster +// TODO(rxin): Move BlockManagerMasterActor to its own file. private[spark] -case object ExpireDeadHosts extends ToBlockManagerMaster - - -private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { +class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { class BlockManagerInfo( val blockManagerId: BlockManagerId, timeMs: Long, - val maxMem: Long) { - private var _lastSeenMs = timeMs - private var _remainingMem = maxMem - private val _blocks = new JHashMap[String, StorageLevel] + val maxMem: Long, + val slaveActor: ActorRef) { + + private var _lastSeenMs: Long = timeMs + private var _remainingMem: Long = maxMem + + // Mapping from block id to its status. + private val _blocks = new JHashMap[String, BlockStatus] logInfo("Registering block manager %s:%d with %s RAM".format( blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem))) @@ -121,7 +50,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor if (_blocks.containsKey(blockId)) { // The block exists on the slave already. - val originalLevel: StorageLevel = _blocks.get(blockId) + val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel if (originalLevel.useMemory) { _remainingMem += memSize @@ -130,7 +59,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor if (storageLevel.isValid) { // isValid means it is either stored in-memory or on-disk. - _blocks.put(blockId, storageLevel) + _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize)) if (storageLevel.useMemory) { _remainingMem -= memSize logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format( @@ -143,15 +72,15 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor } } else if (_blocks.containsKey(blockId)) { // If isValid is not true, drop the block. 
- val originalLevel: StorageLevel = _blocks.get(blockId) + val blockStatus: BlockStatus = _blocks.get(blockId) _blocks.remove(blockId) - if (originalLevel.useMemory) { - _remainingMem += memSize + if (blockStatus.storageLevel.useMemory) { + _remainingMem += blockStatus.memSize logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format( blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize), Utils.memoryBytesToString(_remainingMem))) } - if (originalLevel.useDisk) { + if (blockStatus.storageLevel.useDisk) { logInfo("Removed %s on %s:%d on disk (size: %s)".format( blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize))) } @@ -162,7 +91,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor def lastSeenMs: Long = _lastSeenMs - def blocks: JHashMap[String, StorageLevel] = _blocks + def blocks: JHashMap[String, BlockStatus] = _blocks override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem @@ -171,8 +100,13 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor } } + // Mapping from block manager id to the block manager's information. private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo] + + // Mapping from host name to block manager id. private val blockManagerIdByHost = new HashMap[String, BlockManagerId] + + // Mapping from block id to the set of block managers that have the block. private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]] initLogging() @@ -245,8 +179,8 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor } def receive = { - case RegisterBlockManager(blockManagerId, maxMemSize) => - register(blockManagerId, maxMemSize) + case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) => + register(blockManagerId, maxMemSize, slaveActor) case BlockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) => blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) @@ -264,6 +198,9 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor case GetMemoryStatus => getMemoryStatus + case RemoveBlock(blockId) => + removeBlock(blockId) + case RemoveHost(host) => removeHost(host) sender ! true @@ -286,6 +223,27 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor logInfo("Got unknown message: " + other) } + // Remove a block from the slaves that have it. This can only be used to remove + // blocks that the master knows about. + private def removeBlock(blockId: String) { + val block = blockInfo.get(blockId) + if (block != null) { + block._2.foreach { blockManagerId: BlockManagerId => + val blockManager = blockManagerInfo.get(blockManagerId) + if (blockManager.isDefined) { + // Remove the block from the slave's BlockManager. + // Doesn't actually wait for a confirmation and the message might get lost. + // If message loss becomes frequent, we should add retry logic here. + blockManager.get.slaveActor ! RemoveBlock(blockId) + // Remove the block from the master's BlockManagerInfo. + blockManager.get.updateBlockInfo(blockId, StorageLevel.NONE, 0, 0) + } + } + blockInfo.remove(blockId) + } + sender ! true + } + // Return a map from the block manager id to max memory and remaining memory. 
private def getMemoryStatus() { val res = blockManagerInfo.map { case(blockManagerId, info) => @@ -294,7 +252,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor sender ! res } - private def register(blockManagerId: BlockManagerId, maxMemSize: Long) { + private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { val startTimeMs = System.currentTimeMillis() val tmp = " " + blockManagerId + " " logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs)) @@ -309,7 +267,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor logInfo("Got Register Msg from master node, don't register it") } else { blockManagerInfo += (blockManagerId -> new BlockManagerInfo( - blockManagerId, System.currentTimeMillis(), maxMemSize)) + blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor)) } blockManagerIdByHost += (blockManagerId.ip -> blockManagerId) logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs)) @@ -442,25 +400,29 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor } } -private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Boolean, isLocal: Boolean) + +private[spark] class BlockManagerMaster( + val actorSystem: ActorSystem, + isMaster: Boolean, + isLocal: Boolean, + masterIp: String, + masterPort: Int) extends Logging { - val AKKA_ACTOR_NAME: String = "BlockMasterManager" + val MASTER_AKKA_ACTOR_NAME = "BlockMasterManager" + val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager" val REQUEST_RETRY_INTERVAL_MS = 100 - val DEFAULT_MASTER_IP: String = System.getProperty("spark.master.host", "localhost") - val DEFAULT_MASTER_PORT: Int = System.getProperty("spark.master.port", "7077").toInt val DEFAULT_MANAGER_IP: String = Utils.localHostName() val timeout = 10.seconds var masterActor: ActorRef = null if (isMaster) { - masterActor = actorSystem.actorOf( - Props(new BlockManagerMasterActor(isLocal)), name = AKKA_ACTOR_NAME) + masterActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)), + name = MASTER_AKKA_ACTOR_NAME) logInfo("Registered BlockManagerMaster Actor") } else { - val url = "akka://spark@%s:%s/user/%s".format( - DEFAULT_MASTER_IP, DEFAULT_MASTER_PORT, AKKA_ACTOR_NAME) + val url = "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, MASTER_AKKA_ACTOR_NAME) logInfo("Connecting to BlockManagerMaster: " + url) masterActor = actorSystem.actorFor(url) } @@ -497,7 +459,9 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool logInfo("Removed " + host + " successfully in notifyADeadHost") } - def mustRegisterBlockManager(msg: RegisterBlockManager) { + def mustRegisterBlockManager( + blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { + val msg = RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) logInfo("Trying to register BlockManager") while (! 
syncRegisterBlockManager(msg)) { logWarning("Failed to register " + msg) @@ -506,7 +470,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool logInfo("Done registering BlockManager") } - def syncRegisterBlockManager(msg: RegisterBlockManager): Boolean = { + private def syncRegisterBlockManager(msg: RegisterBlockManager): Boolean = { //val masterActor = RemoteActor.select(node, name) val startTimeMs = System.currentTimeMillis() val tmp = " msg " + msg + " " @@ -533,7 +497,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool return res.get } - def syncHeartBeat(msg: HeartBeat): Option[Boolean] = { + private def syncHeartBeat(msg: HeartBeat): Option[Boolean] = { try { val answer = askMaster(msg).asInstanceOf[Boolean] return Some(answer) @@ -553,7 +517,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool return res.get } - def syncBlockUpdate(msg: BlockUpdate): Option[Boolean] = { + private def syncBlockUpdate(msg: BlockUpdate): Option[Boolean] = { val startTimeMs = System.currentTimeMillis() val tmp = " msg " + msg + " " logDebug("Got in syncBlockUpdate " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs)) @@ -580,7 +544,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool return res } - def syncGetLocations(msg: GetLocations): Seq[BlockManagerId] = { + private def syncGetLocations(msg: GetLocations): Seq[BlockManagerId] = { val startTimeMs = System.currentTimeMillis() val tmp = " msg " + msg + " " logDebug("Got in syncGetLocations 0 " + tmp + Utils.getUsedTimeMs(startTimeMs)) @@ -603,7 +567,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool } def mustGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds): - Seq[Seq[BlockManagerId]] = { + Seq[Seq[BlockManagerId]] = { var res: Seq[Seq[BlockManagerId]] = syncGetLocationsMultipleBlockIds(msg) while (res == null) { logWarning("Failed to GetLocationsMultipleBlockIds " + msg) @@ -613,7 +577,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool return res } - def syncGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds): + private def syncGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds): Seq[Seq[BlockManagerId]] = { val startTimeMs = System.currentTimeMillis val tmp = " msg " + msg + " " @@ -644,11 +608,10 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool Thread.sleep(REQUEST_RETRY_INTERVAL_MS) res = syncGetPeers(msg) } - - return res + res } - def syncGetPeers(msg: GetPeers): Seq[BlockManagerId] = { + private def syncGetPeers(msg: GetPeers): Seq[BlockManagerId] = { val startTimeMs = System.currentTimeMillis val tmp = " msg " + msg + " " logDebug("Got in syncGetPeers 0 " + tmp + Utils.getUsedTimeMs(startTimeMs)) @@ -670,6 +633,20 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool } } + /** + * Remove a block from the slaves that have it. This can only be used to remove + * blocks that the master knows about. + */ + def removeBlock(blockId: String) { + askMaster(RemoveBlock(blockId)) + } + + /** + * Return the memory status for each block manager, in the form of a map from + * the block manager's id to two long values. The first value is the maximum + * amount of memory allocated for the block manager, while the second is the + * amount of remaining memory. 
+ */ def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = { askMaster(GetMemoryStatus).asInstanceOf[Map[BlockManagerId, (Long, Long)]] } diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala new file mode 100644 index 0000000000..5bca170f95 --- /dev/null +++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala @@ -0,0 +1,102 @@ +package spark.storage + +import java.io.{Externalizable, ObjectInput, ObjectOutput} + +import akka.actor.ActorRef + + +////////////////////////////////////////////////////////////////////////////////// +// Messages from the master to slaves. +////////////////////////////////////////////////////////////////////////////////// +private[spark] +sealed trait ToBlockManagerSlave + +// Remove a block from the slaves that have it. This can only be used to remove +// blocks that the master knows about. +private[spark] +case class RemoveBlock(blockId: String) extends ToBlockManagerSlave + + +////////////////////////////////////////////////////////////////////////////////// +// Messages from slaves to the master. +////////////////////////////////////////////////////////////////////////////////// +private[spark] +sealed trait ToBlockManagerMaster + +private[spark] +case class RegisterBlockManager( + blockManagerId: BlockManagerId, + maxMemSize: Long, + sender: ActorRef) + extends ToBlockManagerMaster + +private[spark] +case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster + +private[spark] +class BlockUpdate( + var blockManagerId: BlockManagerId, + var blockId: String, + var storageLevel: StorageLevel, + var memSize: Long, + var diskSize: Long) + extends ToBlockManagerMaster + with Externalizable { + + def this() = this(null, null, null, 0, 0) // For deserialization only + + override def writeExternal(out: ObjectOutput) { + blockManagerId.writeExternal(out) + out.writeUTF(blockId) + storageLevel.writeExternal(out) + out.writeInt(memSize.toInt) + out.writeInt(diskSize.toInt) + } + + override def readExternal(in: ObjectInput) { + blockManagerId = new BlockManagerId() + blockManagerId.readExternal(in) + blockId = in.readUTF() + storageLevel = new StorageLevel() + storageLevel.readExternal(in) + memSize = in.readInt() + diskSize = in.readInt() + } +} + +private[spark] +object BlockUpdate { + def apply(blockManagerId: BlockManagerId, + blockId: String, + storageLevel: StorageLevel, + memSize: Long, + diskSize: Long): BlockUpdate = { + new BlockUpdate(blockManagerId, blockId, storageLevel, memSize, diskSize) + } + + // For pattern-matching + def unapply(h: BlockUpdate): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = { + Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize)) + } +} + +private[spark] +case class GetLocations(blockId: String) extends ToBlockManagerMaster + +private[spark] +case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster + +private[spark] +case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster + +private[spark] +case class RemoveHost(host: String) extends ToBlockManagerMaster + +private[spark] +case object StopBlockManagerMaster extends ToBlockManagerMaster + +private[spark] +case object GetMemoryStatus extends ToBlockManagerMaster + +private[spark] +case object ExpireDeadHosts extends ToBlockManagerMaster diff --git a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala 
b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
new file mode 100644
index 0000000000..f570cdc52d
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
@@ -0,0 +1,16 @@
+package spark.storage
+
+import akka.actor.Actor
+
+import spark.{Logging, SparkException, Utils}
+
+
+/**
+ * An actor to take commands from the master to execute operations. For example,
+ * this is used to remove blocks from the slave's BlockManager.
+ */
+class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {
+  override def receive = {
+    case RemoveBlock(blockId) => blockManager.removeBlock(blockId)
+  }
+}
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
index 5bb5a29cc4..689f07b969 100644
--- a/core/src/main/scala/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -58,8 +58,10 @@ private[spark] object ThreadingTest {
       val startTime = System.currentTimeMillis()
       manager.get(blockId) match {
         case Some(retrievedBlock) =>
-          assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, "Block " + blockId + " did not match")
-          println("Got block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
+          assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList,
+            "Block " + blockId + " did not match")
+          println("Got block " + blockId + " in " +
+            (System.currentTimeMillis - startTime) + " ms")
         case None =>
           assert(false, "Block " + blockId + " could not be retrieved")
       }
@@ -73,7 +75,9 @@ private[spark] object ThreadingTest {
     System.setProperty("spark.kryoserializer.buffer.mb", "1")
     val actorSystem = ActorSystem("test")
     val serializer = new KryoSerializer
-    val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true)
+    val masterIp: String = System.getProperty("spark.master.host", "localhost")
+    val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
+    val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true, masterIp, masterPort)
     val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer, 1024 * 1024)
     val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
     val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
@@ -86,6 +90,7 @@ private[spark] object ThreadingTest {
     actorSystem.shutdown()
     actorSystem.awaitTermination()
     println("Everything stopped.")
-    println("It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.")
+    println(
+      "It will take some time for the JVM to clean all temporary files and shut down. Sit tight.")
   }
 }
diff --git a/core/src/main/scala/spark/util/GenerationIdUtil.scala b/core/src/main/scala/spark/util/GenerationIdUtil.scala
new file mode 100644
index 0000000000..8a17b700b0
--- /dev/null
+++ b/core/src/main/scala/spark/util/GenerationIdUtil.scala
@@ -0,0 +1,19 @@
+package spark.util
+
+import java.util.concurrent.atomic.AtomicInteger
+
+private[spark]
+object GenerationIdUtil {
+
+  val BLOCK_MANAGER = new IdGenerator
+
+  /**
+   * A utility for getting a unique generation ID. This is a wrapper around
+   * Java's AtomicInteger.
+   */
+  class IdGenerator {
+    private val id = new AtomicInteger
+
+    def next: Int = id.incrementAndGet
+  }
+}
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index ad2253596d..4dc3b7ec05 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -20,15 +20,15 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   var oldArch: String = null
   var oldOops: String = null
   var oldHeartBeat: String = null
-  
-  // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test 
+
+  // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
   val serializer = new KryoSerializer
 
   before {
     actorSystem = ActorSystem("test")
-    master = new BlockManagerMaster(actorSystem, true, true)
+    master = new BlockManagerMaster(actorSystem, true, true, "localhost", 7077)
 
-    // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case 
+    // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
     oldArch = System.setProperty("os.arch", "amd64")
     oldOops = System.setProperty("spark.test.useCompressedOops", "true")
     oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
@@ -74,7 +74,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
     store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false)
 
-    // Checking whether blocks are in memory 
+    // Checking whether blocks are in memory
     assert(store.getSingle("a1") != None, "a1 was not in store")
     assert(store.getSingle("a2") != None, "a2 was not in store")
     assert(store.getSingle("a3") != None, "a3 was not in store")
@@ -83,7 +83,7 @@
     assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
     assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
     assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
-    
+
     // Drop a1 and a2 from memory; this should be reported back to the master
     store.dropFromMemory("a1", null)
     store.dropFromMemory("a2", null)
@@ -93,6 +93,45 @@
     assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
   }
 
+  test("removing block") {
+    store = new BlockManager(actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+
+    // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false)
+
+    // Checking whether blocks are in memory and checking memory status
+    var memStatus = master.getMemoryStatus.head._2
+    assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
+    assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should be <= 1200")
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+
+    // Checking whether master knows about the blocks or not
+    assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+    assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
+    assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
+
+    // Remove a1, a2, and a3. Removing a3 should be a no-op, since the master was never told about it.
+    master.removeBlock("a1")
+    master.removeBlock("a2")
+    master.removeBlock("a3")
+    assert(store.getSingle("a1") === None, "a1 not removed from store")
+    assert(store.getSingle("a2") === None, "a2 not removed from store")
+    assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
+    assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+    assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
+    memStatus = master.getMemoryStatus.head._2
+    assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
+    assert(memStatus._2 == 2000L, "remaining memory " + memStatus._2 + " should equal 2000")
+  }
+
   test("reregistration on heart beat") {
     val heartBeat = PrivateMethod[Unit]('heartBeat)
     store = new BlockManager(actorSystem, master, serializer, 2000)
@@ -122,7 +161,7 @@
 
     master.notifyADeadHost(store.blockManagerId.ip)
     assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
-    
+
     store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
 
     assert(master.mustGetLocations(GetLocations("a1")).size > 0,
@@ -145,11 +184,11 @@
     assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
 
     store invokePrivate heartBeat()
-    
+
     assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
 
     store2 invokePrivate heartBeat()
-    
+
     assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a2 was not removed from master")
   }
 
@@ -171,7 +210,7 @@
     assert(store.getSingle("a2") != None, "a2 was not in store")
     assert(store.getSingle("a3") === None, "a3 was in store")
   }
-  
+
   test("in-memory LRU storage with serialization") {
     store = new BlockManager(actorSystem, master, serializer, 1200)
     val a1 = new Array[Byte](400)
-- 
cgit v1.2.3
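
A minimal driver-side sketch of the new removal path, for reference. This is not part of the patch: it assumes a SparkEnv that has already been created and set for the current thread (as SparkContext does), with the BlockManagerMaster actor running, and the block id "my_block" is purely illustrative:

  import spark.SparkEnv
  import spark.storage.StorageLevel

  // Sketch only -- assumes SparkEnv.get returns a fully initialized env whose
  // BlockManagerMaster was constructed with isMaster = true; "my_block" is a
  // hypothetical block id.
  val env = SparkEnv.get
  val blockManager = env.blockManager

  // Cache a block; tellMaster defaults to true, so the master records it.
  blockManager.putSingle("my_block", new Array[Byte](400), StorageLevel.MEMORY_ONLY)

  // Ask the master to remove the block everywhere. The master actor sends
  // RemoveBlock("my_block") to the BlockManagerSlaveActor of each slave that
  // holds the block (fire-and-forget, no confirmation is awaited) and clears
  // the block from its own BlockManagerInfo bookkeeping.
  blockManager.master.removeBlock("my_block")

Slave-side removal is idempotent: MemoryStore.remove and DiskStore.remove tolerate a missing block, and BlockManager.removeBlock only logs a warning if the block is already gone.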