author     Charles Reiss <charles@eecs.berkeley.edu>  2012-12-05 23:12:33 -0800
committer  Charles Reiss <charles@eecs.berkeley.edu>  2012-12-05 23:35:20 -0800
commit     d21ca010ac14890065e559bab80f56830bb533a7 (patch)
tree       e8c0c4c9c0b3af4f2c9100f5700a23d104dc1691 /core
parent     c9e54a6755961a5cc9eda45df6a2e5e2df1b01a6 (diff)
Add block manager heart beats.
Renames the old message called 'HeartBeat' to 'BlockUpdate'. The BlockManager now periodically sends a heart beat message to the master, and the master responds by indicating whether that BlockManager is currently registered with it. Additionally, the master now also responds to block updates by indicating whether the BlockManager in question is registered. When the BlockManager detects (via a heart beat or a failed block update) that it is no longer registered, it reregisters and sends block updates for all of its blocks.
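In outline, the registration/heart-beat cycle this patch introduces works as in the sketch below. This is a minimal, self-contained illustration using simplified stand-in types (HeartBeatProtocolSketch, Master, and heartBeatOnce are placeholders, not the actual Spark classes); the real implementation in the diff uses Akka messages and the BlockManagerMasterActor, with retries on communication failure.

// Minimal sketch of the heart beat protocol described above, using simplified
// stand-in types rather than the real Spark classes (illustration only).
object HeartBeatProtocolSketch {
  case class BlockManagerId(ip: String, port: Int)

  // Master side: answers a heart beat with true if the slave is still registered,
  // false if the slave has been dropped and must reregister.
  class Master {
    private val lastSeen = scala.collection.mutable.Map[BlockManagerId, Long]()

    def register(id: BlockManagerId): Unit = { lastSeen(id) = System.currentTimeMillis() }

    def heartBeat(id: BlockManagerId): Boolean = {
      if (lastSeen.contains(id)) { lastSeen(id) = System.currentTimeMillis(); true }
      else false
    }

    // Periodically drop slaves whose last heart beat is older than the timeout.
    def expireDeadHosts(timeoutMs: Long): Unit = {
      val cutoff = System.currentTimeMillis() - timeoutMs
      val dead = lastSeen.collect { case (id, t) if t < cutoff => id }
      dead.foreach(lastSeen.remove)
    }
  }

  // Slave side: one heart beat round trip; on a negative answer, reregister
  // and report the status of every block again.
  def heartBeatOnce(master: Master, id: BlockManagerId)(reportAllBlocks: () => Unit): Unit = {
    if (!master.heartBeat(id)) {
      master.register(id)
      reportAllBlocks()
    }
  }
}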
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala        |  88
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala  | 130
2 files changed, 193 insertions(+), 25 deletions(-)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index bf52b510b4..4753f7f956 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -104,8 +104,33 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// Whether to compress RDD partitions that are stored serialized
val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
+ val heartBeatFrequency = BlockManager.getHeartBeatFrequencyFromSystemProperties
+
val host = System.getProperty("spark.hostname", Utils.localHostName())
+ @volatile private var shuttingDown = false
+
+ private def heartBeat() {
+ if (!master.mustHeartBeat(HeartBeat(blockManagerId))) {
+ reregister()
+ }
+ }
+
+ val heartBeatThread = new Thread("BlockManager heartbeat") {
+ setDaemon(true)
+
+ override def run: Unit = {
+ while (!shuttingDown) {
+ heartBeat()
+ try {
+ Thread.sleep(heartBeatFrequency)
+ } catch {
+ case e: InterruptedException => {}
+ }
+ }
+ }
+ }
+
initialize()
/**
@@ -123,6 +148,41 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
master.mustRegisterBlockManager(
RegisterBlockManager(blockManagerId, maxMemory))
BlockManagerWorker.startBlockManagerWorker(this)
+ if (!BlockManager.getDisableHeartBeatsForTesting) {
+ heartBeatThread.start()
+ }
+ }
+
+ /**
+ * Report all blocks to the master again. This may be necessary if we are dropped
+ * by the master and come back, or if we become capable of recovering blocks on disk after
+ * an executor crash.
+ *
+ * This function deliberately fails silently if the master returns false (indicating that
+ * the slave needs to reregister). The error condition will be detected again by the next
+ * heart beat attempt or new block registration and another try to reregister all blocks
+ * will be made then.
+ */
+ private def reportAllBlocks() {
+ logInfo("Reporting " + blockInfo.size + " blocks to the master.")
+ for (blockId <- blockInfo.keys) {
+ if (!tryToReportBlockStatus(blockId)) {
+ logError("Failed to report " + blockId + " to master; giving up.")
+ return
+ }
+ }
+ }
+
+ /**
+ * Reregister with the master and report all blocks to it. This will be called by the heart beat
+ * thread if our heart beat to the master indicates that we were not registered.
+ */
+ def reregister() {
+ // TODO: We might need to rate limit reregistering.
+ logInfo("BlockManager reregistering with master")
+ master.mustRegisterBlockManager(
+ RegisterBlockManager(blockManagerId, maxMemory))
+ reportAllBlocks()
}
/**
@@ -134,12 +194,25 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
/**
- * Tell the master about the current storage status of a block. This will send a heartbeat
+ * Tell the master about the current storage status of a block. This will send a block update
* message reflecting the current status, *not* the desired storage level in its block info.
* For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
*/
def reportBlockStatus(blockId: String) {
+ val needReregister = !tryToReportBlockStatus(blockId)
+ if (needReregister) {
+ logInfo("Got told to reregister updating block " + blockId)
+ // Reregistering will report our new block for free.
+ reregister()
+ }
+ logDebug("Told master about block " + blockId)
+ }
+ /**
+ * Actually send a BlockUpdate message. Returns the master's response, which will be true if the
+ * block was successfully recorded and false if the slave needs to reregister.
+ */
+ private def tryToReportBlockStatus(blockId: String): Boolean = {
val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match {
case null =>
(StorageLevel.NONE, 0L, 0L)
@@ -159,10 +232,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
}
- master.mustHeartBeat(HeartBeat(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
- logDebug("Told master about block " + blockId)
+ return master.mustBlockUpdate(
+ BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
}
+
/**
* Get locations of the block.
*/
@@ -840,6 +914,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
def stop() {
+ shuttingDown = true
+ heartBeatThread.interrupt()
connectionManager.stop()
blockInfo.clear()
memoryStore.clear()
@@ -855,6 +931,12 @@ object BlockManager extends Logging {
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
}
+ def getHeartBeatFrequencyFromSystemProperties: Long =
+ System.getProperty("spark.storage.blockManagerHeartBeatMs", "2000").toLong
+
+ def getDisableHeartBeatsForTesting: Boolean =
+ System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
+
/**
* Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
* might cause errors if one attempts to read from the unmapped buffer, but it's better than
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 4959c05f94..1a0b477d92 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -26,7 +26,10 @@ case class RegisterBlockManager(
extends ToBlockManagerMaster
private[spark]
-class HeartBeat(
+case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
+
+private[spark]
+class BlockUpdate(
var blockManagerId: BlockManagerId,
var blockId: String,
var storageLevel: StorageLevel,
@@ -57,17 +60,17 @@ class HeartBeat(
}
private[spark]
-object HeartBeat {
+object BlockUpdate {
def apply(blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long): HeartBeat = {
- new HeartBeat(blockManagerId, blockId, storageLevel, memSize, diskSize)
+ diskSize: Long): BlockUpdate = {
+ new BlockUpdate(blockManagerId, blockId, storageLevel, memSize, diskSize)
}
// For pattern-matching
- def unapply(h: HeartBeat): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
+ def unapply(h: BlockUpdate): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
}
}
@@ -90,6 +93,9 @@ case object StopBlockManagerMaster extends ToBlockManagerMaster
private[spark]
case object GetMemoryStatus extends ToBlockManagerMaster
+private[spark]
+case object ExpireDeadHosts extends ToBlockManagerMaster
+
private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
@@ -171,6 +177,22 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
initLogging()
+ val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
+ "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
+
+ val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
+ "5000").toLong
+
+ var timeoutCheckingTask: Cancellable = null
+
+ override def preStart() {
+ if (!BlockManager.getDisableHeartBeatsForTesting) {
+ timeoutCheckingTask = context.system.scheduler.schedule(
+ 0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
+ }
+ super.preStart()
+ }
+
def removeBlockManager(blockManagerId: BlockManagerId) {
val info = blockManagerInfo(blockManagerId)
blockManagerIdByHost.remove(blockManagerId.ip)
@@ -186,6 +208,20 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
}
}
+ def expireDeadHosts() {
+ logInfo("Checking for hosts with no recent heart beats in BlockManagerMaster.")
+ val now = System.currentTimeMillis()
+ val minSeenTime = now - slaveTimeout
+ val toRemove = new HashSet[BlockManagerId]
+ for (info <- blockManagerInfo.values) {
+ if (info.lastSeenMs < minSeenTime) {
+ toRemove += info.blockManagerId
+ }
+ }
+ // TODO: Remove corresponding block infos
+ toRemove.foreach(removeBlockManager)
+ }
+
def removeHost(host: String) {
logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
@@ -194,12 +230,25 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
sender ! true
}
+ def heartBeat(blockManagerId: BlockManagerId) {
+ if (!blockManagerInfo.contains(blockManagerId)) {
+ if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ sender ! true
+ } else {
+ sender ! false
+ }
+ } else {
+ blockManagerInfo(blockManagerId).updateLastSeenMs()
+ sender ! true
+ }
+ }
+
def receive = {
case RegisterBlockManager(blockManagerId, maxMemSize) =>
register(blockManagerId, maxMemSize)
- case HeartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
- heartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size)
+ case BlockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
+ blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size)
case GetLocations(blockId) =>
getLocations(blockId)
@@ -221,8 +270,17 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
case StopBlockManagerMaster =>
logInfo("Stopping BlockManagerMaster")
sender ! true
+ if (timeoutCheckingTask != null) {
+ timeoutCheckingTask.cancel
+ }
context.stop(self)
+ case ExpireDeadHosts =>
+ expireDeadHosts()
+
+ case HeartBeat(blockManagerId) =>
+ heartBeat(blockManagerId)
+
case other =>
logInfo("Got unknown message: " + other)
}
@@ -257,7 +315,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
sender ! true
}
- private def heartBeat(
+ private def blockUpdate(
blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
@@ -268,15 +326,21 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
val tmp = " " + blockManagerId + " " + blockId + " "
if (!blockManagerInfo.contains(blockManagerId)) {
- // Can happen if this is from a locally cached partition on the master
- sender ! true
+ if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ // We intentionally do not register the master (except in local mode),
+ // so we should not indicate failure.
+ sender ! true
+ } else {
+ sender ! false
+ }
return
}
if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
- logDebug("Got in heartBeat 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
+ logDebug("Got in block update 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
sender ! true
+ return
}
blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
@@ -459,27 +523,49 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
}
}
- def mustHeartBeat(msg: HeartBeat) {
- while (! syncHeartBeat(msg)) {
- logWarning("Failed to send heartbeat" + msg)
+ def mustHeartBeat(msg: HeartBeat): Boolean = {
+ var res = syncHeartBeat(msg)
+ while (!res.isDefined) {
+ logWarning("Failed to send heart beat " + msg)
+ Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
+ res = syncHeartBeat(msg)
+ }
+ return res.get
+ }
+
+ def syncHeartBeat(msg: HeartBeat): Option[Boolean] = {
+ try {
+ val answer = askMaster(msg).asInstanceOf[Boolean]
+ return Some(answer)
+ } catch {
+ case e: Exception =>
+ logError("Failed in syncHeartBeat", e)
+ return None
+ }
+ }
+
+ def mustBlockUpdate(msg: BlockUpdate): Boolean = {
+ var res = syncBlockUpdate(msg)
+ while (!res.isDefined) {
+ logWarning("Failed to send block update " + msg)
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
+ res = syncBlockUpdate(msg)
}
+ return res.get
}
- def syncHeartBeat(msg: HeartBeat): Boolean = {
+ def syncBlockUpdate(msg: BlockUpdate): Option[Boolean] = {
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
- logDebug("Got in syncHeartBeat " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
+ logDebug("Got in syncBlockUpdate " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
try {
- communicate(msg)
- logDebug("Heartbeat sent successfully")
- logDebug("Got in syncHeartBeat 1 " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
- return true
+ val answer = askMaster(msg).asInstanceOf[Boolean]
+ logDebug("Block update sent successfully")
+ logDebug("Got in synbBlockUpdate " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
+ return Some(answer)
} catch {
case e: Exception =>
- logError("Failed in syncHeartBeat", e)
- return false
+ logError("Failed in syncBlockUpdate", e)
+ return None
}
}
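For reference, the timing knobs added above are read as plain Java system properties, so they can be set before the BlockManager and the master start. An illustrative (not prescriptive) configuration using the defaults from the patch:

// Illustrative values only; the property names come from the patch above.
System.setProperty("spark.storage.blockManagerHeartBeatMs", "2000")        // slave heart beat period
System.setProperty("spark.storage.blockManagerSlaveTimeoutMs", "6000")     // master drops a slave after this
System.setProperty("spark.storage.blockManagerTimeoutIntervalMs", "5000")  // how often the master checks for dead hosts
System.setProperty("spark.test.disableBlockManagerHeartBeat", "true")      // disable heart beats entirely (tests only)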