-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala        |  88
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala  | 130
2 files changed, 193 insertions(+), 25 deletions(-)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index bf52b510b4..4753f7f956 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -104,8 +104,33 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// Whether to compress RDD partitions that are stored serialized
val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
+ val heartBeatFrequency = BlockManager.getHeartBeatFrequencyFromSystemProperties
+
val host = System.getProperty("spark.hostname", Utils.localHostName())
+ @volatile private var shuttingDown = false
+
+ private def heartBeat() {
+ if (!master.mustHeartBeat(HeartBeat(blockManagerId))) {
+ reregister()
+ }
+ }
+
+ val heartBeatThread = new Thread("BlockManager heartbeat") {
+ setDaemon(true)
+
+ override def run: Unit = {
+ while (!shuttingDown) {
+ heartBeat()
+ try {
+ Thread.sleep(heartBeatFrequency)
+ } catch {
+ case e: InterruptedException => {}
+ }
+ }
+ }
+ }
+
initialize()
/**
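The slave-side protocol added in the hunk above reduces to: ping the master on a fixed period, and treat a negative answer as "the master dropped us, so register again." A minimal self-contained sketch of that loop, with Master as a hypothetical stand-in for BlockManagerMaster (illustrative names throughout, not the patch's types):

    // Sketch only: the heartbeat/reregister loop distilled.
    trait Master {
      def heartBeat(id: String): Boolean  // false => the master does not know us
      def register(id: String): Unit
    }

    class HeartBeater(master: Master, id: String, periodMs: Long) {
      @volatile private var shuttingDown = false

      private val thread = new Thread("heartbeat-sketch") {
        setDaemon(true)  // never keep the JVM alive just for heartbeats
        override def run() {
          while (!shuttingDown) {
            if (!master.heartBeat(id)) {
              master.register(id)  // dropped (e.g. expired on the master); reregister
            }
            try { Thread.sleep(periodMs) } catch { case _: InterruptedException => }
          }
        }
      }

      def start() { thread.start() }
      def stop() { shuttingDown = true; thread.interrupt() }
    }

The daemon flag mirrors the patch: heartbeats must never block JVM shutdown, and stop() interrupts the sleep so shutdown is prompt.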
@@ -123,6 +148,41 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
master.mustRegisterBlockManager(
RegisterBlockManager(blockManagerId, maxMemory))
BlockManagerWorker.startBlockManagerWorker(this)
+ if (!BlockManager.getDisableHeartBeatsForTesting) {
+ heartBeatThread.start()
+ }
+ }
+
+ /**
+ * Re-report all blocks to the master. This may be necessary if we are dropped
+ * by the master and come back, or if we become capable of recovering blocks on disk after
+ * an executor crash.
+ *
+ * This function deliberately fails silently if the master returns false (indicating that
+ * the slave needs to reregister). The error condition will be detected again by the next
+ * heart beat attempt or block update, and another attempt to re-report all blocks
+ * will be made then.
+ */
+ private def reportAllBlocks() {
+ logInfo("Reporting " + blockInfo.size + " blocks to the master.")
+ for (blockId <- blockInfo.keys) {
+ if (!tryToReportBlockStatus(blockId)) {
+ logError("Failed to report " + blockId + " to master; giving up.")
+ return
+ }
+ }
+ }
+
+ /**
+ * Reregister with the master and report all blocks to it. This will be called by the heart beat
+ * thread if our heartbeat to the block manager master indicates that we were not registered.
+ */
+ def reregister() {
+ // TODO: We might need to rate limit reregistering.
+ logInfo("BlockManager reregistering with master")
+ master.mustRegisterBlockManager(
+ RegisterBlockManager(blockManagerId, maxMemory))
+ reportAllBlocks()
}
/**
@@ -134,12 +194,25 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
/**
- * Tell the master about the current storage status of a block. This will send a heartbeat
+ * Tell the master about the current storage status of a block. This will send a block update
* message reflecting the current status, *not* the desired storage level in its block info.
* For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
*/
def reportBlockStatus(blockId: String) {
+ val needReregister = !tryToReportBlockStatus(blockId)
+ if (needReregister) {
+ logInfo("Got told to reregister updating block " + blockId)
+ // Reregistering will report our new block for free.
+ reregister()
+ }
+ logDebug("Told master about block " + blockId)
+ }
+ /**
+ * Actually send a BlockUpdate message. Returns the master's response, which will be true if the
+ * block was successfully recorded and false if the slave needs to reregister.
+ */
+ private def tryToReportBlockStatus(blockId: String): Boolean = {
val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match {
case null =>
(StorageLevel.NONE, 0L, 0L)
@@ -159,10 +232,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
}
- master.mustHeartBeat(HeartBeat(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
- logDebug("Told master about block " + blockId)
+ return master.mustBlockUpdate(
+ BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
}
+
/**
* Get locations of the block.
*/
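Note the tri-state contract that replaces the old fire-and-forget heartbeat: the master's boolean answer distinguishes "recorded" from "unknown slave". A sketch of the handshake reportBlockStatus now implements, with the two function parameters as illustrative stand-ins for tryToReportBlockStatus and reregister():

    // Sketch: a false answer to a block update means "unknown slave".
    // Reregistering re-reports every block, so the failed update needs
    // no retry of its own.
    def reportWithRecovery(tryReport: () => Boolean, reregister: () => Unit) {
      if (!tryReport()) {
        reregister()
      }
    }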
@@ -840,6 +914,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
def stop() {
+ shuttingDown = true
+ heartBeatThread.interrupt()
connectionManager.stop()
blockInfo.clear()
memoryStore.clear()
@@ -855,6 +931,12 @@ object BlockManager extends Logging {
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
}
+ def getHeartBeatFrequencyFromSystemProperties: Long =
+ System.getProperty("spark.storage.blockManagerHeartBeatMs", "2000").toLong
+
+ def getDisableHeartBeatsForTesting: Boolean =
+ System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
+
/**
* Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
* might cause errors if one attempts to read from the unmapped buffer, but it's better than
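Three system properties govern the new behavior, all read once at startup. An illustrative tuning snippet showing the patch's defaults (where these get set, e.g. early in driver startup, is an assumption of the example, not part of the patch):

    // Heartbeat knobs introduced by this patch, shown with their default values.
    System.setProperty("spark.storage.blockManagerHeartBeatMs", "2000")        // slave heartbeat period
    System.setProperty("spark.storage.blockManagerSlaveTimeoutMs", "6000")     // defaults to 3x the heartbeat period
    System.setProperty("spark.storage.blockManagerTimeoutIntervalMs", "5000")  // master's expiry sweep period
    System.setProperty("spark.test.disableBlockManagerHeartBeat", "true")      // tests only: disable heartbeats entirely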
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 4959c05f94..1a0b477d92 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -26,7 +26,10 @@ case class RegisterBlockManager(
extends ToBlockManagerMaster
private[spark]
-class HeartBeat(
+case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
+
+private[spark]
+class BlockUpdate(
var blockManagerId: BlockManagerId,
var blockId: String,
var storageLevel: StorageLevel,
@@ -57,17 +60,17 @@ class HeartBeat(
}
private[spark]
-object HeartBeat {
+object BlockUpdate {
def apply(blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long): HeartBeat = {
- new HeartBeat(blockManagerId, blockId, storageLevel, memSize, diskSize)
+ diskSize: Long): BlockUpdate = {
+ new BlockUpdate(blockManagerId, blockId, storageLevel, memSize, diskSize)
}
// For pattern-matching
- def unapply(h: HeartBeat): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
+ def unapply(h: BlockUpdate): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
}
}
@@ -90,6 +93,9 @@ case object StopBlockManagerMaster extends ToBlockManagerMaster
private[spark]
case object GetMemoryStatus extends ToBlockManagerMaster
+private[spark]
+case object ExpireDeadHosts extends ToBlockManagerMaster
+
private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
@@ -171,6 +177,22 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
initLogging()
+ val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
+ "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
+
+ val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
+ "5000").toLong
+
+ var timeoutCheckingTask: Cancellable = null
+
+ override def preStart() {
+ if (!BlockManager.getDisableHeartBeatsForTesting) {
+ timeoutCheckingTask = context.system.scheduler.schedule(
+ 0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
+ }
+ super.preStart()
+ }
+
def removeBlockManager(blockManagerId: BlockManagerId) {
val info = blockManagerInfo(blockManagerId)
blockManagerIdByHost.remove(blockManagerId.ip)
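The preStart hook above uses Akka's scheduler to have the master actor periodically message itself. A self-contained sketch of the same pattern; it assumes a later Akka (2.1+) where schedule takes an implicit ExecutionContext, which the Akka version this patch targets does not require:

    import akka.actor.{Actor, Cancellable}
    import scala.concurrent.duration._

    case object Sweep  // illustrative stand-in for ExpireDeadHosts

    class SweeperSketch(intervalMs: Long) extends Actor {
      private var task: Cancellable = null

      override def preStart() {
        import context.dispatcher  // ExecutionContext for the scheduler
        task = context.system.scheduler.schedule(
          0.milliseconds, intervalMs.milliseconds, self, Sweep)
      }

      override def postStop() {
        if (task != null) task.cancel()
      }

      def receive = {
        case Sweep => // scan last-seen timestamps and drop dead hosts here
      }
    }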
@@ -186,6 +208,20 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
}
}
+ def expireDeadHosts() {
+ logInfo("Checking for hosts with no recent heart beats in BlockManagerMaster.")
+ val now = System.currentTimeMillis()
+ val minSeenTime = now - slaveTimeout
+ val toRemove = new HashSet[BlockManagerId]
+ for (info <- blockManagerInfo.values) {
+ if (info.lastSeenMs < minSeenTime) {
+ toRemove += info.blockManagerId
+ }
+ }
+ // TODO: Remove corresponding block infos
+ toRemove.foreach(removeBlockManager)
+ }
+
def removeHost(host: String) {
logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
@@ -194,12 +230,25 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
sender ! true
}
+ def heartBeat(blockManagerId: BlockManagerId) {
+ if (!blockManagerInfo.contains(blockManagerId)) {
+ if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ sender ! true
+ } else {
+ sender ! false
+ }
+ } else {
+ blockManagerInfo(blockManagerId).updateLastSeenMs()
+ sender ! true
+ }
+ }
+
def receive = {
case RegisterBlockManager(blockManagerId, maxMemSize) =>
register(blockManagerId, maxMemSize)
- case HeartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
- heartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size)
+ case BlockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
+ blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size)
case GetLocations(blockId) =>
getLocations(blockId)
@@ -221,8 +270,17 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
case StopBlockManagerMaster =>
logInfo("Stopping BlockManagerMaster")
sender ! true
+ if (timeoutCheckingTask != null) {
+ timeoutCheckingTask.cancel
+ }
context.stop(self)
+ case ExpireDeadHosts =>
+ expireDeadHosts()
+
+ case HeartBeat(blockManagerId) =>
+ heartBeat(blockManagerId)
+
case other =>
logInfo("Got unknown message: " + other)
}
@@ -257,7 +315,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
sender ! true
}
- private def heartBeat(
+ private def blockUpdate(
blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
@@ -268,15 +326,21 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
val tmp = " " + blockManagerId + " " + blockId + " "
if (!blockManagerInfo.contains(blockManagerId)) {
- // Can happen if this is from a locally cached partition on the master
- sender ! true
+ if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ // We intentionally do not register the master (except in local mode),
+ // so we should not indicate failure.
+ sender ! true
+ } else {
+ sender ! false
+ }
return
}
if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
- logDebug("Got in heartBeat 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
+ logDebug("Got in block update 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
sender ! true
+ return
}
blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
@@ -459,27 +523,49 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
}
}
- def mustHeartBeat(msg: HeartBeat) {
- while (! syncHeartBeat(msg)) {
- logWarning("Failed to send heartbeat" + msg)
+ def mustHeartBeat(msg: HeartBeat): Boolean = {
+ var res = syncHeartBeat(msg)
+ while (!res.isDefined) {
+ logWarning("Failed to send heart beat " + msg)
+ Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
+ }
+ return res.get
+ }
+
+ def syncHeartBeat(msg: HeartBeat): Option[Boolean] = {
+ try {
+ val answer = askMaster(msg).asInstanceOf[Boolean]
+ return Some(answer)
+ } catch {
+ case e: Exception =>
+ logError("Failed in syncHeartBeat", e)
+ return None
+ }
+ }
+
+ def mustBlockUpdate(msg: BlockUpdate): Boolean = {
+ var res = syncBlockUpdate(msg)
+ while (!res.isDefined) {
+ logWarning("Failed to send block update " + msg)
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
}
+ return res.get
}
- def syncHeartBeat(msg: HeartBeat): Boolean = {
+ def syncBlockUpdate(msg: BlockUpdate): Option[Boolean] = {
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
- logDebug("Got in syncHeartBeat " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
+ logDebug("Got in syncBlockUpdate " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
try {
- communicate(msg)
- logDebug("Heartbeat sent successfully")
- logDebug("Got in syncHeartBeat 1 " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
- return true
+ val answer = askMaster(msg).asInstanceOf[Boolean]
+ logDebug("Block update sent successfully")
+ logDebug("Got in synbBlockUpdate " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
+ return Some(answer)
} catch {
case e: Exception =>
- logError("Failed in syncHeartBeat", e)
- return false
+ logError("Failed in syncBlockUpdate", e)
+ return None
}
}
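The must*/sync* split now carries three outcomes: Some(true) (recorded), Some(false) (the slave must reregister), and None (communication failure, retry). The retry half generalizes; a sketch with illustrative names:

    // Sketch: retry until the master gives a definite answer, as
    // mustHeartBeat and mustBlockUpdate do above.
    def askUntilAnswered[T](attempt: () => Option[T], retryIntervalMs: Long): T = {
      var res = attempt()
      while (res.isEmpty) {
        Thread.sleep(retryIntervalMs)
        res = attempt()
      }
      res.get
    }

    // Hypothetical wiring:
    //   val known = askUntilAnswered(() => syncHeartBeat(HeartBeat(id)), 1000L)
    //   if (!known) reregister()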