aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorCharles Reiss <charles@eecs.berkeley.edu>2012-12-05 23:11:06 -0800
committerCharles Reiss <charles@eecs.berkeley.edu>2012-12-05 23:35:20 -0800
commitc9e54a6755961a5cc9eda45df6a2e5e2df1b01a6 (patch)
treefdba8de3fb840658cb9ebcf67fa7197e99189dfe /core
parent5afa2ee9e9138d834b5ccdba3722ef3a7d7a48aa (diff)
downloadspark-c9e54a6755961a5cc9eda45df6a2e5e2df1b01a6.tar.gz
spark-c9e54a6755961a5cc9eda45df6a2e5e2df1b01a6.tar.bz2
spark-c9e54a6755961a5cc9eda45df6a2e5e2df1b01a6.zip
Track block managers by hostname; handle manager removal.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMaster.scala30
1 files changed, 27 insertions, 3 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 531331b0e5..4959c05f94 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -156,6 +156,8 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
def lastSeenMs: Long = _lastSeenMs
+ def blocks: JHashMap[String, StorageLevel] = _blocks
+
override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
def clear() {
@@ -164,16 +166,30 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
}
private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo]
+ private val blockManagerIdByHost = new HashMap[String, BlockManagerId]
private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
initLogging()
+ def removeBlockManager(blockManagerId: BlockManagerId) {
+ val info = blockManagerInfo(blockManagerId)
+ blockManagerIdByHost.remove(blockManagerId.ip)
+ blockManagerInfo.remove(blockManagerId)
+ var iterator = info.blocks.keySet.iterator
+ while (iterator.hasNext) {
+ val blockId = iterator.next
+ val locations = blockInfo.get(blockId)._2
+ locations -= blockManagerId
+ if (locations.size == 0) {
+ blockInfo.remove(locations)
+ }
+ }
+ }
+
def removeHost(host: String) {
logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
- val ip = host.split(":")(0)
- val port = host.split(":")(1)
- blockManagerInfo.remove(new BlockManagerId(ip, port.toInt))
+ blockManagerIdByHost.get(host).map(removeBlockManager)
logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
sender ! true
}
@@ -223,12 +239,20 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " "
logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
+ if (blockManagerIdByHost.contains(blockManagerId.ip) &&
+ blockManagerIdByHost(blockManagerId.ip) != blockManagerId) {
+ val oldId = blockManagerIdByHost(blockManagerId.ip)
+ logInfo("Got second registration for host " + blockManagerId +
+ "; removing old slave " + oldId)
+ removeBlockManager(oldId)
+ }
if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
logInfo("Got Register Msg from master node, don't register it")
} else {
blockManagerInfo += (blockManagerId -> new BlockManagerInfo(
blockManagerId, System.currentTimeMillis() / 1000, maxMemSize))
}
+ blockManagerIdByHost += (blockManagerId.ip -> blockManagerId)
logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
sender ! true
}