author    Reynold Xin <rxin@cs.berkeley.edu>    2012-12-13 00:04:42 -0800
committer Reynold Xin <rxin@cs.berkeley.edu>    2012-12-13 00:04:42 -0800
commit    1b7a0451ed7df78838ca7ea09dfa5ba0e236acfe
tree      f1f06dcb7518833aee0fdfbd3059206a939eced9
parent    21b271f5bdfca63a9925c578c8e53bee1890adeb
Added the ability in block manager to remove blocks.
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala                      |  11
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala          |  83
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerId.scala        |  29
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala    | 199
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMessages.scala  | 102
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala|  16
-rw-r--r--  core/src/main/scala/spark/storage/ThreadingTest.scala         |  13
-rw-r--r--  core/src/main/scala/spark/util/GenerationIdUtil.scala         |  19
-rw-r--r--  core/src/test/scala/spark/storage/BlockManagerSuite.scala     |  59
9 files changed, 361 insertions(+), 170 deletions(-)
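
In short, this patch gives the driver a way to delete a block cluster-wide: BlockManagerMaster gains a removeBlock method, the master actor looks up every slave that holds the block, sends each slave's new BlockManagerSlaveActor a RemoveBlock message (fire-and-forget), and clears its own tracking state. A minimal driver-side sketch, assuming an initialized SparkEnv (the call path comes from this patch; the block id is hypothetical):

    // Hypothetical usage sketch; assumes SparkEnv.get returns an initialized env.
    val master = SparkEnv.get.blockManager.master
    // Ask the master to drop the block everywhere it is cached. The master
    // notifies each slave's BlockManagerSlaveActor and updates its own records.
    master.removeBlock("rdd_0_0")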
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 272d7cdad3..41441720a7 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -86,10 +86,13 @@ object SparkEnv extends Logging {
}
val serializer = instantiateClass[Serializer]("spark.serializer", "spark.JavaSerializer")
-
- val blockManagerMaster = new BlockManagerMaster(actorSystem, isMaster, isLocal)
+
+ val masterIp: String = System.getProperty("spark.master.host", "localhost")
+ val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
+ val blockManagerMaster = new BlockManagerMaster(
+ actorSystem, isMaster, isLocal, masterIp, masterPort)
val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer)
-
+
val connectionManager = blockManager.connectionManager
val broadcastManager = new BroadcastManager(isMaster)
@@ -104,7 +107,7 @@ object SparkEnv extends Logging {
val shuffleFetcher = instantiateClass[ShuffleFetcher](
"spark.shuffle.fetcher", "spark.BlockStoreShuffleFetcher")
-
+
val httpFileServer = new HttpFileServer()
httpFileServer.initialize()
System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index df295b1820..b2c9e2cc40 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -1,59 +1,39 @@
package spark.storage
-import akka.actor.{ActorSystem, Cancellable}
+import java.io.{InputStream, OutputStream}
+import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
+import scala.collection.JavaConversions._
+
+import akka.actor.{ActorSystem, Cancellable, Props}
import akka.dispatch.{Await, Future}
import akka.util.Duration
import akka.util.duration._
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
-import java.io.{InputStream, OutputStream, Externalizable, ObjectInput, ObjectOutput}
-import java.nio.{MappedByteBuffer, ByteBuffer}
-import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
-import scala.collection.JavaConversions._
+import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils}
import spark.network._
import spark.serializer.Serializer
-import spark.util.ByteBufferInputStream
-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-import sun.nio.ch.DirectBuffer
-
-
-private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
- def this() = this(null, 0) // For deserialization only
-
- def this(in: ObjectInput) = this(in.readUTF(), in.readInt())
+import spark.util.{ByteBufferInputStream, GenerationIdUtil}
- override def writeExternal(out: ObjectOutput) {
- out.writeUTF(ip)
- out.writeInt(port)
- }
-
- override def readExternal(in: ObjectInput) {
- ip = in.readUTF()
- port = in.readInt()
- }
-
- override def toString = "BlockManagerId(" + ip + ", " + port + ")"
-
- override def hashCode = ip.hashCode * 41 + port
+import sun.nio.ch.DirectBuffer
- override def equals(that: Any) = that match {
- case id: BlockManagerId => port == id.port && ip == id.ip
- case _ => false
- }
-}
private[spark]
case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message)
private[spark]
-class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
- val serializer: Serializer, maxMemory: Long)
+class BlockManager(
+ actorSystem: ActorSystem,
+ val master: BlockManagerMaster,
+ val serializer: Serializer,
+ maxMemory: Long)
extends Logging {
class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
@@ -110,6 +90,9 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
val host = System.getProperty("spark.hostname", Utils.localHostName())
+ val slaveActor = master.actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
+ name = "BlockManagerActor" + GenerationIdUtil.BLOCK_MANAGER.next)
+
@volatile private var shuttingDown = false
private def heartBeat() {
@@ -134,8 +117,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
* BlockManagerWorker actor.
*/
private def initialize() {
- master.mustRegisterBlockManager(
- RegisterBlockManager(blockManagerId, maxMemory))
+ master.mustRegisterBlockManager(blockManagerId, maxMemory, slaveActor)
BlockManagerWorker.startBlockManagerWorker(this)
if (!BlockManager.getDisableHeartBeatsForTesting) {
heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
@@ -171,8 +153,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
def reregister() {
// TODO: We might need to rate limit reregistering.
logInfo("BlockManager reregistering with master")
- master.mustRegisterBlockManager(
- RegisterBlockManager(blockManagerId, maxMemory))
+ master.mustRegisterBlockManager(blockManagerId, maxMemory, slaveActor)
reportAllBlocks()
}
@@ -865,6 +846,25 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
}
}
+ /**
+ * Remove a block from both memory and disk. This method does not report the
+ * removal to the master, because it is expected to run only in response to a
+ * removal command initiated by the master, which then updates its own tracking.
+ */
+ def removeBlock(blockId: String) {
+ logInfo("Removing block " + blockId)
+ val info = blockInfo.get(blockId)
+ if (info != null) info.synchronized {
+ // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+ memoryStore.remove(blockId)
+ diskStore.remove(blockId)
+ blockInfo.remove(blockId)
+ } else {
+ // The block has already been removed; do nothing.
+ logWarning("Block " + blockId + " does not exist.")
+ }
+ }
+
def shouldCompress(blockId: String): Boolean = {
if (blockId.startsWith("shuffle_")) {
compressShuffle
@@ -914,6 +914,7 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
heartBeatTask.cancel()
}
connectionManager.stop()
+ master.actorSystem.stop(slaveActor)
blockInfo.clear()
memoryStore.clear()
diskStore.clear()
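
A note on the removeBlock method added above: synchronizing on the per-block info object keeps a concurrent put or get of the same block from interleaving with the removal, and the store-level removes are safe even when the block lives in only one store. Reduced to its core pattern (a sketch, not the full class):

    // Sketch of the remove-under-lock idiom; blockInfo is a ConcurrentHashMap.
    def removeBlock(blockId: String) {
      val info = blockInfo.get(blockId)      // null when the block is unknown
      if (info != null) info.synchronized {  // serialize against puts/gets of this block
        memoryStore.remove(blockId)          // idempotent: at worst logs a warning
        diskStore.remove(blockId)
        blockInfo.remove(blockId)
      }
    }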
diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala
new file mode 100644
index 0000000000..03cd141805
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerId.scala
@@ -0,0 +1,29 @@
+package spark.storage
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+
+private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
+ def this() = this(null, 0) // For deserialization only
+
+ def this(in: ObjectInput) = this(in.readUTF(), in.readInt())
+
+ override def writeExternal(out: ObjectOutput) {
+ out.writeUTF(ip)
+ out.writeInt(port)
+ }
+
+ override def readExternal(in: ObjectInput) {
+ ip = in.readUTF()
+ port = in.readInt()
+ }
+
+ override def toString = "BlockManagerId(" + ip + ", " + port + ")"
+
+ override def hashCode = ip.hashCode * 41 + port
+
+ override def equals(that: Any) = that match {
+ case id: BlockManagerId => port == id.port && ip == id.ip
+ case _ => false
+ }
+}
\ No newline at end of file
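
Because BlockManagerId hand-rolls its wire format through Externalizable, only a UTF string and an int cross the wire. A self-contained round-trip sketch using plain java.io (Java serialization invokes the writeExternal/readExternal methods above through the public no-arg constructor; note the class is private[spark], so this would have to live in the spark package):

    import java.io._

    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(new BlockManagerId("10.0.0.1", 7077))  // calls writeExternal
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    val id = in.readObject().asInstanceOf[BlockManagerId]  // calls readExternal
    assert(id.ip == "10.0.0.1" && id.port == 7077)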
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 0a4e68f437..64cdb86f8d 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -17,95 +17,24 @@ import spark.{Logging, SparkException, Utils}
private[spark]
-sealed trait ToBlockManagerMaster
+case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
-private[spark]
-case class RegisterBlockManager(
- blockManagerId: BlockManagerId,
- maxMemSize: Long)
- extends ToBlockManagerMaster
-
-private[spark]
-case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
-
-private[spark]
-class BlockUpdate(
- var blockManagerId: BlockManagerId,
- var blockId: String,
- var storageLevel: StorageLevel,
- var memSize: Long,
- var diskSize: Long)
- extends ToBlockManagerMaster
- with Externalizable {
-
- def this() = this(null, null, null, 0, 0) // For deserialization only
-
- override def writeExternal(out: ObjectOutput) {
- blockManagerId.writeExternal(out)
- out.writeUTF(blockId)
- storageLevel.writeExternal(out)
- out.writeInt(memSize.toInt)
- out.writeInt(diskSize.toInt)
- }
-
- override def readExternal(in: ObjectInput) {
- blockManagerId = new BlockManagerId()
- blockManagerId.readExternal(in)
- blockId = in.readUTF()
- storageLevel = new StorageLevel()
- storageLevel.readExternal(in)
- memSize = in.readInt()
- diskSize = in.readInt()
- }
-}
-
-private[spark]
-object BlockUpdate {
- def apply(blockManagerId: BlockManagerId,
- blockId: String,
- storageLevel: StorageLevel,
- memSize: Long,
- diskSize: Long): BlockUpdate = {
- new BlockUpdate(blockManagerId, blockId, storageLevel, memSize, diskSize)
- }
-
- // For pattern-matching
- def unapply(h: BlockUpdate): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
- Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
- }
-}
-
-private[spark]
-case class GetLocations(blockId: String) extends ToBlockManagerMaster
-
-private[spark]
-case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
-
-private[spark]
-case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
-
-private[spark]
-case class RemoveHost(host: String) extends ToBlockManagerMaster
-
-private[spark]
-case object StopBlockManagerMaster extends ToBlockManagerMaster
-
-private[spark]
-case object GetMemoryStatus extends ToBlockManagerMaster
+// TODO(rxin): Move BlockManagerMasterActor to its own file.
private[spark]
-case object ExpireDeadHosts extends ToBlockManagerMaster
-
-
-private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
+class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
- val maxMem: Long) {
- private var _lastSeenMs = timeMs
- private var _remainingMem = maxMem
- private val _blocks = new JHashMap[String, StorageLevel]
+ val maxMem: Long,
+ val slaveActor: ActorRef) {
+
+ private var _lastSeenMs: Long = timeMs
+ private var _remainingMem: Long = maxMem
+
+ // Mapping from block id to its status.
+ private val _blocks = new JHashMap[String, BlockStatus]
logInfo("Registering block manager %s:%d with %s RAM".format(
blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
@@ -121,7 +50,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
if (_blocks.containsKey(blockId)) {
// The block exists on the slave already.
- val originalLevel: StorageLevel = _blocks.get(blockId)
+ val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
if (originalLevel.useMemory) {
_remainingMem += memSize
@@ -130,7 +59,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
if (storageLevel.isValid) {
// isValid means it is either stored in-memory or on-disk.
- _blocks.put(blockId, storageLevel)
+ _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize))
if (storageLevel.useMemory) {
_remainingMem -= memSize
logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
@@ -143,15 +72,15 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
}
} else if (_blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
- val originalLevel: StorageLevel = _blocks.get(blockId)
+ val blockStatus: BlockStatus = _blocks.get(blockId)
_blocks.remove(blockId)
- if (originalLevel.useMemory) {
- _remainingMem += memSize
+ if (blockStatus.storageLevel.useMemory) {
+ _remainingMem += blockStatus.memSize
logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
Utils.memoryBytesToString(_remainingMem)))
}
- if (originalLevel.useDisk) {
+ if (blockStatus.storageLevel.useDisk) {
logInfo("Removed %s on %s:%d on disk (size: %s)".format(
blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
}
@@ -162,7 +91,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
def lastSeenMs: Long = _lastSeenMs
- def blocks: JHashMap[String, StorageLevel] = _blocks
+ def blocks: JHashMap[String, BlockStatus] = _blocks
override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
@@ -171,8 +100,13 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
}
}
+ // Mapping from block manager id to the block manager's information.
private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo]
+
+ // Mapping from host name to block manager id.
private val blockManagerIdByHost = new HashMap[String, BlockManagerId]
+
+ // Mapping from block id to the set of block managers that have the block.
private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
initLogging()
@@ -245,8 +179,8 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
}
def receive = {
- case RegisterBlockManager(blockManagerId, maxMemSize) =>
- register(blockManagerId, maxMemSize)
+ case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
+ register(blockManagerId, maxMemSize, slaveActor)
case BlockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size)
@@ -264,6 +198,9 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
case GetMemoryStatus =>
getMemoryStatus
+ case RemoveBlock(blockId) =>
+ removeBlock(blockId)
+
case RemoveHost(host) =>
removeHost(host)
sender ! true
@@ -286,6 +223,27 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
logInfo("Got unknown message: " + other)
}
+ // Remove a block from the slaves that have it. This can only be used to remove
+ // blocks that the master knows about.
+ private def removeBlock(blockId: String) {
+ val block = blockInfo.get(blockId)
+ if (block != null) {
+ block._2.foreach { blockManagerId: BlockManagerId =>
+ val blockManager = blockManagerInfo.get(blockManagerId)
+ if (blockManager.isDefined) {
+ // Remove the block from the slave's BlockManager.
+ // This does not wait for a confirmation, so the message might get lost.
+ // If message loss becomes frequent, we should add retry logic here.
+ blockManager.get.slaveActor ! RemoveBlock(blockId)
+ // Remove the block from the master's BlockManagerInfo.
+ blockManager.get.updateBlockInfo(blockId, StorageLevel.NONE, 0, 0)
+ }
+ }
+ blockInfo.remove(blockId)
+ }
+ sender ! true
+ }
+
// Return a map from the block manager id to max memory and remaining memory.
private def getMemoryStatus() {
val res = blockManagerInfo.map { case(blockManagerId, info) =>
@@ -294,7 +252,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
sender ! res
}
- private def register(blockManagerId: BlockManagerId, maxMemSize: Long) {
+ private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " "
logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
@@ -309,7 +267,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
logInfo("Got Register Msg from master node, don't register it")
} else {
blockManagerInfo += (blockManagerId -> new BlockManagerInfo(
- blockManagerId, System.currentTimeMillis(), maxMemSize))
+ blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor))
}
blockManagerIdByHost += (blockManagerId.ip -> blockManagerId)
logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
@@ -442,25 +400,29 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
}
}
-private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Boolean, isLocal: Boolean)
+
+private[spark] class BlockManagerMaster(
+ val actorSystem: ActorSystem,
+ isMaster: Boolean,
+ isLocal: Boolean,
+ masterIp: String,
+ masterPort: Int)
extends Logging {
- val AKKA_ACTOR_NAME: String = "BlockMasterManager"
+ val MASTER_AKKA_ACTOR_NAME = "BlockMasterManager"
+ val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager"
val REQUEST_RETRY_INTERVAL_MS = 100
- val DEFAULT_MASTER_IP: String = System.getProperty("spark.master.host", "localhost")
- val DEFAULT_MASTER_PORT: Int = System.getProperty("spark.master.port", "7077").toInt
val DEFAULT_MANAGER_IP: String = Utils.localHostName()
val timeout = 10.seconds
var masterActor: ActorRef = null
if (isMaster) {
- masterActor = actorSystem.actorOf(
- Props(new BlockManagerMasterActor(isLocal)), name = AKKA_ACTOR_NAME)
+ masterActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)),
+ name = MASTER_AKKA_ACTOR_NAME)
logInfo("Registered BlockManagerMaster Actor")
} else {
- val url = "akka://spark@%s:%s/user/%s".format(
- DEFAULT_MASTER_IP, DEFAULT_MASTER_PORT, AKKA_ACTOR_NAME)
+ val url = "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, MASTER_AKKA_ACTOR_NAME)
logInfo("Connecting to BlockManagerMaster: " + url)
masterActor = actorSystem.actorFor(url)
}
@@ -497,7 +459,9 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
logInfo("Removed " + host + " successfully in notifyADeadHost")
}
- def mustRegisterBlockManager(msg: RegisterBlockManager) {
+ def mustRegisterBlockManager(
+ blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+ val msg = RegisterBlockManager(blockManagerId, maxMemSize, slaveActor)
logInfo("Trying to register BlockManager")
while (! syncRegisterBlockManager(msg)) {
logWarning("Failed to register " + msg)
@@ -506,7 +470,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
logInfo("Done registering BlockManager")
}
- def syncRegisterBlockManager(msg: RegisterBlockManager): Boolean = {
+ private def syncRegisterBlockManager(msg: RegisterBlockManager): Boolean = {
//val masterActor = RemoteActor.select(node, name)
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
@@ -533,7 +497,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
return res.get
}
- def syncHeartBeat(msg: HeartBeat): Option[Boolean] = {
+ private def syncHeartBeat(msg: HeartBeat): Option[Boolean] = {
try {
val answer = askMaster(msg).asInstanceOf[Boolean]
return Some(answer)
@@ -553,7 +517,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
return res.get
}
- def syncBlockUpdate(msg: BlockUpdate): Option[Boolean] = {
+ private def syncBlockUpdate(msg: BlockUpdate): Option[Boolean] = {
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
logDebug("Got in syncBlockUpdate " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
@@ -580,7 +544,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
return res
}
- def syncGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
+ private def syncGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
logDebug("Got in syncGetLocations 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
@@ -603,7 +567,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
}
def mustGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
- Seq[Seq[BlockManagerId]] = {
+ Seq[Seq[BlockManagerId]] = {
var res: Seq[Seq[BlockManagerId]] = syncGetLocationsMultipleBlockIds(msg)
while (res == null) {
logWarning("Failed to GetLocationsMultipleBlockIds " + msg)
@@ -613,7 +577,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
return res
}
- def syncGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
+ private def syncGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
Seq[Seq[BlockManagerId]] = {
val startTimeMs = System.currentTimeMillis
val tmp = " msg " + msg + " "
@@ -644,11 +608,10 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
res = syncGetPeers(msg)
}
-
- return res
+ res
}
- def syncGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
+ private def syncGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
val startTimeMs = System.currentTimeMillis
val tmp = " msg " + msg + " "
logDebug("Got in syncGetPeers 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
@@ -670,6 +633,20 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
}
}
+ /**
+ * Remove a block from the slaves that have it. This can only be used to remove
+ * blocks that the master knows about.
+ */
+ def removeBlock(blockId: String) {
+ askMaster(RemoveBlock(blockId))
+ }
+
+ /**
+ * Return the memory status for each block manager, in the form of a map from
+ * the block manager's id to two long values. The first value is the maximum
+ * amount of memory allocated for the block manager, while the second is the
+ * amount of remaining memory.
+ */
def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
askMaster(GetMemoryStatus).asInstanceOf[Map[BlockManagerId, (Long, Long)]]
}
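
The newly documented getMemoryStatus makes the master's memory bookkeeping observable from the driver, which is exactly what the new test below relies on. A usage sketch, assuming at least one registered block manager:

    // (maxMem, remainingMem) per block manager, both in bytes.
    val status: Map[BlockManagerId, (Long, Long)] = master.getMemoryStatus
    status.foreach { case (id, (maxMem, remainingMem)) =>
      println(id + ": " + remainingMem + " of " + maxMem + " bytes free")
    }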
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
new file mode 100644
index 0000000000..5bca170f95
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -0,0 +1,102 @@
+package spark.storage
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+import akka.actor.ActorRef
+
+
+//////////////////////////////////////////////////////////////////////////////////
+// Messages from the master to slaves.
+//////////////////////////////////////////////////////////////////////////////////
+private[spark]
+sealed trait ToBlockManagerSlave
+
+// Remove a block from the slaves that have it. This can only be used to remove
+// blocks that the master knows about.
+private[spark]
+case class RemoveBlock(blockId: String) extends ToBlockManagerSlave
+
+
+//////////////////////////////////////////////////////////////////////////////////
+// Messages from slaves to the master.
+//////////////////////////////////////////////////////////////////////////////////
+private[spark]
+sealed trait ToBlockManagerMaster
+
+private[spark]
+case class RegisterBlockManager(
+ blockManagerId: BlockManagerId,
+ maxMemSize: Long,
+ sender: ActorRef)
+ extends ToBlockManagerMaster
+
+private[spark]
+case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
+
+private[spark]
+class BlockUpdate(
+ var blockManagerId: BlockManagerId,
+ var blockId: String,
+ var storageLevel: StorageLevel,
+ var memSize: Long,
+ var diskSize: Long)
+ extends ToBlockManagerMaster
+ with Externalizable {
+
+ def this() = this(null, null, null, 0, 0) // For deserialization only
+
+ override def writeExternal(out: ObjectOutput) {
+ blockManagerId.writeExternal(out)
+ out.writeUTF(blockId)
+ storageLevel.writeExternal(out)
+ out.writeInt(memSize.toInt)
+ out.writeInt(diskSize.toInt)
+ }
+
+ override def readExternal(in: ObjectInput) {
+ blockManagerId = new BlockManagerId()
+ blockManagerId.readExternal(in)
+ blockId = in.readUTF()
+ storageLevel = new StorageLevel()
+ storageLevel.readExternal(in)
+ memSize = in.readInt()
+ diskSize = in.readInt()
+ }
+}
+
+private[spark]
+object BlockUpdate {
+ def apply(blockManagerId: BlockManagerId,
+ blockId: String,
+ storageLevel: StorageLevel,
+ memSize: Long,
+ diskSize: Long): BlockUpdate = {
+ new BlockUpdate(blockManagerId, blockId, storageLevel, memSize, diskSize)
+ }
+
+ // For pattern-matching
+ def unapply(h: BlockUpdate): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
+ Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
+ }
+}
+
+private[spark]
+case class GetLocations(blockId: String) extends ToBlockManagerMaster
+
+private[spark]
+case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
+
+private[spark]
+case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
+
+private[spark]
+case class RemoveHost(host: String) extends ToBlockManagerMaster
+
+private[spark]
+case object StopBlockManagerMaster extends ToBlockManagerMaster
+
+private[spark]
+case object GetMemoryStatus extends ToBlockManagerMaster
+
+private[spark]
+case object ExpireDeadHosts extends ToBlockManagerMaster
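
With every message gathered under the two sealed traits above, a handler that matches on a ToBlockManagerMaster value directly gets compiler help with exhaustiveness. An illustrative (not actual) dispatch, with handler bodies shortened to prints:

    // Mirrors the shape of BlockManagerMasterActor.receive; bodies are placeholders.
    def handle(msg: ToBlockManagerMaster): Unit = msg match {
      case RegisterBlockManager(id, maxMem, slaveActor) =>
        println("register " + id + " with " + maxMem + " bytes via " + slaveActor)
      case HeartBeat(id)         => println("heartbeat from " + id)
      case GetLocations(blockId) => println("who has " + blockId + "?")
      case other                 => println("unhandled: " + other)
    }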
diff --git a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
new file mode 100644
index 0000000000..f570cdc52d
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
@@ -0,0 +1,16 @@
+package spark.storage
+
+import akka.actor.Actor
+
+import spark.{Logging, SparkException, Utils}
+
+
+/**
+ * An actor that takes commands from the master to execute operations. For
+ * example, this is used to remove blocks from the slave's BlockManager.
+ */
+class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {
+ override def receive = {
+ case RemoveBlock(blockId) => blockManager.removeBlock(blockId)
+ }
+}
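
For context, the BlockManager.scala hunk earlier in this commit creates one of these actors per BlockManager and hands its ActorRef to the master at registration time. A minimal wiring sketch, assuming an ActorSystem and a BlockManager are in scope:

    import akka.actor.Props
    import spark.util.GenerationIdUtil

    // One slave actor per BlockManager, uniquely named via GenerationIdUtil.
    val slaveActor = actorSystem.actorOf(
      Props(new BlockManagerSlaveActor(blockManager)),
      name = "BlockManagerActor" + GenerationIdUtil.BLOCK_MANAGER.next)
    slaveActor ! RemoveBlock("rdd_0_0")  // fire-and-forget, just as the master sends it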
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
index 5bb5a29cc4..689f07b969 100644
--- a/core/src/main/scala/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -58,8 +58,10 @@ private[spark] object ThreadingTest {
val startTime = System.currentTimeMillis()
manager.get(blockId) match {
case Some(retrievedBlock) =>
- assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, "Block " + blockId + " did not match")
- println("Got block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
+ assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList,
+ "Block " + blockId + " did not match")
+ println("Got block " + blockId + " in " +
+ (System.currentTimeMillis - startTime) + " ms")
case None =>
assert(false, "Block " + blockId + " could not be retrieved")
}
@@ -73,7 +75,9 @@ private[spark] object ThreadingTest {
System.setProperty("spark.kryoserializer.buffer.mb", "1")
val actorSystem = ActorSystem("test")
val serializer = new KryoSerializer
- val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true)
+ val masterIp: String = System.getProperty("spark.master.host", "localhost")
+ val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
+ val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true, masterIp, masterPort)
val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer, 1024 * 1024)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
@@ -86,6 +90,7 @@ private[spark] object ThreadingTest {
actorSystem.shutdown()
actorSystem.awaitTermination()
println("Everything stopped.")
- println("It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.")
+ println(
+ "It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.")
}
}
diff --git a/core/src/main/scala/spark/util/GenerationIdUtil.scala b/core/src/main/scala/spark/util/GenerationIdUtil.scala
new file mode 100644
index 0000000000..8a17b700b0
--- /dev/null
+++ b/core/src/main/scala/spark/util/GenerationIdUtil.scala
@@ -0,0 +1,19 @@
+package spark.util
+
+import java.util.concurrent.atomic.AtomicInteger
+
+private[spark]
+object GenerationIdUtil {
+
+ val BLOCK_MANAGER = new IdGenerator
+
+ /**
+ * A utility for generating unique IDs. This is a thin wrapper around
+ * Java's AtomicInteger.
+ */
+ class IdGenerator {
+ private val id = new AtomicInteger
+
+ def next: Int = id.incrementAndGet
+ }
+}
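
Since the underlying AtomicInteger starts at zero, the first call to next returns 1, and concurrent callers never see a duplicate; this is what keeps the generated actor names above unique even when several BlockManagers share one ActorSystem (as in the tests). For example:

    val first  = GenerationIdUtil.BLOCK_MANAGER.next  // 1 on a fresh JVM
    val second = GenerationIdUtil.BLOCK_MANAGER.next  // 2; thread-safe increment
    assert(first != second)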
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index ad2253596d..4dc3b7ec05 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -20,15 +20,15 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
var oldArch: String = null
var oldOops: String = null
var oldHeartBeat: String = null
-
- // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+
+ // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
val serializer = new KryoSerializer
before {
actorSystem = ActorSystem("test")
- master = new BlockManagerMaster(actorSystem, true, true)
+ master = new BlockManagerMaster(actorSystem, true, true, "localhost", 7077)
- // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
+ // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
oldOops = System.setProperty("spark.test.useCompressedOops", "true")
oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
@@ -74,7 +74,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false)
- // Checking whether blocks are in memory
+ // Checking whether blocks are in memory
assert(store.getSingle("a1") != None, "a1 was not in store")
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") != None, "a3 was not in store")
@@ -83,7 +83,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
-
+
// Drop a1 and a2 from memory; this should be reported back to the master
store.dropFromMemory("a1", null)
store.dropFromMemory("a2", null)
@@ -93,6 +93,45 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
}
+ test("removing block") {
+ store = new BlockManager(actorSystem, master, serializer, 2000)
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ val a3 = new Array[Byte](400)
+
+ // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false)
+
+ // Checking whether blocks are in memory and memory size
+ var memStatus = master.getMemoryStatus.head._2
+ assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
+ assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should be <= 1200")
+ assert(store.getSingle("a1") != None, "a1 was not in store")
+ assert(store.getSingle("a2") != None, "a2 was not in store")
+ assert(store.getSingle("a3") != None, "a3 was not in store")
+
+ // Checking whether master knows about the blocks or not
+ assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+ assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
+ assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
+
+ // Remove a1, a2, and a3. Removing a3 should be a no-op, since the master was never told about it.
+ master.removeBlock("a1")
+ master.removeBlock("a2")
+ master.removeBlock("a3")
+ assert(store.getSingle("a1") === None, "a1 not removed from store")
+ assert(store.getSingle("a2") === None, "a2 not removed from store")
+ assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
+ assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
+ assert(store.getSingle("a3") != None, "a3 was not in store")
+ assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
+ memStatus = master.getMemoryStatus.head._2
+ assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
+ assert(memStatus._2 == 2000L, "remaining memory " + memStatus._2 + " should equal 2000")
+ }
+
test("reregistration on heart beat") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
store = new BlockManager(actorSystem, master, serializer, 2000)
@@ -122,7 +161,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
master.notifyADeadHost(store.blockManagerId.ip)
assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
-
+
store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
assert(master.mustGetLocations(GetLocations("a1")).size > 0,
@@ -145,11 +184,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
store invokePrivate heartBeat()
-
+
assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
store2 invokePrivate heartBeat()
-
+
assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a2 was not removed from master")
}
@@ -171,7 +210,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.getSingle("a2") != None, "a2 was not in store")
assert(store.getSingle("a3") === None, "a3 was in store")
}
-
+
test("in-memory LRU storage with serialization") {
store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)