diff options
Diffstat (limited to 'core/src/main/scala/spark/storage/BlockManager.scala')
-rw-r--r-- | core/src/main/scala/spark/storage/BlockManager.scala | 90 |
1 files changed, 82 insertions, 8 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index bf52b510b4..4e7d11996f 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -1,7 +1,9 @@ package spark.storage +import akka.actor.{ActorSystem, Cancellable} import akka.dispatch.{Await, Future} import akka.util.Duration +import akka.util.duration._ import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream @@ -12,7 +14,7 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue} import scala.collection.JavaConversions._ -import spark.{CacheTracker, Logging, SizeEstimator, SparkException, Utils} +import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils} import spark.network._ import spark.serializer.Serializer import spark.util.ByteBufferInputStream @@ -45,13 +47,13 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter } } - private[spark] case class BlockException(blockId: String, message: String, ex: Exception = null) extends Exception(message) private[spark] -class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long) +class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster, + val serializer: Serializer, maxMemory: Long) extends Logging { class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) { @@ -104,15 +106,27 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m // Whether to compress RDD partitions that are stored serialized val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean + val heartBeatFrequency = BlockManager.getHeartBeatFrequencyFromSystemProperties + val host = System.getProperty("spark.hostname", Utils.localHostName()) + @volatile private var shuttingDown = false + + private def heartBeat() { 
+ if (!master.mustHeartBeat(HeartBeat(blockManagerId))) { + reregister() + } + } + + var heartBeatTask: Cancellable = null + initialize() /** * Construct a BlockManager with a memory limit set based on system properties. */ - def this(master: BlockManagerMaster, serializer: Serializer) = { - this(master, serializer, BlockManager.getMaxMemoryFromSystemProperties) + def this(actorSystem: ActorSystem, master: BlockManagerMaster, serializer: Serializer) = { + this(actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties) } /** @@ -123,6 +137,43 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m master.mustRegisterBlockManager( RegisterBlockManager(blockManagerId, maxMemory)) BlockManagerWorker.startBlockManagerWorker(this) + if (!BlockManager.getDisableHeartBeatsForTesting) { + heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) { + heartBeat() + } + } + } + + /** + * Report all blocks to the BlockManager again. This may be necessary if we are dropped + * by the BlockManager and come back or if we become capable of recovering blocks on disk after + * an executor crash. + * + * This function deliberately fails silently if the master returns false (indicating that + * the slave needs to reregister). The error condition will be detected again by the next + * heart beat attempt or new block registration and another try to reregister all blocks + * will be made then. + */ + private def reportAllBlocks() { + logInfo("Reporting " + blockInfo.size + " blocks to the master.") + for (blockId <- blockInfo.keys) { + if (!tryToReportBlockStatus(blockId)) { + logError("Failed to report " + blockId + " to master; giving up.") + return + } + } + } + + /** + * Reregister with the master and report all blocks to it. This will be called by the heart beat + * thread if our heartbeat to the block manager indicates that we were not registered. 
+ */ + def reregister() { + // TODO: We might need to rate limit reregistering. + logInfo("BlockManager reregistering with master") + master.mustRegisterBlockManager( + RegisterBlockManager(blockManagerId, maxMemory)) + reportAllBlocks() } /** @@ -134,12 +185,25 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } /** - * Tell the master about the current storage status of a block. This will send a heartbeat + * Tell the master about the current storage status of a block. This will send a block update * message reflecting the current status, *not* the desired storage level in its block info. * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk. */ def reportBlockStatus(blockId: String) { + val needReregister = !tryToReportBlockStatus(blockId) + if (needReregister) { + logInfo("Got told to reregister updating block " + blockId) + // Reregistering will report our new block for free. + reregister() + } + logDebug("Told master about block " + blockId) + } + /** + * Actually send a BlockUpdate message. Returns the master's response, which will be true if the + * block was successfully recorded and false if the slave needs to reregister. + */ + private def tryToReportBlockStatus(blockId: String): Boolean = { val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match { case null => (StorageLevel.NONE, 0L, 0L) @@ -159,10 +223,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } } - master.mustHeartBeat(HeartBeat(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)) - logDebug("Told master about block " + blockId) + return master.mustBlockUpdate( + BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)) } + /** * Get locations of the block. 
*/ @@ -840,6 +905,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } def stop() { + if (heartBeatTask != null) { + heartBeatTask.cancel() + } connectionManager.stop() blockInfo.clear() memoryStore.clear() @@ -855,6 +923,12 @@ object BlockManager extends Logging { (Runtime.getRuntime.maxMemory * memoryFraction).toLong } + def getHeartBeatFrequencyFromSystemProperties: Long = + System.getProperty("spark.storage.blockManagerHeartBeatMs", "5000").toLong + + def getDisableHeartBeatsForTesting: Boolean = + System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean + /** * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that * might cause errors if one attempts to read from the unmapped buffer, but it's better than |