diff options
author | Charles Reiss <charles@eecs.berkeley.edu> | 2012-12-05 23:11:06 -0800 |
---|---|---|
committer | Charles Reiss <charles@eecs.berkeley.edu> | 2012-12-05 23:35:20 -0800 |
commit | c9e54a6755961a5cc9eda45df6a2e5e2df1b01a6 (patch) | |
tree | fdba8de3fb840658cb9ebcf67fa7197e99189dfe /core | |
parent | 5afa2ee9e9138d834b5ccdba3722ef3a7d7a48aa (diff) | |
download | spark-c9e54a6755961a5cc9eda45df6a2e5e2df1b01a6.tar.gz spark-c9e54a6755961a5cc9eda45df6a2e5e2df1b01a6.tar.bz2 spark-c9e54a6755961a5cc9eda45df6a2e5e2df1b01a6.zip |
Track block managers by hostname; handle manager removal.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/storage/BlockManagerMaster.scala | 30 |
1 files changed, 27 insertions, 3 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 531331b0e5..4959c05f94 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -156,6 +156,8 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor def lastSeenMs: Long = _lastSeenMs + def blocks: JHashMap[String, StorageLevel] = _blocks + override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem def clear() { @@ -164,16 +166,30 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor } private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo] + private val blockManagerIdByHost = new HashMap[String, BlockManagerId] private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]] initLogging() + def removeBlockManager(blockManagerId: BlockManagerId) { + val info = blockManagerInfo(blockManagerId) + blockManagerIdByHost.remove(blockManagerId.ip) + blockManagerInfo.remove(blockManagerId) + var iterator = info.blocks.keySet.iterator + while (iterator.hasNext) { + val blockId = iterator.next + val locations = blockInfo.get(blockId)._2 + locations -= blockManagerId + if (locations.size == 0) { + blockInfo.remove(locations) + } + } + } + def removeHost(host: String) { logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.") logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq) - val ip = host.split(":")(0) - val port = host.split(":")(1) - blockManagerInfo.remove(new BlockManagerId(ip, port.toInt)) + blockManagerIdByHost.get(host).map(removeBlockManager) logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq) sender ! true } @@ -223,12 +239,20 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor val startTimeMs = System.currentTimeMillis() val tmp = " " + blockManagerId + " " logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs)) + if (blockManagerIdByHost.contains(blockManagerId.ip) && + blockManagerIdByHost(blockManagerId.ip) != blockManagerId) { + val oldId = blockManagerIdByHost(blockManagerId.ip) + logInfo("Got second registration for host " + blockManagerId + + "; removing old slave " + oldId) + removeBlockManager(oldId) + } if (blockManagerId.ip == Utils.localHostName() && !isLocal) { logInfo("Got Register Msg from master node, don't register it") } else { blockManagerInfo += (blockManagerId -> new BlockManagerInfo( blockManagerId, System.currentTimeMillis() / 1000, maxMemSize)) } + blockManagerIdByHost += (blockManagerId.ip -> blockManagerId) logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs)) sender ! true } |