aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@cs.berkeley.edu>2012-12-14 00:26:36 -0800
committerReynold Xin <rxin@cs.berkeley.edu>2012-12-14 00:26:36 -0800
commit8c01295b859c35f4034528d4487a45c34728d0fb (patch)
treea3547b4064d361b5cc19b3f961b421a8356f9e19 /core
parent0235667f731b3b4bc142d9a810d9021124e58a6a (diff)
downloadspark-8c01295b859c35f4034528d4487a45c34728d0fb.tar.gz
spark-8c01295b859c35f4034528d4487a45c34728d0fb.tar.bz2
spark-8c01295b859c35f4034528d4487a45c34728d0fb.zip
Fixed conflicts from merging Charles' and TD's block manager changes.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMaster.scala1
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMasterActor.scala299
-rw-r--r--core/src/test/scala/spark/storage/BlockManagerSuite.scala31
3 files changed, 158 insertions, 173 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index cf11393a03..e8a1e5889f 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -41,7 +41,6 @@ private[spark] class BlockManagerMaster(
}
}
-
/** Remove a dead host from the master actor. This is only called on the master side. */
def notifyADeadHost(host: String) {
tell(RemoveHost(host))
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index 0d84e559cb..e3de8d8e4e 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -16,91 +16,6 @@ import spark.{Logging, Utils}
* BlockManagerMasterActor is an actor on the master node to track statuses of
* all slaves' block managers.
*/
-
-private[spark]
-object BlockManagerMasterActor {
-
- case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
-
- class BlockManagerInfo(
- val blockManagerId: BlockManagerId,
- timeMs: Long,
- val maxMem: Long,
- val slaveActor: ActorRef)
- extends Logging {
-
- private var _lastSeenMs: Long = timeMs
- private var _remainingMem: Long = maxMem
-
- // Mapping from block id to its status.
- private val _blocks = new JHashMap[String, BlockStatus]
-
- logInfo("Registering block manager %s:%d with %s RAM".format(
- blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
-
- def updateLastSeenMs() {
- _lastSeenMs = System.currentTimeMillis()
- }
-
- def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
- : Unit = synchronized {
-
- updateLastSeenMs()
-
- if (_blocks.containsKey(blockId)) {
- // The block exists on the slave already.
- val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
-
- if (originalLevel.useMemory) {
- _remainingMem += memSize
- }
- }
-
- if (storageLevel.isValid) {
- // isValid means it is either stored in-memory or on-disk.
- _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize))
- if (storageLevel.useMemory) {
- _remainingMem -= memSize
- logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
- blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
- Utils.memoryBytesToString(_remainingMem)))
- }
- if (storageLevel.useDisk) {
- logInfo("Added %s on disk on %s:%d (size: %s)".format(
- blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
- }
- } else if (_blocks.containsKey(blockId)) {
- // If isValid is not true, drop the block.
- val blockStatus: BlockStatus = _blocks.get(blockId)
- _blocks.remove(blockId)
- if (blockStatus.storageLevel.useMemory) {
- _remainingMem += blockStatus.memSize
- logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
- blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
- Utils.memoryBytesToString(_remainingMem)))
- }
- if (blockStatus.storageLevel.useDisk) {
- logInfo("Removed %s on %s:%d on disk (size: %s)".format(
- blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
- }
- }
- }
-
- def remainingMem: Long = _remainingMem
-
- def lastSeenMs: Long = _lastSeenMs
-
- def blocks: JHashMap[String, BlockStatus] = _blocks
-
- override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
-
- def clear() {
- _blocks.clear()
- }
- }
-}
-
-
private[spark]
class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
@@ -108,8 +23,9 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
private val blockManagerInfo =
new HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
- // Mapping from host name to block manager id.
- private val blockManagerIdByHost = new HashMap[String, BlockManagerId]
+ // Mapping from host name to block manager id. We allow multiple block managers
+ // on the same host name (ip).
+ private val blockManagerIdByHost = new HashMap[String, ArrayBuffer[BlockManagerId]]
// Mapping from block id to the set of block managers that have the block.
private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
@@ -132,9 +48,62 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
super.preStart()
}
+ def receive = {
+ case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
+ register(blockManagerId, maxMemSize, slaveActor)
+
+ case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
+ blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size)
+
+ case GetLocations(blockId) =>
+ getLocations(blockId)
+
+ case GetLocationsMultipleBlockIds(blockIds) =>
+ getLocationsMultipleBlockIds(blockIds)
+
+ case GetPeers(blockManagerId, size) =>
+ getPeersDeterministic(blockManagerId, size)
+ /*getPeers(blockManagerId, size)*/
+
+ case GetMemoryStatus =>
+ getMemoryStatus
+
+ case RemoveBlock(blockId) =>
+ removeBlock(blockId)
+
+ case RemoveHost(host) =>
+ removeHost(host)
+ sender ! true
+
+ case StopBlockManagerMaster =>
+ logInfo("Stopping BlockManagerMaster")
+ sender ! true
+ if (timeoutCheckingTask != null) {
+ timeoutCheckingTask.cancel
+ }
+ context.stop(self)
+
+ case ExpireDeadHosts =>
+ expireDeadHosts()
+
+ case HeartBeat(blockManagerId) =>
+ heartBeat(blockManagerId)
+
+ case other =>
+ logInfo("Got unknown message: " + other)
+ }
+
def removeBlockManager(blockManagerId: BlockManagerId) {
val info = blockManagerInfo(blockManagerId)
- blockManagerIdByHost.remove(blockManagerId.ip)
+
+ // Remove the block manager from blockManagerIdByHost. If the list of block
+ // managers belonging to the IP is empty, remove the entry from the hash map.
+ blockManagerIdByHost.get(blockManagerId.ip).foreach { managers: ArrayBuffer[BlockManagerId] =>
+ managers -= blockManagerId
+ if (managers.size == 0) blockManagerIdByHost.remove(blockManagerId.ip)
+ }
+
+ // Remove it from blockManagerInfo and remove all the blocks.
blockManagerInfo.remove(blockManagerId)
var iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
@@ -158,14 +127,13 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
toRemove += info.blockManagerId
}
}
- // TODO: Remove corresponding block infos
toRemove.foreach(removeBlockManager)
}
def removeHost(host: String) {
logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
- blockManagerIdByHost.get(host).foreach(removeBlockManager)
+ blockManagerIdByHost.get(host).foreach(_.foreach(removeBlockManager))
logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
sender ! true
}
@@ -183,51 +151,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
}
}
- def receive = {
- case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
- register(blockManagerId, maxMemSize, slaveActor)
-
- case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
- blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size)
-
- case GetLocations(blockId) =>
- getLocations(blockId)
-
- case GetLocationsMultipleBlockIds(blockIds) =>
- getLocationsMultipleBlockIds(blockIds)
-
- case GetPeers(blockManagerId, size) =>
- getPeersDeterministic(blockManagerId, size)
- /*getPeers(blockManagerId, size)*/
-
- case GetMemoryStatus =>
- getMemoryStatus
-
- case RemoveBlock(blockId) =>
- removeBlock(blockId)
-
- case RemoveHost(host) =>
- removeHost(host)
- sender ! true
-
- case StopBlockManagerMaster =>
- logInfo("Stopping BlockManagerMaster")
- sender ! true
- if (timeoutCheckingTask != null) {
- timeoutCheckingTask.cancel
- }
- context.stop(self)
-
- case ExpireDeadHosts =>
- expireDeadHosts()
-
- case HeartBeat(blockManagerId) =>
- heartBeat(blockManagerId)
-
- case other =>
- logInfo("Got unknown message: " + other)
- }
-
// Remove a block from the slaves that have it. This can only be used to remove
// blocks that the master knows about.
private def removeBlock(blockId: String) {
@@ -261,20 +184,22 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " "
logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
- if (blockManagerIdByHost.contains(blockManagerId.ip) &&
- blockManagerIdByHost(blockManagerId.ip) != blockManagerId) {
- val oldId = blockManagerIdByHost(blockManagerId.ip)
- logInfo("Got second registration for host " + blockManagerId +
- "; removing old slave " + oldId)
- removeBlockManager(oldId)
- }
+
if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
logInfo("Got Register Msg from master node, don't register it")
} else {
+ blockManagerIdByHost.get(blockManagerId.ip) match {
+ case Some(managers) =>
+ // A block manager of the same host name already exists.
+ logInfo("Got another registration for host " + blockManagerId)
+ managers += blockManagerId
+ case None =>
+ blockManagerIdByHost += (blockManagerId.ip -> ArrayBuffer(blockManagerId))
+ }
+
blockManagerInfo += (blockManagerId -> new BlockManagerMasterActor.BlockManagerInfo(
blockManagerId, System.currentTimeMillis(), maxMemSize, slaveActor))
}
- blockManagerIdByHost += (blockManagerId.ip -> blockManagerId)
logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
sender ! true
}
@@ -387,12 +312,12 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- val peersWithIndices = peers.zipWithIndex
- val selfIndex = peersWithIndices.find(_._1 == blockManagerId).map(_._2).getOrElse(-1)
+ val selfIndex = peers.indexOf(blockManagerId)
if (selfIndex == -1) {
throw new Exception("Self index for " + blockManagerId + " not found")
}
+ // Note that this logic will select the same node multiple times if there aren't enough peers
var index = selfIndex
while (res.size < size) {
index += 1
@@ -404,3 +329,87 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
sender ! res.toSeq
}
}
+
+
+private[spark]
+object BlockManagerMasterActor {
+
+ case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
+
+ class BlockManagerInfo(
+ val blockManagerId: BlockManagerId,
+ timeMs: Long,
+ val maxMem: Long,
+ val slaveActor: ActorRef)
+ extends Logging {
+
+ private var _lastSeenMs: Long = timeMs
+ private var _remainingMem: Long = maxMem
+
+ // Mapping from block id to its status.
+ private val _blocks = new JHashMap[String, BlockStatus]
+
+ logInfo("Registering block manager %s:%d with %s RAM".format(
+ blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
+
+ def updateLastSeenMs() {
+ _lastSeenMs = System.currentTimeMillis()
+ }
+
+ def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
+ : Unit = synchronized {
+
+ updateLastSeenMs()
+
+ if (_blocks.containsKey(blockId)) {
+ // The block exists on the slave already.
+ val originalLevel: StorageLevel = _blocks.get(blockId).storageLevel
+
+ if (originalLevel.useMemory) {
+ _remainingMem += memSize
+ }
+ }
+
+ if (storageLevel.isValid) {
+ // isValid means it is either stored in-memory or on-disk.
+ _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize))
+ if (storageLevel.useMemory) {
+ _remainingMem -= memSize
+ logInfo("Added %s in memory on %s:%d (size: %s, free: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
+ Utils.memoryBytesToString(_remainingMem)))
+ }
+ if (storageLevel.useDisk) {
+ logInfo("Added %s on disk on %s:%d (size: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
+ }
+ } else if (_blocks.containsKey(blockId)) {
+ // If isValid is not true, drop the block.
+ val blockStatus: BlockStatus = _blocks.get(blockId)
+ _blocks.remove(blockId)
+ if (blockStatus.storageLevel.useMemory) {
+ _remainingMem += blockStatus.memSize
+ logInfo("Removed %s on %s:%d in memory (size: %s, free: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(memSize),
+ Utils.memoryBytesToString(_remainingMem)))
+ }
+ if (blockStatus.storageLevel.useDisk) {
+ logInfo("Removed %s on %s:%d on disk (size: %s)".format(
+ blockId, blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(diskSize)))
+ }
+ }
+ }
+
+ def remainingMem: Long = _remainingMem
+
+ def lastSeenMs: Long = _lastSeenMs
+
+ def blocks: JHashMap[String, BlockStatus] = _blocks
+
+ override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
+
+ def clear() {
+ _blocks.clear()
+ }
+ }
+}
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index e50ce1430f..4e28a7e2bc 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -122,16 +122,16 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("master + 2 managers interaction") {
store = new BlockManager(actorSystem, master, serializer, 2000)
- val otherStore = new BlockManager(actorSystem, master, new KryoSerializer, 2000)
+ store2 = new BlockManager(actorSystem, master, new KryoSerializer, 2000)
val peers = master.getPeers(store.blockManagerId, 1)
assert(peers.size === 1, "master did not return the other manager as a peer")
- assert(peers.head === otherStore.blockManagerId, "peer returned by master is not the other manager")
+ assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager")
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
- otherStore.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
+ store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
}
@@ -189,8 +189,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
store invokePrivate heartBeat()
- assert(master.getLocations("a1").size > 0,
- "a1 was not reregistered with master")
+ assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
}
test("reregistration on block update") {
@@ -211,28 +210,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(master.getLocations("a2").size > 0, "master was not told about a2")
}
- test("deregistration on duplicate") {
- val heartBeat = PrivateMethod[Unit]('heartBeat)
- store = new BlockManager(actorSystem, master, serializer, 2000)
- val a1 = new Array[Byte](400)
-
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
-
- assert(master.getLocations("a1").size > 0, "master was not told about a1")
-
- store2 = new BlockManager(actorSystem, master, serializer, 2000)
-
- assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
-
- store invokePrivate heartBeat()
-
- assert(master.getLocations("a1").size > 0, "master was not told about a1")
-
- store2 invokePrivate heartBeat()
-
- assert(master.getLocations("a1").size == 0, "a2 was not removed from master")
- }
-
test("in-memory LRU storage") {
store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)