author     Reynold Xin <rxin@cs.berkeley.edu>    2012-12-13 22:32:19 -0800
committer  Reynold Xin <rxin@cs.berkeley.edu>    2012-12-13 22:32:19 -0800
commit     97434f49b8c029e9b78c91ec5f58557cd1b5c943 (patch)
tree       5d2e9975ecc693e3f2a36b1066361fe1d99ee0af /core
parent     41e58a519ac107c118715b8b9701032dd3d9a2d2 (diff)
Merged TD's block manager refactoring.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala            |  66
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerId.scala          |  23
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala      | 702
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMasterActor.scala | 406
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMessages.scala    |  10
-rw-r--r--  core/src/main/scala/spark/storage/StorageLevel.scala            |  32
-rw-r--r--  core/src/main/scala/spark/util/MetadataCleaner.scala            |  35
-rw-r--r--  core/src/main/scala/spark/util/TimeStampedHashMap.scala         |  87
-rw-r--r--  core/src/test/scala/spark/storage/BlockManagerSuite.scala       |  91
9 files changed, 805 insertions(+), 647 deletions(-)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index b2c9e2cc40..2f41633440 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -19,7 +19,7 @@ import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils}
import spark.network._
import spark.serializer.Serializer
-import spark.util.{ByteBufferInputStream, GenerationIdUtil}
+import spark.util.{ByteBufferInputStream, GenerationIdUtil, MetadataCleaner, TimeStampedHashMap}
import sun.nio.ch.DirectBuffer
@@ -59,7 +59,7 @@ class BlockManager(
}
}
- private val blockInfo = new ConcurrentHashMap[String, BlockInfo](1000)
+ private val blockInfo = new TimeStampedHashMap[String, BlockInfo]()
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore: BlockStore =
@@ -96,13 +96,14 @@ class BlockManager(
@volatile private var shuttingDown = false
private def heartBeat() {
- if (!master.mustHeartBeat(HeartBeat(blockManagerId))) {
+ if (!master.sendHeartBeat(blockManagerId)) {
reregister()
}
}
var heartBeatTask: Cancellable = null
+ val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
initialize()
/**
@@ -117,7 +118,7 @@ class BlockManager(
* BlockManagerWorker actor.
*/
private def initialize() {
- master.mustRegisterBlockManager(blockManagerId, maxMemory, slaveActor)
+ master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
BlockManagerWorker.startBlockManagerWorker(this)
if (!BlockManager.getDisableHeartBeatsForTesting) {
heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
@@ -153,17 +154,14 @@ class BlockManager(
def reregister() {
// TODO: We might need to rate limit reregistering.
logInfo("BlockManager reregistering with master")
- master.mustRegisterBlockManager(blockManagerId, maxMemory, slaveActor)
+ master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
reportAllBlocks()
}
/**
* Get storage level of local block. If no info exists for the block, then returns null.
*/
- def getLevel(blockId: String): StorageLevel = {
- val info = blockInfo.get(blockId)
- if (info != null) info.level else null
- }
+ def getLevel(blockId: String): StorageLevel = blockInfo.get(blockId).map(_.level).orNull
/**
* Tell the master about the current storage status of a block. This will send a block update
@@ -186,9 +184,9 @@ class BlockManager(
*/
private def tryToReportBlockStatus(blockId: String): Boolean = {
val (curLevel, inMemSize, onDiskSize, tellMaster) = blockInfo.get(blockId) match {
- case null =>
+ case None =>
(StorageLevel.NONE, 0L, 0L, false)
- case info =>
+ case Some(info) =>
info.synchronized {
info.level match {
case null =>
@@ -207,7 +205,7 @@ class BlockManager(
}
if (tellMaster) {
- master.mustBlockUpdate(BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
+ master.updateBlockInfo(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)
} else {
true
}
@@ -219,7 +217,7 @@ class BlockManager(
*/
def getLocations(blockId: String): Seq[String] = {
val startTimeMs = System.currentTimeMillis
- var managers = master.mustGetLocations(GetLocations(blockId))
+ var managers = master.getLocations(blockId)
val locations = managers.map(_.ip)
logDebug("Get block locations in " + Utils.getUsedTimeMs(startTimeMs))
return locations
@@ -230,8 +228,7 @@ class BlockManager(
*/
def getLocations(blockIds: Array[String]): Array[Seq[String]] = {
val startTimeMs = System.currentTimeMillis
- val locations = master.mustGetLocationsMultipleBlockIds(
- GetLocationsMultipleBlockIds(blockIds)).map(_.map(_.ip).toSeq).toArray
+ val locations = master.getLocations(blockIds).map(_.map(_.ip).toSeq).toArray
logDebug("Get multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
return locations
}
@@ -253,7 +250,7 @@ class BlockManager(
}
}
- val info = blockInfo.get(blockId)
+ val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread
@@ -338,7 +335,7 @@ class BlockManager(
}
}
- val info = blockInfo.get(blockId)
+ val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread
@@ -394,7 +391,7 @@ class BlockManager(
}
logDebug("Getting remote block " + blockId)
// Get locations of block
- val locations = master.mustGetLocations(GetLocations(blockId))
+ val locations = master.getLocations(blockId)
// Get block from remote locations
for (loc <- locations) {
@@ -596,7 +593,7 @@ class BlockManager(
throw new IllegalArgumentException("Storage level is null or invalid")
}
- val oldBlock = blockInfo.get(blockId)
+ val oldBlock = blockInfo.get(blockId).orNull
if (oldBlock != null) {
logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
oldBlock.waitForReady()
@@ -697,7 +694,7 @@ class BlockManager(
throw new IllegalArgumentException("Storage level is null or invalid")
}
- if (blockInfo.containsKey(blockId)) {
+ if (blockInfo.contains(blockId)) {
logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
return
}
@@ -772,7 +769,7 @@ class BlockManager(
val tLevel: StorageLevel =
new StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
if (cachedPeers == null) {
- cachedPeers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1))
+ cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
}
for (peer: BlockManagerId <- cachedPeers) {
val start = System.nanoTime
@@ -819,7 +816,7 @@ class BlockManager(
*/
def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
logInfo("Dropping block " + blockId + " from memory")
- val info = blockInfo.get(blockId)
+ val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
val level = info.level
@@ -853,7 +850,7 @@ class BlockManager(
*/
def removeBlock(blockId: String) {
logInfo("Removing block " + blockId)
- val info = blockInfo.get(blockId)
+ val info = blockInfo.get(blockId).orNull
if (info != null) info.synchronized {
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
memoryStore.remove(blockId)
@@ -865,6 +862,29 @@ class BlockManager(
}
}
+ def dropOldBlocks(cleanupTime: Long) {
+ logInfo("Dropping blocks older than " + cleanupTime)
+ val iterator = blockInfo.internalMap.entrySet().iterator()
+ while(iterator.hasNext) {
+ val entry = iterator.next()
+ val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
+ if (time < cleanupTime) {
+ info.synchronized {
+ val level = info.level
+ if (level.useMemory) {
+ memoryStore.remove(id)
+ }
+ if (level.useDisk) {
+ diskStore.remove(id)
+ }
+ iterator.remove()
+ logInfo("Dropped block " + id)
+ }
+ reportBlockStatus(id)
+ }
+ }
+ }
+
def shouldCompress(blockId: String): Boolean = {
if (blockId.startsWith("shuffle_")) {
compressShuffle
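
Most of the call-site churn in BlockManager.scala comes from swapping the Java ConcurrentHashMap for the new TimeStampedHashMap, which is a scala.collection.mutable.Map and therefore returns an Option instead of null. A minimal sketch of the bridging idiom used above (a plain mutable.HashMap stands in for the patch's own class):

    import scala.collection.mutable

    val info = mutable.HashMap("a1" -> "in-memory")
    // Old style: ConcurrentHashMap.get returns null on a miss.
    // New style: a Scala Map returns an Option; .orNull restores the old contract,
    // so the existing null checks in get(), dropFromMemory() and removeBlock() keep working.
    val hit  = info.get("a1").orNull   // "in-memory"
    val miss = info.get("a2").orNull   // null, just like the old map on a missing key
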
diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala
index 03cd141805..488679f049 100644
--- a/core/src/main/scala/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerId.scala
@@ -1,6 +1,7 @@
package spark.storage
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
+import java.util.concurrent.ConcurrentHashMap
private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
@@ -18,6 +19,9 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter
port = in.readInt()
}
+ @throws(classOf[IOException])
+ private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)
+
override def toString = "BlockManagerId(" + ip + ", " + port + ")"
override def hashCode = ip.hashCode * 41 + port
@@ -26,4 +30,19 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter
case id: BlockManagerId => port == id.port && ip == id.ip
case _ => false
}
-}
\ No newline at end of file
+}
+
+
+private[spark] object BlockManagerId {
+
+ val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
+
+ def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
+ if (blockManagerIdCache.containsKey(id)) {
+ blockManagerIdCache.get(id)
+ } else {
+ blockManagerIdCache.put(id, id)
+ id
+ }
+ }
+}
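
The readResolve hook added here is the standard Java-serialization way to canonicalize instances: whatever comes off the wire is swapped for the cached copy, so equal BlockManagerIds deserialize to the same object (the new test below checks this with eq). A self-contained sketch of the pattern with an illustrative Endpoint class (not part of the patch); note that ConcurrentHashMap.putIfAbsent gives the same cache without the containsKey/get check-then-act window:

    import java.io._
    import java.util.concurrent.ConcurrentHashMap

    class Endpoint(var ip: String, var port: Int) extends Externalizable {
      def this() = this(null, 0)                       // no-arg constructor required by Externalizable
      def writeExternal(out: ObjectOutput) { out.writeUTF(ip); out.writeInt(port) }
      def readExternal(in: ObjectInput) { ip = in.readUTF(); port = in.readInt() }
      // Called by the serialization machinery after readExternal: replace the
      // freshly built instance with the canonical cached one.
      @throws(classOf[IOException])
      private def readResolve(): Object = Endpoint.getCached(this)
      override def hashCode = ip.hashCode * 41 + port
      override def equals(other: Any) = other match {
        case e: Endpoint => e.ip == ip && e.port == port
        case _ => false
      }
    }

    object Endpoint {
      private val cache = new ConcurrentHashMap[Endpoint, Endpoint]()
      def getCached(e: Endpoint): Endpoint = {
        val prev = cache.putIfAbsent(e, e)             // atomic: no lost updates under races
        if (prev == null) e else prev
      }
    }
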
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 64cdb86f8d..cf11393a03 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -1,406 +1,17 @@
package spark.storage
-import java.io._
-import java.util.{HashMap => JHashMap}
-
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.ArrayBuffer
import scala.util.Random
-import akka.actor._
-import akka.dispatch._
+import akka.actor.{Actor, ActorRef, ActorSystem, Props}
+import akka.dispatch.Await
import akka.pattern.ask
-import akka.remote._
import akka.util.{Duration, Timeout}
import akka.util.duration._
import spark.{Logging, SparkException, Utils}
-private[spark]
-case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
-
-
-// TODO(rxin): Move BlockManagerMasterActor to its own file.
-private[spark]
-class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
-
- class BlockManagerInfo(
- val blockManagerId: BlockManagerId,
- timeMs: Long,
- val maxMem: Long,
- val slaveActor: ActorRef) {
-
- private var _lastSeenMs: Long = timeMs
- private var _remainingMem: Long = maxMem
-
- // Mapping from block id to its status.
- private val _blocks = new JHashMap[String, BlockStatus]
-
- logInfo("Registering block manager %s:%d with %s RAM".format(
- blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
-
- def updateLastSeenMs() {
- _lastSeenMs = System.currentTimeMillis()
- }
-
- def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
- : Unit = synchronized {
-
- updateLastSeenMs()
-
- if (_blocks.containsKey(blockId)) {
- // The block exists on the slave already.
- val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
-
- if (originalLevel.useMemory) {
- _remainingMem += memSize
- }
- }
-
- if (storageLevel.isValid) {
- // isValid means it is either stored in-memory or on-disk.
- _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize))
- if (storageLevel.useMemory) {
- _remainingMem -= memSize
- logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
- blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
- Utils.memoryBytesToString(_remainingMem)))
- }
- if (storageLevel.useDisk) {
- logInfo("Added %s on disk on %s:%d (size: %s)".format(
- blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
- }
- } else if (_blocks.containsKey(blockId)) {
- // If isValid is not true, drop the block.
- val blockStatus: BlockStatus = _blocks.get(blockId)
- _blocks.remove(blockId)
- if (blockStatus.storageLevel.useMemory) {
- _remainingMem += blockStatus.memSize
- logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
- blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
- Utils.memoryBytesToString(_remainingMem)))
- }
- if (blockStatus.storageLevel.useDisk) {
- logInfo("Removed %s on %s:%d on disk (size: %s)".format(
- blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
- }
- }
- }
-
- def remainingMem: Long = _remainingMem
-
- def lastSeenMs: Long = _lastSeenMs
-
- def blocks: JHashMap[String, BlockStatus] = _blocks
-
- override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
-
- def clear() {
- _blocks.clear()
- }
- }
-
- // Mapping from block manager id to the block manager's information.
- private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo]
-
- // Mapping from host name to block manager id.
- private val blockManagerIdByHost = new HashMap[String, BlockManagerId]
-
- // Mapping from block id to the set of block managers that have the block.
- private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
-
- initLogging()
-
- val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
- "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
-
- val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
- "5000").toLong
-
- var timeoutCheckingTask: Cancellable = null
-
- override def preStart() {
- if (!BlockManager.getDisableHeartBeatsForTesting) {
- timeoutCheckingTask = context.system.scheduler.schedule(
- 0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
- }
- super.preStart()
- }
-
- def removeBlockManager(blockManagerId: BlockManagerId) {
- val info = blockManagerInfo(blockManagerId)
- blockManagerIdByHost.remove(blockManagerId.ip)
- blockManagerInfo.remove(blockManagerId)
- var iterator = info.blocks.keySet.iterator
- while (iterator.hasNext) {
- val blockId = iterator.next
- val locations = blockInfo.get(blockId)._2
- locations -= blockManagerId
- if (locations.size == 0) {
- blockInfo.remove(locations)
- }
- }
- }
-
- def expireDeadHosts() {
- logDebug("Checking for hosts with no recent heart beats in BlockManagerMaster.")
- val now = System.currentTimeMillis()
- val minSeenTime = now - slaveTimeout
- val toRemove = new HashSet[BlockManagerId]
- for (info <- blockManagerInfo.values) {
- if (info.lastSeenMs < minSeenTime) {
- logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
- toRemove += info.blockManagerId
- }
- }
- // TODO: Remove corresponding block infos
- toRemove.foreach(removeBlockManager)
- }
-
- def removeHost(host: String) {
- logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
- logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
- blockManagerIdByHost.get(host).foreach(removeBlockManager)
- logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
- sender ! true
- }
-
- def heartBeat(blockManagerId: BlockManagerId) {
- if (!blockManagerInfo.contains(blockManagerId)) {
- if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
- sender ! true
- } else {
- sender ! false
- }
- } else {
- blockManagerInfo(blockManagerId).updateLastSeenMs()
- sender ! true
- }
- }
-
- def receive = {
- case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
- register(blockManagerId, maxMemSize, slaveActor)
-
- case BlockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
- blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size)
-
- case GetLocations(blockId) =>
- getLocations(blockId)
-
- case GetLocationsMultipleBlockIds(blockIds) =>
- getLocationsMultipleBlockIds(blockIds)
-
- case GetPeers(blockManagerId, size) =>
- getPeersDeterministic(blockManagerId, size)
- /*getPeers(blockManagerId, size)*/
-
- case GetMemoryStatus =>
- getMemoryStatus
-
- case RemoveBlock(blockId) =>
- removeBlock(blockId)
-
- case RemoveHost(host) =>
- removeHost(host)
- sender ! true
-
- case StopBlockManagerMaster =>
- logInfo("Stopping BlockManagerMaster")
- sender ! true
- if (timeoutCheckingTask != null) {
- timeoutCheckingTask.cancel
- }
- context.stop(self)
-
- case ExpireDeadHosts =>
- expireDeadHosts()
-
- case HeartBeat(blockManagerId) =>
- heartBeat(blockManagerId)
-
- case other =>
- logInfo("Got unknown message: " + other)
- }
-
- // Remove a block from the slaves that have it. This can only be used to remove
- // blocks that the master knows about.
- private def removeBlock(blockId: String) {
- val block = blockInfo.get(blockId)
- if (block != null) {
- block._2.foreach { blockManagerId: BlockManagerId =>
- val blockManager = blockManagerInfo.get(blockManagerId)
- if (blockManager.isDefined) {
- // Remove the block from the slave's BlockManager.
- // Doesn't actually wait for a confirmation and the message might get lost.
- // If message loss becomes frequent, we should add retry logic here.
- blockManager.get.slaveActor ! RemoveBlock(blockId)
- // Remove the block from the master's BlockManagerInfo.
- blockManager.get.updateBlockInfo(blockId, StorageLevel.NONE, 0, 0)
- }
- }
- blockInfo.remove(blockId)
- }
- sender ! true
- }
-
- // Return a map from the block manager id to max memory and remaining memory.
- private def getMemoryStatus() {
- val res = blockManagerInfo.map { case(blockManagerId, info) =>
- (blockManagerId, (info.maxMem, info.remainingMem))
- }.toMap
- sender ! res
- }
-
- private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
- val startTimeMs = System.currentTimeMillis()
- val tmp = " " + blockManagerId + " "
- logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
- if (blockManagerIdByHost.contains(blockManagerId.ip) &&
- blockManagerIdByHost(blockManagerId.ip) != blockManagerId) {
- val oldId = blockManagerIdByHost(blockManagerId.ip)
- logInfo("Got second registration for host " + blockManagerId +
- "; removing old slave " + oldId)
- removeBlockManager(oldId)
- }
- if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
- logInfo("Got Register Msg from master node, don't register it")
- } else {
- blockManagerInfo += (blockManagerId -> new BlockManagerInfo(
- blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor))
- }
- blockManagerIdByHost += (blockManagerId.ip -> blockManagerId)
- logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
- sender ! true
- }
-
- private def blockUpdate(
- blockManagerId: BlockManagerId,
- blockId: String,
- storageLevel: StorageLevel,
- memSize: Long,
- diskSize: Long) {
-
- val startTimeMs = System.currentTimeMillis()
- val tmp = " " + blockManagerId + " " + blockId + " "
-
- if (!blockManagerInfo.contains(blockManagerId)) {
- if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
- // We intentionally do not register the master (except in local mode),
- // so we should not indicate failure.
- sender ! true
- } else {
- sender ! false
- }
- return
- }
-
- if (blockId == null) {
- blockManagerInfo(blockManagerId).updateLastSeenMs()
- logDebug("Got in block update 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
- sender ! true
- return
- }
-
- blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
-
- var locations: HashSet[BlockManagerId] = null
- if (blockInfo.containsKey(blockId)) {
- locations = blockInfo.get(blockId)._2
- } else {
- locations = new HashSet[BlockManagerId]
- blockInfo.put(blockId, (storageLevel.replication, locations))
- }
-
- if (storageLevel.isValid) {
- locations += blockManagerId
- } else {
- locations.remove(blockManagerId)
- }
-
- if (locations.size == 0) {
- blockInfo.remove(blockId)
- }
- sender ! true
- }
-
- private def getLocations(blockId: String) {
- val startTimeMs = System.currentTimeMillis()
- val tmp = " " + blockId + " "
- logDebug("Got in getLocations 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
- if (blockInfo.containsKey(blockId)) {
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- res.appendAll(blockInfo.get(blockId)._2)
- logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at "
- + Utils.getUsedTimeMs(startTimeMs))
- sender ! res.toSeq
- } else {
- logDebug("Got in getLocations 2" + tmp + Utils.getUsedTimeMs(startTimeMs))
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- sender ! res
- }
- }
-
- private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
- def getLocations(blockId: String): Seq[BlockManagerId] = {
- val tmp = blockId
- logDebug("Got in getLocationsMultipleBlockIds Sub 0 " + tmp)
- if (blockInfo.containsKey(blockId)) {
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- res.appendAll(blockInfo.get(blockId)._2)
- logDebug("Got in getLocationsMultipleBlockIds Sub 1 " + tmp + " " + res.toSeq)
- return res.toSeq
- } else {
- logDebug("Got in getLocationsMultipleBlockIds Sub 2 " + tmp)
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- return res.toSeq
- }
- }
-
- logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq)
- var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]]
- for (blockId <- blockIds) {
- res.append(getLocations(blockId))
- }
- logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq + " : " + res.toSeq)
- sender ! res.toSeq
- }
-
- private def getPeers(blockManagerId: BlockManagerId, size: Int) {
- var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- res.appendAll(peers)
- res -= blockManagerId
- val rand = new Random(System.currentTimeMillis())
- while (res.length > size) {
- res.remove(rand.nextInt(res.length))
- }
- sender ! res.toSeq
- }
-
- private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) {
- var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
-
- val peersWithIndices = peers.zipWithIndex
- val selfIndex = peersWithIndices.find(_._1 == blockManagerId).map(_._2).getOrElse(-1)
- if (selfIndex == -1) {
- throw new Exception("Self index for " + blockManagerId + " not found")
- }
-
- var index = selfIndex
- while (res.size < size) {
- index += 1
- if (index == selfIndex) {
- throw new Exception("More peer expected than available")
- }
- res += peers(index % peers.size)
- }
- sender ! res.toSeq
- }
-}
-
-
private[spark] class BlockManagerMaster(
val actorSystem: ActorSystem,
isMaster: Boolean,
@@ -409,245 +20,164 @@ private[spark] class BlockManagerMaster(
masterPort: Int)
extends Logging {
+ val AKKA_RETRY_ATTEMPS: Int = System.getProperty("spark.akka.num.retries", "5").toInt
+ val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "100").toInt
+
val MASTER_AKKA_ACTOR_NAME = "BlockMasterManager"
val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager"
- val REQUEST_RETRY_INTERVAL_MS = 100
val DEFAULT_MANAGER_IP: String = Utils.localHostName()
val timeout = 10.seconds
- var masterActor: ActorRef = null
-
- if (isMaster) {
- masterActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)),
- name = MASTER_AKKA_ACTOR_NAME)
- logInfo("Registered BlockManagerMaster Actor")
- } else {
- val url = "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, MASTER_AKKA_ACTOR_NAME)
- logInfo("Connecting to BlockManagerMaster: " + url)
- masterActor = actorSystem.actorFor(url)
- }
-
- def stop() {
- if (masterActor != null) {
- communicate(StopBlockManagerMaster)
- masterActor = null
- logInfo("BlockManagerMaster stopped")
- }
- }
-
- // Send a message to the master actor and get its result within a default timeout, or
- // throw a SparkException if this fails.
- def askMaster(message: Any): Any = {
- try {
- val future = masterActor.ask(message)(timeout)
- return Await.result(future, timeout)
- } catch {
- case e: Exception =>
- throw new SparkException("Error communicating with BlockManagerMaster", e)
+ var masterActor: ActorRef = {
+ if (isMaster) {
+ val masterActor = actorSystem.actorOf(Props(new BlockManagerMasterActor(isLocal)),
+ name = MASTER_AKKA_ACTOR_NAME)
+ logInfo("Registered BlockManagerMaster Actor")
+ masterActor
+ } else {
+ val url = "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, MASTER_AKKA_ACTOR_NAME)
+ logInfo("Connecting to BlockManagerMaster: " + url)
+ actorSystem.actorFor(url)
}
}
- // Send a one-way message to the master actor, to which we expect it to reply with true.
- def communicate(message: Any) {
- if (askMaster(message) != true) {
- throw new SparkException("Error reply received from BlockManagerMaster")
- }
- }
+ /** Remove a dead host from the master actor. This is only called on the master side. */
def notifyADeadHost(host: String) {
- communicate(RemoveHost(host))
+ tell(RemoveHost(host))
logInfo("Removed " + host + " successfully in notifyADeadHost")
}
- def mustRegisterBlockManager(
+ /**
+ * Send the master actor a heart beat from the slave. Returns true if everything works out,
+ * false if the master does not know about the given block manager, which means the block
+ * manager should re-register.
+ */
+ def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = {
+ askMasterWithRetry[Boolean](HeartBeat(blockManagerId))
+ }
+
+ /** Register the BlockManager's id with the master. */
+ def registerBlockManager(
blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
- val msg = RegisterBlockManager(blockManagerId, maxMemSize, slaveActor)
logInfo("Trying to register BlockManager")
- while (! syncRegisterBlockManager(msg)) {
- logWarning("Failed to register " + msg)
- Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
- }
- logInfo("Done registering BlockManager")
+ tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
+ logInfo("Registered BlockManager")
}
- private def syncRegisterBlockManager(msg: RegisterBlockManager): Boolean = {
- //val masterActor = RemoteActor.select(node, name)
- val startTimeMs = System.currentTimeMillis()
- val tmp = " msg " + msg + " "
- logDebug("Got in syncRegisterBlockManager 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-
- try {
- communicate(msg)
- logInfo("BlockManager registered successfully @ syncRegisterBlockManager")
- logDebug("Got in syncRegisterBlockManager 1 " + tmp + Utils.getUsedTimeMs(startTimeMs))
- return true
- } catch {
- case e: Exception =>
- logError("Failed in syncRegisterBlockManager", e)
- return false
- }
+ def updateBlockInfo(
+ blockManagerId: BlockManagerId,
+ blockId: String,
+ storageLevel: StorageLevel,
+ memSize: Long,
+ diskSize: Long): Boolean = {
+ val res = askMasterWithRetry[Boolean](
+ UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
+ logInfo("Updated info of block " + blockId)
+ res
}
- def mustHeartBeat(msg: HeartBeat): Boolean = {
- var res = syncHeartBeat(msg)
- while (!res.isDefined) {
- logWarning("Failed to send heart beat " + msg)
- Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
- }
- return res.get
+ /** Get locations of the blockId from the master */
+ def getLocations(blockId: String): Seq[BlockManagerId] = {
+ askMasterWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
}
- private def syncHeartBeat(msg: HeartBeat): Option[Boolean] = {
- try {
- val answer = askMaster(msg).asInstanceOf[Boolean]
- return Some(answer)
- } catch {
- case e: Exception =>
- logError("Failed in syncHeartBeat", e)
- return None
- }
+ /** Get locations of multiple blockIds from the master */
+ def getLocations(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
+ askMasterWithRetry[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
}
- def mustBlockUpdate(msg: BlockUpdate): Boolean = {
- var res = syncBlockUpdate(msg)
- while (!res.isDefined) {
- logWarning("Failed to send block update " + msg)
- Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
+ /** Get ids of other nodes in the cluster from the master */
+ def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
+ val result = askMasterWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
+ if (result.length != numPeers) {
+ throw new SparkException(
+ "Error getting peers, only got " + result.size + " instead of " + numPeers)
}
- return res.get
+ result
}
- private def syncBlockUpdate(msg: BlockUpdate): Option[Boolean] = {
- val startTimeMs = System.currentTimeMillis()
- val tmp = " msg " + msg + " "
- logDebug("Got in syncBlockUpdate " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
-
- try {
- val answer = askMaster(msg).asInstanceOf[Boolean]
- logDebug("Block update sent successfully")
- logDebug("Got in synbBlockUpdate " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
- return Some(answer)
- } catch {
- case e: Exception =>
- logError("Failed in syncBlockUpdate", e)
- return None
- }
+ /**
+ * Remove a block from the slaves that have it. This can only be used to remove
+ * blocks that the master knows about.
+ */
+ def removeBlock(blockId: String) {
+ askMaster(RemoveBlock(blockId))
}
- def mustGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
- var res = syncGetLocations(msg)
- while (res == null) {
- logInfo("Failed to get locations " + msg)
- Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
- res = syncGetLocations(msg)
- }
- return res
+ /**
+ * Return the memory status for each block manager, in the form of a map from
+ * the block manager's id to two long values. The first value is the maximum
+ * amount of memory allocated for the block manager, while the second is the
+ * amount of remaining memory.
+ */
+ def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
+ askMasterWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
}
- private def syncGetLocations(msg: GetLocations): Seq[BlockManagerId] = {
- val startTimeMs = System.currentTimeMillis()
- val tmp = " msg " + msg + " "
- logDebug("Got in syncGetLocations 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-
- try {
- val answer = askMaster(msg).asInstanceOf[ArrayBuffer[BlockManagerId]]
- if (answer != null) {
- logDebug("GetLocations successful")
- logDebug("Got in syncGetLocations 1 " + tmp + Utils.getUsedTimeMs(startTimeMs))
- return answer
- } else {
- logError("Master replied null in response to GetLocations")
- return null
- }
- } catch {
- case e: Exception =>
- logError("GetLocations failed", e)
- return null
+ /** Stop the master actor, called only on the Spark master node */
+ def stop() {
+ if (masterActor != null) {
+ tell(StopBlockManagerMaster)
+ masterActor = null
+ logInfo("BlockManagerMaster stopped")
}
}
- def mustGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
- Seq[Seq[BlockManagerId]] = {
- var res: Seq[Seq[BlockManagerId]] = syncGetLocationsMultipleBlockIds(msg)
- while (res == null) {
- logWarning("Failed to GetLocationsMultipleBlockIds " + msg)
- Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
- res = syncGetLocationsMultipleBlockIds(msg)
+ /** Send a one-way message to the master actor, to which we expect it to reply with true. */
+ private def tell(message: Any) {
+ if (!askMasterWithRetry[Boolean](message)) {
+ throw new SparkException("BlockManagerMasterActor returned false, expected true.")
}
- return res
}
- private def syncGetLocationsMultipleBlockIds(msg: GetLocationsMultipleBlockIds):
- Seq[Seq[BlockManagerId]] = {
- val startTimeMs = System.currentTimeMillis
- val tmp = " msg " + msg + " "
- logDebug("Got in syncGetLocationsMultipleBlockIds 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-
+ /**
+ * Send a message to the master actor and get its result within a default timeout, or
+ * throw a SparkException if this fails. There is no retry logic here so if the Akka
+ * message is lost, the master actor won't get the command.
+ */
+ private def askMaster[T](message: Any): Any = {
try {
- val answer = askMaster(msg).asInstanceOf[Seq[Seq[BlockManagerId]]]
- if (answer != null) {
- logDebug("GetLocationsMultipleBlockIds successful")
- logDebug("Got in syncGetLocationsMultipleBlockIds 1 " + tmp +
- Utils.getUsedTimeMs(startTimeMs))
- return answer
- } else {
- logError("Master replied null in response to GetLocationsMultipleBlockIds")
- return null
- }
+ val future = masterActor.ask(message)(timeout)
+ return Await.result(future, timeout).asInstanceOf[T]
} catch {
case e: Exception =>
- logError("GetLocationsMultipleBlockIds failed", e)
- return null
+ throw new SparkException("Error communicating with BlockManagerMaster", e)
}
}
- def mustGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
- var res = syncGetPeers(msg)
- while ((res == null) || (res.length != msg.size)) {
- logInfo("Failed to get peers " + msg)
- Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
- res = syncGetPeers(msg)
- }
- res
- }
-
- private def syncGetPeers(msg: GetPeers): Seq[BlockManagerId] = {
- val startTimeMs = System.currentTimeMillis
- val tmp = " msg " + msg + " "
- logDebug("Got in syncGetPeers 0 " + tmp + Utils.getUsedTimeMs(startTimeMs))
-
- try {
- val answer = askMaster(msg).asInstanceOf[Seq[BlockManagerId]]
- if (answer != null) {
- logDebug("GetPeers successful")
- logDebug("Got in syncGetPeers 1 " + tmp + Utils.getUsedTimeMs(startTimeMs))
- return answer
- } else {
- logError("Master replied null in response to GetPeers")
- return null
+ /**
+ * Send a message to the master actor and get its result within a default timeout, or
+ * throw a SparkException if this fails.
+ */
+ private def askMasterWithRetry[T](message: Any): T = {
+ // TODO: Consider removing multiple attempts
+ if (masterActor == null) {
+ throw new SparkException("Error sending message to BlockManager as masterActor is null " +
+ "[message = " + message + "]")
+ }
+ var attempts = 0
+ var lastException: Exception = null
+ while (attempts < AKKA_RETRY_ATTEMPS) {
+ attempts += 1
+ try {
+ val future = masterActor.ask(message)(timeout)
+ val result = Await.result(future, timeout)
+ if (result == null) {
+ throw new Exception("BlockManagerMaster returned null")
+ }
+ return result.asInstanceOf[T]
+ } catch {
+ case ie: InterruptedException => throw ie
+ case e: Exception =>
+ lastException = e
+ logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
}
- } catch {
- case e: Exception =>
- logError("GetPeers failed", e)
- return null
+ Thread.sleep(AKKA_RETRY_INTERVAL_MS)
}
- }
- /**
- * Remove a block from the slaves that have it. This can only be used to remove
- * blocks that the master knows about.
- */
- def removeBlock(blockId: String) {
- askMaster(RemoveBlock(blockId))
+ throw new SparkException(
+ "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
}
- /**
- * Return the memory status for each block manager, in the form of a map from
- * the block manager's id to two long values. The first value is the maximum
- * amount of memory allocated for the block manager, while the second is the
- * amount of remaining memory.
- */
- def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
- askMaster(GetMemoryStatus).asInstanceOf[Map[BlockManagerId, (Long, Long)]]
- }
}
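
The various must*/sync* wrapper pairs collapse into the single askMasterWithRetry helper above. Stripped of Akka, its control flow is an ordinary bounded-retry loop; a generic sketch (names illustrative, not from the patch):

    // Retry `body` up to `attempts` times, sleeping `waitMs` between failures.
    // InterruptedException is rethrown immediately; the last failure is kept and
    // attached to the exception thrown once all attempts are exhausted.
    def retry[T](attempts: Int, waitMs: Long)(body: => T): T = {
      var lastException: Exception = null
      var i = 0
      while (i < attempts) {
        i += 1
        try {
          return body
        } catch {
          case ie: InterruptedException => throw ie
          case e: Exception =>
            lastException = e
            Thread.sleep(waitMs)
        }
      }
      throw new RuntimeException("Failed after " + attempts + " attempts", lastException)
    }

    // e.g. retry(5, 100) { askMaster(HeartBeat(id)) }   -- roughly what askMasterWithRetry does
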
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
new file mode 100644
index 0000000000..0d84e559cb
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -0,0 +1,406 @@
+package spark.storage
+
+import java.util.{HashMap => JHashMap}
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+import scala.util.Random
+
+import akka.actor.{Actor, ActorRef, Cancellable}
+import akka.util.{Duration, Timeout}
+import akka.util.duration._
+
+import spark.{Logging, Utils}
+
+/**
+ * BlockManagerMasterActor is an actor on the master node to track statuses of
+ * all slaves' block managers.
+ */
+
+private[spark]
+object BlockManagerMasterActor {
+
+ case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
+
+ class BlockManagerInfo(
+ val blockManagerId: BlockManagerId,
+ timeMs: Long,
+ val maxMem: Long,
+ val slaveActor: ActorRef)
+ extends Logging {
+
+ private var _lastSeenMs: Long = timeMs
+ private var _remainingMem: Long = maxMem
+
+ // Mapping from block id to its status.
+ private val _blocks = new JHashMap[String, BlockStatus]
+
+ logInfo("Registering block manager %s:%d with %s RAM".format(
+ blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
+
+ def updateLastSeenMs() {
+ _lastSeenMs = System.currentTimeMillis()
+ }
+
+ def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
+ : Unit = synchronized {
+
+ updateLastSeenMs()
+
+ if (_blocks.containsKey(blockId)) {
+ // The block exists on the slave already.
+ val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
+
+ if (originalLevel.useMemory) {
+ _remainingMem += memSize
+ }
+ }
+
+ if (storageLevel.isValid) {
+ // isValid means it is either stored in-memory or on-disk.
+ _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize))
+ if (storageLevel.useMemory) {
+ _remainingMem -= memSize
+ logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
+ Utils.memoryBytesToString(_remainingMem)))
+ }
+ if (storageLevel.useDisk) {
+ logInfo("Added %s on disk on %s:%d (size: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
+ }
+ } else if (_blocks.containsKey(blockId)) {
+ // If isValid is not true, drop the block.
+ val blockStatus: BlockStatus = _blocks.get(blockId)
+ _blocks.remove(blockId)
+ if (blockStatus.storageLevel.useMemory) {
+ _remainingMem += blockStatus.memSize
+ logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
+ Utils.memoryBytesToString(_remainingMem)))
+ }
+ if (blockStatus.storageLevel.useDisk) {
+ logInfo("Removed %s on %s:%d on disk (size: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
+ }
+ }
+ }
+
+ def remainingMem: Long = _remainingMem
+
+ def lastSeenMs: Long = _lastSeenMs
+
+ def blocks: JHashMap[String, BlockStatus] = _blocks
+
+ override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
+
+ def clear() {
+ _blocks.clear()
+ }
+ }
+}
+
+
+private[spark]
+class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
+
+ // Mapping from block manager id to the block manager's information.
+ private val blockManagerInfo =
+ new HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
+
+ // Mapping from host name to block manager id.
+ private val blockManagerIdByHost = new HashMap[String, BlockManagerId]
+
+ // Mapping from block id to the set of block managers that have the block.
+ private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
+
+ initLogging()
+
+ val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
+ "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
+
+ val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
+ "5000").toLong
+
+ var timeoutCheckingTask: Cancellable = null
+
+ override def preStart() {
+ if (!BlockManager.getDisableHeartBeatsForTesting) {
+ timeoutCheckingTask = context.system.scheduler.schedule(
+ 0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
+ }
+ super.preStart()
+ }
+
+ def removeBlockManager(blockManagerId: BlockManagerId) {
+ val info = blockManagerInfo(blockManagerId)
+ blockManagerIdByHost.remove(blockManagerId.ip)
+ blockManagerInfo.remove(blockManagerId)
+ var iterator = info.blocks.keySet.iterator
+ while (iterator.hasNext) {
+ val blockId = iterator.next
+ val locations = blockInfo.get(blockId)._2
+ locations -= blockManagerId
+ if (locations.size == 0) {
+ blockInfo.remove(locations)
+ }
+ }
+ }
+
+ def expireDeadHosts() {
+ logDebug("Checking for hosts with no recent heart beats in BlockManagerMaster.")
+ val now = System.currentTimeMillis()
+ val minSeenTime = now - slaveTimeout
+ val toRemove = new HashSet[BlockManagerId]
+ for (info <- blockManagerInfo.values) {
+ if (info.lastSeenMs < minSeenTime) {
+ logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
+ toRemove += info.blockManagerId
+ }
+ }
+ // TODO: Remove corresponding block infos
+ toRemove.foreach(removeBlockManager)
+ }
+
+ def removeHost(host: String) {
+ logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
+ logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
+ blockManagerIdByHost.get(host).foreach(removeBlockManager)
+ logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
+ sender ! true
+ }
+
+ def heartBeat(blockManagerId: BlockManagerId) {
+ if (!blockManagerInfo.contains(blockManagerId)) {
+ if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ sender ! true
+ } else {
+ sender ! false
+ }
+ } else {
+ blockManagerInfo(blockManagerId).updateLastSeenMs()
+ sender ! true
+ }
+ }
+
+ def receive = {
+ case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
+ register(blockManagerId, maxMemSize, slaveActor)
+
+ case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
+ blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size)
+
+ case GetLocations(blockId) =>
+ getLocations(blockId)
+
+ case GetLocationsMultipleBlockIds(blockIds) =>
+ getLocationsMultipleBlockIds(blockIds)
+
+ case GetPeers(blockManagerId, size) =>
+ getPeersDeterministic(blockManagerId, size)
+ /*getPeers(blockManagerId, size)*/
+
+ case GetMemoryStatus =>
+ getMemoryStatus
+
+ case RemoveBlock(blockId) =>
+ removeBlock(blockId)
+
+ case RemoveHost(host) =>
+ removeHost(host)
+ sender ! true
+
+ case StopBlockManagerMaster =>
+ logInfo("Stopping BlockManagerMaster")
+ sender ! true
+ if (timeoutCheckingTask != null) {
+ timeoutCheckingTask.cancel
+ }
+ context.stop(self)
+
+ case ExpireDeadHosts =>
+ expireDeadHosts()
+
+ case HeartBeat(blockManagerId) =>
+ heartBeat(blockManagerId)
+
+ case other =>
+ logInfo("Got unknown message: " + other)
+ }
+
+ // Remove a block from the slaves that have it. This can only be used to remove
+ // blocks that the master knows about.
+ private def removeBlock(blockId: String) {
+ val block = blockInfo.get(blockId)
+ if (block != null) {
+ block._2.foreach { blockManagerId: BlockManagerId =>
+ val blockManager = blockManagerInfo.get(blockManagerId)
+ if (blockManager.isDefined) {
+ // Remove the block from the slave's BlockManager.
+ // Doesn't actually wait for a confirmation and the message might get lost.
+ // If message loss becomes frequent, we should add retry logic here.
+ blockManager.get.slaveActor ! RemoveBlock(blockId)
+ // Remove the block from the master's BlockManagerInfo.
+ blockManager.get.updateBlockInfo(blockId, StorageLevel.NONE, 0, 0)
+ }
+ }
+ blockInfo.remove(blockId)
+ }
+ sender ! true
+ }
+
+ // Return a map from the block manager id to max memory and remaining memory.
+ private def getMemoryStatus() {
+ val res = blockManagerInfo.map { case(blockManagerId, info) =>
+ (blockManagerId, (info.maxMem, info.remainingMem))
+ }.toMap
+ sender ! res
+ }
+
+ private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+ val startTimeMs = System.currentTimeMillis()
+ val tmp = " " + blockManagerId + " "
+ logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
+ if (blockManagerIdByHost.contains(blockManagerId.ip) &&
+ blockManagerIdByHost(blockManagerId.ip) != blockManagerId) {
+ val oldId = blockManagerIdByHost(blockManagerId.ip)
+ logInfo("Got second registration for host " + blockManagerId +
+ "; removing old slave " + oldId)
+ removeBlockManager(oldId)
+ }
+ if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ logInfo("Got Register Msg from master node, don't register it")
+ } else {
+ blockManagerInfo += (blockManagerId -> new BlockManagerMasterActor.BlockManagerInfo(
+ blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor))
+ }
+ blockManagerIdByHost += (blockManagerId.ip -> blockManagerId)
+ logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
+ sender ! true
+ }
+
+ private def blockUpdate(
+ blockManagerId: BlockManagerId,
+ blockId: String,
+ storageLevel: StorageLevel,
+ memSize: Long,
+ diskSize: Long) {
+
+ val startTimeMs = System.currentTimeMillis()
+ val tmp = " " + blockManagerId + " " + blockId + " "
+
+ if (!blockManagerInfo.contains(blockManagerId)) {
+ if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ // We intentionally do not register the master (except in local mode),
+ // so we should not indicate failure.
+ sender ! true
+ } else {
+ sender ! false
+ }
+ return
+ }
+
+ if (blockId == null) {
+ blockManagerInfo(blockManagerId).updateLastSeenMs()
+ logDebug("Got in block update 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
+ sender ! true
+ return
+ }
+
+ blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
+
+ var locations: HashSet[BlockManagerId] = null
+ if (blockInfo.containsKey(blockId)) {
+ locations = blockInfo.get(blockId)._2
+ } else {
+ locations = new HashSet[BlockManagerId]
+ blockInfo.put(blockId, (storageLevel.replication, locations))
+ }
+
+ if (storageLevel.isValid) {
+ locations += blockManagerId
+ } else {
+ locations.remove(blockManagerId)
+ }
+
+ if (locations.size == 0) {
+ blockInfo.remove(blockId)
+ }
+ sender ! true
+ }
+
+ private def getLocations(blockId: String) {
+ val startTimeMs = System.currentTimeMillis()
+ val tmp = " " + blockId + " "
+ logDebug("Got in getLocations 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
+ if (blockInfo.containsKey(blockId)) {
+ var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+ res.appendAll(blockInfo.get(blockId)._2)
+ logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at "
+ + Utils.getUsedTimeMs(startTimeMs))
+ sender ! res.toSeq
+ } else {
+ logDebug("Got in getLocations 2" + tmp + Utils.getUsedTimeMs(startTimeMs))
+ var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+ sender ! res
+ }
+ }
+
+ private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
+ def getLocations(blockId: String): Seq[BlockManagerId] = {
+ val tmp = blockId
+ logDebug("Got in getLocationsMultipleBlockIds Sub 0 " + tmp)
+ if (blockInfo.containsKey(blockId)) {
+ var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+ res.appendAll(blockInfo.get(blockId)._2)
+ logDebug("Got in getLocationsMultipleBlockIds Sub 1 " + tmp + " " + res.toSeq)
+ return res.toSeq
+ } else {
+ logDebug("Got in getLocationsMultipleBlockIds Sub 2 " + tmp)
+ var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+ return res.toSeq
+ }
+ }
+
+ logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq)
+ var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]]
+ for (blockId <- blockIds) {
+ res.append(getLocations(blockId))
+ }
+ logDebug("Got in getLocationsMultipleBlockIds " + blockIds.toSeq + " : " + res.toSeq)
+ sender ! res.toSeq
+ }
+
+ private def getPeers(blockManagerId: BlockManagerId, size: Int) {
+ var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
+ var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+ res.appendAll(peers)
+ res -= blockManagerId
+ val rand = new Random(System.currentTimeMillis())
+ while (res.length > size) {
+ res.remove(rand.nextInt(res.length))
+ }
+ sender ! res.toSeq
+ }
+
+ private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) {
+ var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
+ var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+
+ val peersWithIndices = peers.zipWithIndex
+ val selfIndex = peersWithIndices.find(_._1 == blockManagerId).map(_._2).getOrElse(-1)
+ if (selfIndex == -1) {
+ throw new Exception("Self index for " + blockManagerId + " not found")
+ }
+
+ var index = selfIndex
+ while (res.size < size) {
+ index += 1
+ if (index == selfIndex) {
+ throw new Exception("More peer expected than available")
+ }
+ res += peers(index % peers.size)
+ }
+ sender ! res.toSeq
+ }
+}
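
getPeersDeterministic above walks the peer array ring-wise starting just after the requester, so every node picks the same replication targets on every call. A pure-function sketch of that intent (illustrative only, with an explicit bound in place of the wrap-around check):

    // Pick `size` peers deterministically: start just after `self` and walk the
    // list as a ring. Requires size < peers.size so `self` is never chosen.
    def peersAfter[T](peers: IndexedSeq[T], self: T, size: Int): Seq[T] = {
      val selfIndex = peers.indexOf(self)
      require(selfIndex >= 0, "Self index for " + self + " not found")
      require(size < peers.size, "More peers requested than available")
      (1 to size).map(i => peers((selfIndex + i) % peers.size))
    }

    // peersAfter(Vector("a", "b", "c", "d"), "b", 2) == Vector("c", "d")
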
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
index 5bca170f95..d73a9b790f 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -34,7 +34,7 @@ private[spark]
case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
private[spark]
-class BlockUpdate(
+class UpdateBlockInfo(
var blockManagerId: BlockManagerId,
var blockId: String,
var storageLevel: StorageLevel,
@@ -65,17 +65,17 @@ class BlockUpdate(
}
private[spark]
-object BlockUpdate {
+object UpdateBlockInfo {
def apply(blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long): BlockUpdate = {
- new BlockUpdate(blockManagerId, blockId, storageLevel, memSize, diskSize)
+ diskSize: Long): UpdateBlockInfo = {
+ new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
}
// For pattern-matching
- def unapply(h: BlockUpdate): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
+ def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
}
}
diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala
index c497f03e0c..e3544e5aae 100644
--- a/core/src/main/scala/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/spark/storage/StorageLevel.scala
@@ -1,6 +1,6 @@
package spark.storage
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
/**
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
@@ -10,14 +10,16 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
* commonly useful storage levels.
*/
class StorageLevel(
- var useDisk: Boolean,
+ var useDisk: Boolean,
var useMemory: Boolean,
var deserialized: Boolean,
var replication: Int = 1)
extends Externalizable {
// TODO: Also add fields for caching priority, dataset ID, and flushing.
-
+
+ assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes")
+
def this(flags: Int, replication: Int) {
this((flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication)
}
@@ -29,14 +31,14 @@ class StorageLevel(
override def equals(other: Any): Boolean = other match {
case s: StorageLevel =>
- s.useDisk == useDisk &&
+ s.useDisk == useDisk &&
s.useMemory == useMemory &&
s.deserialized == deserialized &&
- s.replication == replication
+ s.replication == replication
case _ =>
false
}
-
+
def isValid = ((useMemory || useDisk) && (replication > 0))
def toInt: Int = {
@@ -66,10 +68,16 @@ class StorageLevel(
replication = in.readByte()
}
+ @throws(classOf[IOException])
+ private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)
+
override def toString: String =
"StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication)
+
+ override def hashCode(): Int = toInt * 41 + replication
}
+
object StorageLevel {
val NONE = new StorageLevel(false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false)
@@ -82,4 +90,16 @@ object StorageLevel {
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
+
+ private[spark]
+ val storageLevelCache = new java.util.concurrent.ConcurrentHashMap[StorageLevel, StorageLevel]()
+
+ private[spark] def getCachedStorageLevel(level: StorageLevel): StorageLevel = {
+ if (storageLevelCache.containsKey(level)) {
+ storageLevelCache.get(level)
+ } else {
+ storageLevelCache.put(level, level)
+ level
+ }
+ }
}
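
The new assert on replication ties into the added hashCode: with hashCode = toInt * 41 + replication, and assuming toInt packs the three flags the same way the (flags: Int, replication: Int) constructor unpacks them (so toInt is at most 7), keeping replication below 41 (the patch asserts < 40) means the hash encodes both parts without collisions. A small worked check using MEMORY_AND_DISK_2 from the listing above:

    // MEMORY_AND_DISK_2 = StorageLevel(true, true, true, 2): flags = 4 + 2 + 1 = 7
    val hash = 7 * 41 + 2                    // hashCode as defined above = 289
    assert(hash / 41 == 7 && hash % 41 == 2) // both fields are recoverable, so no
                                             // collisions while replication stays below 41
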
diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala
new file mode 100644
index 0000000000..19e67acd0c
--- /dev/null
+++ b/core/src/main/scala/spark/util/MetadataCleaner.scala
@@ -0,0 +1,35 @@
+package spark.util
+
+import java.util.concurrent.{TimeUnit, ScheduledFuture, Executors}
+import java.util.{TimerTask, Timer}
+import spark.Logging
+
+class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
+
+ val delaySeconds = (System.getProperty("spark.cleanup.delay", "-100").toDouble * 60).toInt
+ val periodSeconds = math.max(10, delaySeconds / 10)
+ val timer = new Timer(name + " cleanup timer", true)
+
+ val task = new TimerTask {
+ def run() {
+ try {
+ if (delaySeconds > 0) {
+ cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
+ logInfo("Ran metadata cleaner for " + name)
+ }
+ } catch {
+ case e: Exception => logError("Error running cleanup task for " + name, e)
+ }
+ }
+ }
+ if (periodSeconds > 0) {
+ logInfo(
+ "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds and "
+ + "period of " + periodSeconds + " secs")
+ timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
+ }
+
+ def cancel() {
+ timer.cancel()
+ }
+}
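
A usage sketch of the new cleaner (it assumes the two util classes added in this patch, with the property name and semantics as defined above). Cleanup only happens when spark.cleanup.delay, in minutes, is positive; with the default of -100 the timer task is a no-op:

    import spark.util.{MetadataCleaner, TimeStampedHashMap}

    object CleanerExample {
      def main(args: Array[String]) {
        // The property is read at construction time, so set it before creating the cleaner.
        System.setProperty("spark.cleanup.delay", "30")        // keep entries for ~30 minutes
        val cache = new TimeStampedHashMap[String, Array[Byte]]()
        // The cleaner periodically calls cache.cleanup(now - 30 minutes),
        // mirroring how BlockManager registers this.dropOldBlocks above.
        val cleaner = new MetadataCleaner("example-cache", cache.cleanup)
        cache("block-0") = new Array[Byte](128)
        // ... use the cache; stale entries are dropped in the background ...
        cleaner.cancel()
      }
    }
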
diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
new file mode 100644
index 0000000000..070ee19ac0
--- /dev/null
+++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
@@ -0,0 +1,87 @@
+package spark.util
+
+import java.util.concurrent.ConcurrentHashMap
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{HashMap, Map}
+
+/**
+ * This is a custom implementation of scala.collection.mutable.Map which stores the insertion
+ * time stamp along with each key-value pair. Key-value pairs that are older than a particular
+ * threshold time can them be removed using the cleanup method. This is intended to be a drop-in
+ * replacement of scala.collection.mutable.HashMap.
+ */
+class TimeStampedHashMap[A, B] extends Map[A, B]() {
+ val internalMap = new ConcurrentHashMap[A, (B, Long)]()
+
+ def get(key: A): Option[B] = {
+ val value = internalMap.get(key)
+ if (value != null) Some(value._1) else None
+ }
+
+ def iterator: Iterator[(A, B)] = {
+ val jIterator = internalMap.entrySet().iterator()
+ jIterator.map(kv => (kv.getKey, kv.getValue._1))
+ }
+
+ override def + [B1 >: B](kv: (A, B1)): Map[A, B1] = {
+ val newMap = new TimeStampedHashMap[A, B1]
+ newMap.internalMap.putAll(this.internalMap)
+ newMap.internalMap.put(kv._1, (kv._2, currentTime))
+ newMap
+ }
+
+ override def - (key: A): Map[A, B] = {
+ internalMap.remove(key)
+ this
+ }
+
+ override def += (kv: (A, B)): this.type = {
+ internalMap.put(kv._1, (kv._2, currentTime))
+ this
+ }
+
+ override def -= (key: A): this.type = {
+ internalMap.remove(key)
+ this
+ }
+
+ override def update(key: A, value: B) {
+ this += ((key, value))
+ }
+
+ override def apply(key: A): B = {
+ val value = internalMap.get(key)
+ if (value == null) throw new NoSuchElementException()
+ value._1
+ }
+
+ override def filter(p: ((A, B)) => Boolean): Map[A, B] = {
+ internalMap.map(kv => (kv._1, kv._2._1)).filter(p)
+ }
+
+ override def empty: Map[A, B] = new TimeStampedHashMap[A, B]()
+
+ override def size(): Int = internalMap.size()
+
+ override def foreach[U](f: ((A, B)) => U): Unit = {
+ val iterator = internalMap.entrySet().iterator()
+ while(iterator.hasNext) {
+ val entry = iterator.next()
+ val kv = (entry.getKey, entry.getValue._1)
+ f(kv)
+ }
+ }
+
+ def cleanup(threshTime: Long) {
+ val iterator = internalMap.entrySet().iterator()
+ while(iterator.hasNext) {
+ val entry = iterator.next()
+ if (entry.getValue._2 < threshTime) {
+ iterator.remove()
+ }
+ }
+ }
+
+ private def currentTime: Long = System.currentTimeMillis()
+
+}
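
TimeStampedHashMap stamps each entry at insertion, and cleanup(threshTime) drops every entry stamped strictly before threshTime. A small self-contained example of that contract follows; the keys, values, and sleep are made up purely for illustration.

import spark.util.TimeStampedHashMap

object TimeStampedHashMapExample {
  def main(args: Array[String]) {
    val map = new TimeStampedHashMap[String, Int]()
    map("a") = 1                      // update() records the current time with the entry
    map += ("b" -> 2)
    Thread.sleep(10)                  // make sure "a" and "b" are clearly older than the cutoff
    val cutoff = System.currentTimeMillis()
    map("c") = 3                      // stamped at or after the cutoff, so it survives
    map.cleanup(cutoff)               // removes entries stamped strictly before cutoff
    println(map.get("a"))             // None
    println(map.get("b"))             // None
    println(map.get("c"))             // Some(3)
  }
}
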
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 4dc3b7ec05..e50ce1430f 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -22,6 +22,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
var oldHeartBeat: String = null
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+ System.setProperty("spark.kryoserializer.buffer.mb", "1")
val serializer = new KryoSerializer
before {
@@ -63,7 +64,33 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
}
- test("manager-master interaction") {
+ test("StorageLevel object caching") {
+ val level1 = new StorageLevel(false, false, false, 3)
+ val level2 = new StorageLevel(false, false, false, 3)
+ val bytes1 = spark.Utils.serialize(level1)
+ val level1_ = spark.Utils.deserialize[StorageLevel](bytes1)
+ val bytes2 = spark.Utils.serialize(level2)
+ val level2_ = spark.Utils.deserialize[StorageLevel](bytes2)
+ assert(level1_ === level1, "Deserialized level1 not same as original level1")
+    assert(level2_ === level2, "Deserialized level2 not same as original level2")
+ assert(level1_ === level2_, "Deserialized level1 not same as deserialized level2")
+ assert(level2_.eq(level1_), "Deserialized level2 not the same object as deserialized level1")
+ }
+
+ test("BlockManagerId object caching") {
+    val id1 = new BlockManagerId("XXX", 1)
+    val id2 = new BlockManagerId("XXX", 1)
+    val bytes1 = spark.Utils.serialize(id1)
+    val id1_ = spark.Utils.deserialize[BlockManagerId](bytes1)
+    val bytes2 = spark.Utils.serialize(id2)
+    val id2_ = spark.Utils.deserialize[BlockManagerId](bytes2)
+ assert(id1_ === id1, "Deserialized id1 not same as original id1")
+    assert(id2_ === id2, "Deserialized id2 not same as original id2")
+ assert(id1_ === id2_, "Deserialized id1 not same as deserialized id2")
+    assert(id2_.eq(id1_), "Deserialized id2 not the same object as deserialized id1")
+ }
+
+ test("master + 1 manager interaction") {
store = new BlockManager(actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
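
The two object-caching tests above expect deserialization to hand back the very same instance for equal values, which is what a readResolve-based cache provides. The sketch below shows that general pattern on a hypothetical CachedId class used only for illustration; it is not the actual StorageLevel or BlockManagerId code from this patch.

import java.io.Serializable
import java.util.concurrent.ConcurrentHashMap

// Hypothetical value class used only to illustrate the interning pattern.
class CachedId(val ip: String, val port: Int) extends Serializable {
  override def equals(other: Any): Boolean = other match {
    case that: CachedId => that.ip == ip && that.port == port
    case _ => false
  }
  override def hashCode: Int = ip.hashCode * 41 + port

  // Java serialization calls readResolve after reconstructing an object, letting us
  // swap the fresh copy for the canonical cached instance, so two deserialized but
  // equal values satisfy eq() as the tests expect.
  private def readResolve(): Object = CachedId.getCached(this)
}

object CachedId {
  private val cache = new ConcurrentHashMap[CachedId, CachedId]()

  // Return the cached instance for this value, registering it on first use.
  def getCached(id: CachedId): CachedId = {
    val prev = cache.putIfAbsent(id, id)
    if (prev == null) id else prev
  }
}
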
@@ -80,17 +107,33 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.getSingle("a3") != None, "a3 was not in store")
// Checking whether master knows about the blocks or not
- assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
- assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
- assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
+ assert(master.getLocations("a1").size > 0, "master was not told about a1")
+ assert(master.getLocations("a2").size > 0, "master was not told about a2")
+ assert(master.getLocations("a3").size === 0, "master was told about a3")
// Drop a1 and a2 from memory; this should be reported back to the master
store.dropFromMemory("a1", null)
store.dropFromMemory("a2", null)
assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store")
- assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
- assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
+ assert(master.getLocations("a1").size === 0, "master did not remove a1")
+ assert(master.getLocations("a2").size === 0, "master did not remove a2")
+ }
+
+ test("master + 2 managers interaction") {
+ store = new BlockManager(actorSystem, master, serializer, 2000)
+ val otherStore = new BlockManager(actorSystem, master, new KryoSerializer, 2000)
+
+ val peers = master.getPeers(store.blockManagerId, 1)
+ assert(peers.size === 1, "master did not return the other manager as a peer")
+ assert(peers.head === otherStore.blockManagerId, "peer returned by master is not the other manager")
+
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
+ otherStore.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
+ assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
+ assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
}
test("removing block") {
@@ -113,9 +156,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.getSingle("a3") != None, "a3 was not in store")
// Checking whether master knows about the blocks or not
- assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
- assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
- assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
+ assert(master.getLocations("a1").size > 0, "master was not told about a1")
+ assert(master.getLocations("a2").size > 0, "master was not told about a2")
+ assert(master.getLocations("a3").size === 0, "master was told about a3")
// Remove a1 and a2 and a3. Should be no-op for a3.
master.removeBlock("a1")
@@ -123,10 +166,10 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
master.removeBlock("a3")
assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store")
- assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
- assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
+ assert(master.getLocations("a1").size === 0, "master did not remove a1")
+ assert(master.getLocations("a2").size === 0, "master did not remove a2")
assert(store.getSingle("a3") != None, "a3 was not in store")
- assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
+ assert(master.getLocations("a3").size === 0, "master was told about a3")
memStatus = master.getMemoryStatus.head._2
assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
assert(memStatus._2 == 2000L, "remaining memory " + memStatus._2 + " should equal 2000")
@@ -140,13 +183,13 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") != None, "a1 was not in store")
- assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+ assert(master.getLocations("a1").size > 0, "master was not told about a1")
master.notifyADeadHost(store.blockManagerId.ip)
- assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
+ assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
store invokePrivate heartBeat()
- assert(master.mustGetLocations(GetLocations("a1")).size > 0,
+ assert(master.getLocations("a1").size > 0,
"a1 was not reregistered with master")
}
@@ -157,17 +200,15 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
- assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+ assert(master.getLocations("a1").size > 0, "master was not told about a1")
master.notifyADeadHost(store.blockManagerId.ip)
- assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
+ assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
- assert(master.mustGetLocations(GetLocations("a1")).size > 0,
- "a1 was not reregistered with master")
- assert(master.mustGetLocations(GetLocations("a2")).size > 0,
- "master was not told about a2")
+ assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
+ assert(master.getLocations("a2").size > 0, "master was not told about a2")
}
test("deregistration on duplicate") {
@@ -177,19 +218,19 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
- assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+ assert(master.getLocations("a1").size > 0, "master was not told about a1")
store2 = new BlockManager(actorSystem, master, serializer, 2000)
- assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
+ assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
store invokePrivate heartBeat()
- assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+ assert(master.getLocations("a1").size > 0, "master was not told about a1")
store2 invokePrivate heartBeat()
- assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a2 was not removed from master")
+ assert(master.getLocations("a1").size == 0, "a2 was not removed from master")
}
test("in-memory LRU storage") {