From 813ac7145954f5963362f7a9b35e4e123174bb9d Mon Sep 17 00:00:00 2001
From: Charles Reiss
Date: Wed, 5 Dec 2012 22:56:52 -0800
Subject: Don't use bogus port number in notifyADeadHost().

---
 core/src/main/scala/spark/storage/BlockManagerMaster.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

(limited to 'core')

diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index ace27e758c..0d88c63d89 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -361,7 +361,6 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
   val DEFAULT_MASTER_IP: String = System.getProperty("spark.master.host", "localhost")
   val DEFAULT_MASTER_PORT: Int = System.getProperty("spark.master.port", "7077").toInt
   val DEFAULT_MANAGER_IP: String = Utils.localHostName()
-  val DEFAULT_MANAGER_PORT: String = "10902"
 
   val timeout = 10.seconds
   var masterActor: ActorRef = null
@@ -405,7 +404,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
   }
 
   def notifyADeadHost(host: String) {
-    communicate(RemoveHost(host + ":" + DEFAULT_MANAGER_PORT))
+    communicate(RemoveHost(host))
     logInfo("Removed " + host + " successfully in notifyADeadHost")
   }
 
-- cgit v1.2.3

From 5afa2ee9e9138d834b5ccdba3722ef3a7d7a48aa Mon Sep 17 00:00:00 2001
From: Charles Reiss
Date: Wed, 5 Dec 2012 22:59:55 -0800
Subject: Actually put millis in _lastSeenMs

---
 core/src/main/scala/spark/storage/BlockManagerMaster.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'core')

diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 0d88c63d89..531331b0e5 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -105,7 +105,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
       blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
 
     def updateLastSeenMs() {
-      _lastSeenMs = System.currentTimeMillis() / 1000
+      _lastSeenMs = System.currentTimeMillis()
    }
 
     def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
-- cgit v1.2.3

From c9e54a6755961a5cc9eda45df6a2e5e2df1b01a6 Mon Sep 17 00:00:00 2001
From: Charles Reiss
Date: Wed, 5 Dec 2012 23:11:06 -0800
Subject: Track block managers by hostname; handle manager removal.
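The diff below drops the old ip:port key in favour of a hostname-keyed index, so a dead host can be removed without guessing its port and a second registration from the same host evicts the stale entry. A minimal sketch of that bookkeeping, in plain Scala with made-up stand-in names (Registry and this BlockManagerId are illustrative only, not the patch's classes):

    import scala.collection.mutable.{HashMap, HashSet}

    // Simplified stand-in for the real BlockManagerId.
    case class BlockManagerId(ip: String, port: Int)

    class Registry {
      private val blocksById = new HashMap[BlockManagerId, Set[String]]     // manager -> its block ids
      private val idByHost   = new HashMap[String, BlockManagerId]          // hostname -> manager
      private val locations  = new HashMap[String, HashSet[BlockManagerId]] // block id -> managers

      def register(id: BlockManagerId, blocks: Set[String]) {
        // A second registration from the same host replaces the old manager.
        idByHost.get(id.ip).filter(_ != id).foreach(remove)
        blocksById(id) = blocks
        idByHost(id.ip) = id
        blocks.foreach(b => locations.getOrElseUpdate(b, new HashSet[BlockManagerId]) += id)
      }

      def removeHost(host: String) {
        idByHost.get(host).foreach(remove)
      }

      private def remove(id: BlockManagerId) {
        idByHost.remove(id.ip)
        for (blockId <- blocksById.remove(id).getOrElse(Set.empty)) {
          locations.get(blockId).foreach { locs =>
            locs -= id
            if (locs.isEmpty) locations.remove(blockId) // drop the now-empty entry by block id
          }
        }
      }
    }

Keying the cleanup on the block id keeps the block-location map consistent after a manager disappears; the location set itself is only mutated, never used as a key.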
--- .../scala/spark/storage/BlockManagerMaster.scala | 30 +++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 531331b0e5..4959c05f94 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -156,6 +156,8 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor def lastSeenMs: Long = _lastSeenMs + def blocks: JHashMap[String, StorageLevel] = _blocks + override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem def clear() { @@ -164,16 +166,30 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor } private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo] + private val blockManagerIdByHost = new HashMap[String, BlockManagerId] private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]] initLogging() + def removeBlockManager(blockManagerId: BlockManagerId) { + val info = blockManagerInfo(blockManagerId) + blockManagerIdByHost.remove(blockManagerId.ip) + blockManagerInfo.remove(blockManagerId) + var iterator = info.blocks.keySet.iterator + while (iterator.hasNext) { + val blockId = iterator.next + val locations = blockInfo.get(blockId)._2 + locations -= blockManagerId + if (locations.size == 0) { + blockInfo.remove(locations) + } + } + } + def removeHost(host: String) { logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.") logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq) - val ip = host.split(":")(0) - val port = host.split(":")(1) - blockManagerInfo.remove(new BlockManagerId(ip, port.toInt)) + blockManagerIdByHost.get(host).map(removeBlockManager) logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq) sender ! true } @@ -223,12 +239,20 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor val startTimeMs = System.currentTimeMillis() val tmp = " " + blockManagerId + " " logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs)) + if (blockManagerIdByHost.contains(blockManagerId.ip) && + blockManagerIdByHost(blockManagerId.ip) != blockManagerId) { + val oldId = blockManagerIdByHost(blockManagerId.ip) + logInfo("Got second registration for host " + blockManagerId + + "; removing old slave " + oldId) + removeBlockManager(oldId) + } if (blockManagerId.ip == Utils.localHostName() && !isLocal) { logInfo("Got Register Msg from master node, don't register it") } else { blockManagerInfo += (blockManagerId -> new BlockManagerInfo( blockManagerId, System.currentTimeMillis() / 1000, maxMemSize)) } + blockManagerIdByHost += (blockManagerId.ip -> blockManagerId) logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs)) sender ! true } -- cgit v1.2.3 From d21ca010ac14890065e559bab80f56830bb533a7 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Wed, 5 Dec 2012 23:12:33 -0800 Subject: Add block manager heart beats. Renames old message called 'HeartBeat' to 'BlockUpdate'. The BlockManager periodically sends a heart beat message to the master. If the manager is currently not registered. The master responds to the heart beat by indicating whether the BlockManager is currently registered with the master. Additionally, the master now also responds to block updates by indicating whether the BlockManager in question is registered. 
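The protocol this describes is deliberately small: the slave periodically asks the master a question whose answer is a single Boolean, and a negative answer is the cue to reregister. A rough sketch of that shape with ordinary method calls standing in for the Akka messages (Master and Slave are illustrative names, not classes from the patch):

    // Illustrative only: models the master's answer to a heart beat, not the real Akka plumbing.
    case class HeartBeat(ip: String, port: Int)

    class Master {
      private val registered = scala.collection.mutable.Set[(String, Int)]()

      def register(ip: String, port: Int) { registered += ((ip, port)) }

      // True if the sender is known; false tells the slave it must reregister.
      def heartBeat(msg: HeartBeat): Boolean = registered.contains((msg.ip, msg.port))
    }

    class Slave(master: Master, ip: String, port: Int) {
      def sendHeartBeat() {
        if (!master.heartBeat(HeartBeat(ip, port))) {
          reregister() // the master lost track of us; register and re-report every block
        }
      }
      def reregister() { master.register(ip, port) /* then re-send a BlockUpdate per block */ }
    }
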
When the BlockManager detects (by heart beat or failed block update) that it stopped being registered, it reregisters and sends block updates for all its blocks. --- .../main/scala/spark/storage/BlockManager.scala | 88 +++++++++++++- .../scala/spark/storage/BlockManagerMaster.scala | 130 +++++++++++++++++---- 2 files changed, 193 insertions(+), 25 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index bf52b510b4..4753f7f956 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -104,8 +104,33 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m // Whether to compress RDD partitions that are stored serialized val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean + val heartBeatFrequency = BlockManager.getHeartBeatFrequencyFromSystemProperties + val host = System.getProperty("spark.hostname", Utils.localHostName()) + @volatile private var shuttingDown = false + + private def heartBeat() { + if (!master.mustHeartBeat(HeartBeat(blockManagerId))) { + reregister() + } + } + + val heartBeatThread = new Thread("BlockManager heartbeat") { + setDaemon(true) + + override def run: Unit = { + while (!shuttingDown) { + heartBeat() + try { + Thread.sleep(heartBeatFrequency) + } catch { + case e: InterruptedException => {} + } + } + } + } + initialize() /** @@ -123,6 +148,41 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m master.mustRegisterBlockManager( RegisterBlockManager(blockManagerId, maxMemory)) BlockManagerWorker.startBlockManagerWorker(this) + if (!BlockManager.getDisableHeartBeatsForTesting) { + heartBeatThread.start() + } + } + + /** + * Report all blocks to the BlockManager again. This may be necessary if we are dropped + * by the BlockManager and come back or if we become capable of recovering blocks on disk after + * an executor crash. + * + * This function deliberately fails silently if the master returns false (indicating that + * the slave needs to reregister). The error condition will be detected again by the next + * heart beat attempt or new block registration and another try to reregister all blocks + * will be made then. + */ + private def reportAllBlocks() { + logInfo("Reporting " + blockInfo.size + " blocks to the master.") + for (blockId <- blockInfo.keys) { + if (!tryToReportBlockStatus(blockId)) { + logError("Failed to report " + blockId + " to master; giving up.") + return + } + } + } + + /** + * Reregister with the master and report all blocks to it. This will be called by the heart beat + * thread if our heartbeat to the block amnager indicates that we were not registered. + */ + def reregister() { + // TODO: We might need to rate limit reregistering. + logInfo("BlockManager reregistering with master") + master.mustRegisterBlockManager( + RegisterBlockManager(blockManagerId, maxMemory)) + reportAllBlocks() } /** @@ -134,12 +194,25 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } /** - * Tell the master about the current storage status of a block. This will send a heartbeat + * Tell the master about the current storage status of a block. This will send a block update * message reflecting the current status, *not* the desired storage level in its block info. * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk. 
*/ def reportBlockStatus(blockId: String) { + val needReregister = !tryToReportBlockStatus(blockId) + if (needReregister) { + logInfo("Got told to reregister updating block " + blockId) + // Reregistering will report our new block for free. + reregister() + } + logDebug("Told master about block " + blockId) + } + /** + * Actually send a BlockUpdate message. Returns the mater's repsonse, which will be true if theo + * block was successfully recorded and false if the slave needs to reregister. + */ + private def tryToReportBlockStatus(blockId: String): Boolean = { val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match { case null => (StorageLevel.NONE, 0L, 0L) @@ -159,10 +232,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } } - master.mustHeartBeat(HeartBeat(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)) - logDebug("Told master about block " + blockId) + return master.mustBlockUpdate( + BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)) } + /** * Get locations of the block. */ @@ -840,6 +914,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } def stop() { + shuttingDown = true + heartBeatThread.interrupt() connectionManager.stop() blockInfo.clear() memoryStore.clear() @@ -855,6 +931,12 @@ object BlockManager extends Logging { (Runtime.getRuntime.maxMemory * memoryFraction).toLong } + def getHeartBeatFrequencyFromSystemProperties: Long = + System.getProperty("spark.storage.blockManagerHeartBeatMs", "2000").toLong + + def getDisableHeartBeatsForTesting: Boolean = + System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean + /** * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that * might cause errors if one attempts to read from the unmapped buffer, but it's better than diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 4959c05f94..1a0b477d92 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -26,7 +26,10 @@ case class RegisterBlockManager( extends ToBlockManagerMaster private[spark] -class HeartBeat( +case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster + +private[spark] +class BlockUpdate( var blockManagerId: BlockManagerId, var blockId: String, var storageLevel: StorageLevel, @@ -57,17 +60,17 @@ class HeartBeat( } private[spark] -object HeartBeat { +object BlockUpdate { def apply(blockManagerId: BlockManagerId, blockId: String, storageLevel: StorageLevel, memSize: Long, - diskSize: Long): HeartBeat = { - new HeartBeat(blockManagerId, blockId, storageLevel, memSize, diskSize) + diskSize: Long): BlockUpdate = { + new BlockUpdate(blockManagerId, blockId, storageLevel, memSize, diskSize) } // For pattern-matching - def unapply(h: HeartBeat): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = { + def unapply(h: BlockUpdate): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = { Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize)) } } @@ -90,6 +93,9 @@ case object StopBlockManagerMaster extends ToBlockManagerMaster private[spark] case object GetMemoryStatus extends ToBlockManagerMaster +private[spark] +case object ExpireDeadHosts extends ToBlockManagerMaster + private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { @@ -171,6 
+177,22 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor initLogging() + val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs", + "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong + + val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs", + "5000").toLong + + var timeoutCheckingTask: Cancellable = null + + override def preStart() { + if (!BlockManager.getDisableHeartBeatsForTesting) { + timeoutCheckingTask = context.system.scheduler.schedule( + 0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) + } + super.preStart() + } + def removeBlockManager(blockManagerId: BlockManagerId) { val info = blockManagerInfo(blockManagerId) blockManagerIdByHost.remove(blockManagerId.ip) @@ -186,6 +208,20 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor } } + def expireDeadHosts() { + logInfo("Checking for hosts with no recent heart beats in BlockManagerMaster.") + val now = System.currentTimeMillis() + val minSeenTime = now - slaveTimeout + val toRemove = new HashSet[BlockManagerId] + for (info <- blockManagerInfo.values) { + if (info.lastSeenMs < minSeenTime) { + toRemove += info.blockManagerId + } + } + // TODO: Remove corresponding block infos + toRemove.foreach(removeBlockManager) + } + def removeHost(host: String) { logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.") logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq) @@ -194,12 +230,25 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor sender ! true } + def heartBeat(blockManagerId: BlockManagerId) { + if (!blockManagerInfo.contains(blockManagerId)) { + if (blockManagerId.ip == Utils.localHostName() && !isLocal) { + sender ! true + } else { + sender ! false + } + } else { + blockManagerInfo(blockManagerId).updateLastSeenMs() + sender ! true + } + } + def receive = { case RegisterBlockManager(blockManagerId, maxMemSize) => register(blockManagerId, maxMemSize) - case HeartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size) => - heartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size) + case BlockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) => + blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) case GetLocations(blockId) => getLocations(blockId) @@ -221,8 +270,17 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor case StopBlockManagerMaster => logInfo("Stopping BlockManagerMaster") sender ! true + if (timeoutCheckingTask != null) { + timeoutCheckingTask.cancel + } context.stop(self) + case ExpireDeadHosts => + expireDeadHosts() + + case HeartBeat(blockManagerId) => + heartBeat(blockManagerId) + case other => logInfo("Got unknown message: " + other) } @@ -257,7 +315,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor sender ! true } - private def heartBeat( + private def blockUpdate( blockManagerId: BlockManagerId, blockId: String, storageLevel: StorageLevel, @@ -268,15 +326,21 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor val tmp = " " + blockManagerId + " " + blockId + " " if (!blockManagerInfo.contains(blockManagerId)) { - // Can happen if this is from a locally cached partition on the master - sender ! 
true + if (blockManagerId.ip == Utils.localHostName() && !isLocal) { + // We intentionally do not register the master (except in local mode), + // so we should not indicate failure. + sender ! true + } else { + sender ! false + } return } if (blockId == null) { blockManagerInfo(blockManagerId).updateLastSeenMs() - logDebug("Got in heartBeat 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs)) + logDebug("Got in block update 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs)) sender ! true + return } blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize) @@ -459,27 +523,49 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool } } - def mustHeartBeat(msg: HeartBeat) { - while (! syncHeartBeat(msg)) { - logWarning("Failed to send heartbeat" + msg) + def mustHeartBeat(msg: HeartBeat): Boolean = { + var res = syncHeartBeat(msg) + while (!res.isDefined) { + logWarning("Failed to send heart beat " + msg) + Thread.sleep(REQUEST_RETRY_INTERVAL_MS) + } + return res.get + } + + def syncHeartBeat(msg: HeartBeat): Option[Boolean] = { + try { + val answer = askMaster(msg).asInstanceOf[Boolean] + return Some(answer) + } catch { + case e: Exception => + logError("Failed in syncHeartBeat", e) + return None + } + } + + def mustBlockUpdate(msg: BlockUpdate): Boolean = { + var res = syncBlockUpdate(msg) + while (!res.isDefined) { + logWarning("Failed to send block update " + msg) Thread.sleep(REQUEST_RETRY_INTERVAL_MS) } + return res.get } - def syncHeartBeat(msg: HeartBeat): Boolean = { + def syncBlockUpdate(msg: BlockUpdate): Option[Boolean] = { val startTimeMs = System.currentTimeMillis() val tmp = " msg " + msg + " " - logDebug("Got in syncHeartBeat " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs)) + logDebug("Got in syncBlockUpdate " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs)) try { - communicate(msg) - logDebug("Heartbeat sent successfully") - logDebug("Got in syncHeartBeat 1 " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs)) - return true + val answer = askMaster(msg).asInstanceOf[Boolean] + logDebug("Block update sent successfully") + logDebug("Got in synbBlockUpdate " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs)) + return Some(answer) } catch { case e: Exception => - logError("Failed in syncHeartBeat", e) - return false + logError("Failed in syncBlockUpdate", e) + return None } } -- cgit v1.2.3 From a2a94fdbc755ccf1bea4600a273f214a624b3a98 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Wed, 5 Dec 2012 23:00:59 -0800 Subject: Tests for block manager heartbeats. 
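The suite below exercises the reregistration paths by calling BlockManager's private heartBeat() directly through ScalaTest's PrivateMethodTester. A tiny self-contained example of that technique, using made-up names (Counter and bump exist only to show the mechanics; the real tests use the same pattern with PrivateMethod[Unit]('heartBeat)):

    import org.scalatest.FunSuite
    import org.scalatest.PrivateMethodTester

    class Counter {
      private def bump(n: Int): Int = n + 1
    }

    class CounterSuite extends FunSuite with PrivateMethodTester {
      test("a private method can be driven from a test") {
        val bump = PrivateMethod[Int]('bump)
        val c = new Counter
        assert((c invokePrivate bump(1)) === 2)
      }
    }
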
--- .../scala/spark/storage/BlockManagerSuite.scala | 68 ++++++++++++++++++++++ 1 file changed, 68 insertions(+) (limited to 'core') diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index b9c19e61cd..1491818140 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -14,10 +14,12 @@ import spark.util.ByteBufferInputStream class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester { var store: BlockManager = null + var store2: BlockManager = null var actorSystem: ActorSystem = null var master: BlockManagerMaster = null var oldArch: String = null var oldOops: String = null + var oldHeartBeat: String = null // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test val serializer = new KryoSerializer @@ -29,6 +31,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case oldArch = System.setProperty("os.arch", "amd64") oldOops = System.setProperty("spark.test.useCompressedOops", "true") + oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true") val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() } @@ -36,6 +39,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT after { if (store != null) { store.stop() + store = null + } + if (store2 != null) { + store2.stop() + store2 = null } actorSystem.shutdown() actorSystem.awaitTermination() @@ -85,6 +93,66 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2") } + test("reregistration on heart beat") { + val heartBeat = PrivateMethod[Unit]('heartBeat) + store = new BlockManager(master, serializer, 2000) + val a1 = new Array[Byte](400) + + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + + assert(store.getSingle("a1") != None, "a1 was not in store") + assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") + + master.notifyADeadHost(store.blockManagerId.ip) + assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master") + + store invokePrivate heartBeat() + assert(master.mustGetLocations(GetLocations("a1")).size > 0, + "a1 was not reregistered with master") + } + + test("reregistration on block update") { + store = new BlockManager(master, serializer, 2000) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + + assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") + + master.notifyADeadHost(store.blockManagerId.ip) + assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master") + + store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY) + + assert(master.mustGetLocations(GetLocations("a1")).size > 0, + "a1 was not reregistered with master") + assert(master.mustGetLocations(GetLocations("a2")).size > 0, + "master was not told about a2") + } + + test("deregistration on duplicate") { + val heartBeat = PrivateMethod[Unit]('heartBeat) + store = new BlockManager(master, serializer, 2000) + val a1 = new Array[Byte](400) + + store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) + + 
assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") + + store2 = new BlockManager(master, serializer, 2000) + + assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master") + + store invokePrivate heartBeat() + + assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") + + store2 invokePrivate heartBeat() + + assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a2 was not removed from master") + } + test("in-memory LRU storage") { store = new BlockManager(master, serializer, 1200) val a1 = new Array[Byte](400) -- cgit v1.2.3 From 7a033fd795b2008b1cdaa0d0aab73817db56d708 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Tue, 4 Dec 2012 13:58:12 -0800 Subject: Make LocalSparkCluster use distinct IPs --- core/src/main/scala/spark/deploy/LocalSparkCluster.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala index 8b2a71add5..4211d80596 100644 --- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala @@ -35,11 +35,15 @@ class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) /* Start the Slaves */ for (slaveNum <- 1 to numSlaves) { + /* We can pretend to test distributed stuff by giving the slaves distinct hostnames. + All of 127/8 should be a loopback, we use 127.100.*.* in hopes that it is + sufficiently distinctive. */ + val slaveIpAddress = "127.100.0." + (slaveNum % 256) val (actorSystem, boundPort) = - AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0) + AkkaUtils.createActorSystem("sparkWorker" + slaveNum, slaveIpAddress, 0) slaveActorSystems += actorSystem val actor = actorSystem.actorOf( - Props(new Worker(localIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)), + Props(new Worker(slaveIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)), name = "Worker") slaveActors += actor } -- cgit v1.2.3 From 8f0819520c8ca8a2d33b6d07f77f07e2df994aa8 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Thu, 6 Dec 2012 18:29:50 -0800 Subject: map -> foreach --- core/src/main/scala/spark/storage/BlockManagerMaster.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'core') diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 1a0b477d92..4ab73a6c0d 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -225,7 +225,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor def removeHost(host: String) { logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.") logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq) - blockManagerIdByHost.get(host).map(removeBlockManager) + blockManagerIdByHost.get(host).foreach(removeBlockManager) logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq) sender ! true } -- cgit v1.2.3 From 714c8d32d56c64c259931dc15f41db959f667ee0 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Thu, 6 Dec 2012 18:38:34 -0800 Subject: Don't divide by milliseconds by 1000 more. 
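The fix below matters because expireDeadHosts compares _lastSeenMs against a timeout expressed in milliseconds, so the registration timestamp has to be recorded in the same unit. A minimal illustration of the invariant (isExpired is a made-up helper, not code from the patch):

    // Both values must be in milliseconds for this comparison to mean anything.
    def isExpired(lastSeenMs: Long, timeoutMs: Long): Boolean =
      System.currentTimeMillis() - lastSeenMs > timeoutMs

    // Had lastSeenMs been stored as System.currentTimeMillis() / 1000 (seconds),
    // the difference would look enormous and every manager would expire immediately.
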
--- core/src/main/scala/spark/storage/BlockManagerMaster.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'core') diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 4ab73a6c0d..a5cdbae4da 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -308,7 +308,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor logInfo("Got Register Msg from master node, don't register it") } else { blockManagerInfo += (blockManagerId -> new BlockManagerInfo( - blockManagerId, System.currentTimeMillis() / 1000, maxMemSize)) + blockManagerId, System.currentTimeMillis(), maxMemSize)) } blockManagerIdByHost += (blockManagerId.ip -> blockManagerId) logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs)) -- cgit v1.2.3 From b53dd28c908580bf84f798eb39cf4449d6dab216 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Sun, 9 Dec 2012 23:03:34 -0800 Subject: Changed default block manager heartbeat interval to 5 s --- core/src/main/scala/spark/storage/BlockManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'core') diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 4753f7f956..bb6fc34f5d 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -932,7 +932,7 @@ object BlockManager extends Logging { } def getHeartBeatFrequencyFromSystemProperties: Long = - System.getProperty("spark.storage.blockManagerHeartBeatMs", "2000").toLong + System.getProperty("spark.storage.blockManagerHeartBeatMs", "5000").toLong def getDisableHeartBeatsForTesting: Boolean = System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean -- cgit v1.2.3 From 5d3e917d09241c783a0e826caae9b85cf5b044bf Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Mon, 10 Dec 2012 00:10:57 -0800 Subject: Use Akka scheduler for BlockManager heart beats. Adds required ActorSystem argument to BlockManager constructors. 
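The change below replaces the hand-rolled sleep loop with Akka's scheduler, the same facility the master actor already uses for ExpireDeadHosts. A minimal sketch of that pattern with the Akka 2.0-era API these patches import (the ActorSystem name and the printed message are placeholders):

    import akka.actor.ActorSystem
    import akka.util.duration._

    val system = ActorSystem("example")
    // Run the block every 5 seconds, starting immediately; returns a Cancellable.
    val task = system.scheduler.schedule(0.seconds, 5.seconds) {
      println("heart beat") // the real task asks the master and reregisters on a false reply
    }
    // ...
    task.cancel() // stop the heart beats when the manager shuts down

Cancelling the returned task replaces the old thread's interrupt-and-exit shutdown path.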
--- core/src/main/scala/spark/SparkEnv.scala | 2 +- .../main/scala/spark/storage/BlockManager.scala | 36 ++++++---------- .../main/scala/spark/storage/ThreadingTest.scala | 2 +- .../scala/spark/storage/BlockManagerSuite.scala | 50 +++++++++++----------- 4 files changed, 41 insertions(+), 49 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index 9f2b0c42c7..272d7cdad3 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -88,7 +88,7 @@ object SparkEnv extends Logging { val serializer = instantiateClass[Serializer]("spark.serializer", "spark.JavaSerializer") val blockManagerMaster = new BlockManagerMaster(actorSystem, isMaster, isLocal) - val blockManager = new BlockManager(blockManagerMaster, serializer) + val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer) val connectionManager = blockManager.connectionManager diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index bb6fc34f5d..4e7d11996f 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -1,7 +1,9 @@ package spark.storage +import akka.actor.{ActorSystem, Cancellable} import akka.dispatch.{Await, Future} import akka.util.Duration +import akka.util.duration._ import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream @@ -12,7 +14,7 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue} import scala.collection.JavaConversions._ -import spark.{CacheTracker, Logging, SizeEstimator, SparkException, Utils} +import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils} import spark.network._ import spark.serializer.Serializer import spark.util.ByteBufferInputStream @@ -45,13 +47,13 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter } } - private[spark] case class BlockException(blockId: String, message: String, ex: Exception = null) extends Exception(message) private[spark] -class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long) +class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster, + val serializer: Serializer, maxMemory: Long) extends Logging { class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) { @@ -116,28 +118,15 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } - val heartBeatThread = new Thread("BlockManager heartbeat") { - setDaemon(true) - - override def run: Unit = { - while (!shuttingDown) { - heartBeat() - try { - Thread.sleep(heartBeatFrequency) - } catch { - case e: InterruptedException => {} - } - } - } - } + var heartBeatTask: Cancellable = null initialize() /** * Construct a BlockManager with a memory limit set based on system properties. 
*/ - def this(master: BlockManagerMaster, serializer: Serializer) = { - this(master, serializer, BlockManager.getMaxMemoryFromSystemProperties) + def this(actorSystem: ActorSystem, master: BlockManagerMaster, serializer: Serializer) = { + this(actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties) } /** @@ -149,7 +138,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m RegisterBlockManager(blockManagerId, maxMemory)) BlockManagerWorker.startBlockManagerWorker(this) if (!BlockManager.getDisableHeartBeatsForTesting) { - heartBeatThread.start() + heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) { + heartBeat() + } } } @@ -914,8 +905,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } def stop() { - shuttingDown = true - heartBeatThread.interrupt() + if (heartBeatTask != null) { + heartBeatTask.cancel() + } connectionManager.stop() blockInfo.clear() memoryStore.clear() diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala index e4a5b8ffdf..5bb5a29cc4 100644 --- a/core/src/main/scala/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/spark/storage/ThreadingTest.scala @@ -74,7 +74,7 @@ private[spark] object ThreadingTest { val actorSystem = ActorSystem("test") val serializer = new KryoSerializer val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true) - val blockManager = new BlockManager(blockManagerMaster, serializer, 1024 * 1024) + val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer, 1024 * 1024) val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i)) val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue)) producers.foreach(_.start) diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index 1491818140..ad2253596d 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -64,7 +64,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("manager-master interaction") { - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -95,7 +95,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT test("reregistration on heart beat") { val heartBeat = PrivateMethod[Unit]('heartBeat) - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) val a1 = new Array[Byte](400) store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) @@ -112,7 +112,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("reregistration on block update") { - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) @@ -133,14 +133,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT test("deregistration on duplicate") { val heartBeat = PrivateMethod[Unit]('heartBeat) - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) val a1 = new Array[Byte](400) 
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") - store2 = new BlockManager(master, serializer, 2000) + store2 = new BlockManager(actorSystem, master, serializer, 2000) assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master") @@ -154,7 +154,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU storage") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -173,7 +173,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU storage with serialization") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -192,7 +192,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU for partitions of same RDD") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -211,7 +211,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU for partitions of multiple RDDs") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY) @@ -234,7 +234,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("on-disk storage") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -247,7 +247,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -262,7 +262,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with getLocalBytes") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -277,7 +277,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with serialization") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -292,7 +292,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with serialization and getLocalBytes") { - store = new 
BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -307,7 +307,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("LRU with mixed storage levels") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -332,7 +332,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU with streams") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) @@ -356,7 +356,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("LRU with mixed storage levels and streams") { - store = new BlockManager(master, serializer, 1200) + store = new BlockManager(actorSystem, master, serializer, 1200) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) @@ -402,7 +402,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("overly large block") { - store = new BlockManager(master, serializer, 500) + store = new BlockManager(actorSystem, master, serializer, 500) store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) assert(store.getSingle("a1") === None, "a1 was in store") store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK) @@ -413,49 +413,49 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT test("block compression") { try { System.setProperty("spark.shuffle.compress", "true") - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed") store.stop() store = null System.setProperty("spark.shuffle.compress", "false") - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed") store.stop() store = null System.setProperty("spark.broadcast.compress", "true") - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed") store.stop() store = null System.setProperty("spark.broadcast.compress", "false") - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed") store.stop() store = null 
System.setProperty("spark.rdd.compress", "true") - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed") store.stop() store = null System.setProperty("spark.rdd.compress", "false") - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed") store.stop() store = null // Check that any other block types are also kept uncompressed - store = new BlockManager(master, serializer, 2000) + store = new BlockManager(actorSystem, master, serializer, 2000) store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed") store.stop() -- cgit v1.2.3 From b6b62d774f23bec64b027ecdc3d6daba85830d78 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Mon, 10 Dec 2012 00:27:13 -0800 Subject: Decrease BlockManagerMaster logging verbosity --- core/src/main/scala/spark/storage/BlockManagerMaster.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'core') diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index a5cdbae4da..a7b60fc2cf 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -209,12 +209,13 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor } def expireDeadHosts() { - logInfo("Checking for hosts with no recent heart beats in BlockManagerMaster.") + logDebug("Checking for hosts with no recent heart beats in BlockManagerMaster.") val now = System.currentTimeMillis() val minSeenTime = now - slaveTimeout val toRemove = new HashSet[BlockManagerId] for (info <- blockManagerInfo.values) { if (info.lastSeenMs < minSeenTime) { + logInfo("Removing BlockManager " + info.blockManagerId + " with no recent heart beats") toRemove += info.blockManagerId } } -- cgit v1.2.3