aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/storage/BlockManager.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/storage/BlockManager.scala')
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala90
1 files changed, 82 insertions, 8 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index bf52b510b4..4e7d11996f 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -1,7 +1,9 @@
package spark.storage
+import akka.actor.{ActorSystem, Cancellable}
import akka.dispatch.{Await, Future}
import akka.util.Duration
+import akka.util.duration._
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
@@ -12,7 +14,7 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
import scala.collection.JavaConversions._
-import spark.{CacheTracker, Logging, SizeEstimator, SparkException, Utils}
+import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils}
import spark.network._
import spark.serializer.Serializer
import spark.util.ByteBufferInputStream
@@ -45,13 +47,13 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter
}
}
-
private[spark]
case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message)
private[spark]
-class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
+class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
+ val serializer: Serializer, maxMemory: Long)
extends Logging {
class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
@@ -104,15 +106,27 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// Whether to compress RDD partitions that are stored serialized
val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
+ val heartBeatFrequency = BlockManager.getHeartBeatFrequencyFromSystemProperties
+
val host = System.getProperty("spark.hostname", Utils.localHostName())
+ @volatile private var shuttingDown = false
+
+  private def heartBeat() {
+    // The master replies false when it does not consider this block manager registered.
+    val acknowledged = master.mustHeartBeat(HeartBeat(blockManagerId))
+    if (!acknowledged) {
+      reregister()
+    }
+  }
+
+ var heartBeatTask: Cancellable = null
+
initialize()
/**
* Construct a BlockManager with a memory limit set based on system properties.
*/
- def this(master: BlockManagerMaster, serializer: Serializer) = {
- this(master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
+ def this(actorSystem: ActorSystem, master: BlockManagerMaster, serializer: Serializer) = {
+ // Delegate to the primary constructor, taking maxMemory from system properties.
+ this(actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
}
/**
@@ -123,6 +137,43 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
master.mustRegisterBlockManager(
RegisterBlockManager(blockManagerId, maxMemory))
BlockManagerWorker.startBlockManagerWorker(this)
+ if (!BlockManager.getDisableHeartBeatsForTesting) {
+ heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
+ heartBeat()
+ }
+ }
+ }
+
+ /**
+ * Report all blocks to the master again. This may be necessary if we are dropped
+ * by the master and come back, or if we become capable of recovering blocks on disk after
+ * an executor crash.
+ *
+ * This function deliberately fails silently if the master returns false (indicating that
+ * the slave needs to reregister). The error condition will be detected again by the next
+ * heart beat attempt or new block registration and another try to reregister all blocks
+ * will be made then.
+ */
+  private def reportAllBlocks() {
+    logInfo("Reporting " + blockInfo.size + " blocks to the master.")
+    // Report blocks one by one and stop at the first one the master rejects. `find` gives the
+    // same short-circuit as the previous `return` inside the for-loop closure, without relying
+    // on a non-local return (which is implemented by throwing a control-flow exception).
+    val firstRejected = blockInfo.keys.find(blockId => !tryToReportBlockStatus(blockId))
+    // Only log here; the next heart beat or block update will trigger another attempt.
+    firstRejected.foreach { blockId =>
+      logError("Failed to report " + blockId + " to master; giving up.")
+    }
+  }
+
+ /**
+ * Reregister with the master and report all blocks to it. This will be called by the heart beat
+ * thread if our heartbeat to the block manager indicates that we were not registered.
+ */
+  def reregister() {
+    // TODO: We might need to rate limit reregistering.
+    logInfo("BlockManager reregistering with master")
+    // Same registration message that initialize() sends.
+    val registration = RegisterBlockManager(blockManagerId, maxMemory)
+    master.mustRegisterBlockManager(registration)
+    // Once registered again, resend the status of every block we know about.
+    reportAllBlocks()
}
/**
@@ -134,12 +185,25 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
/**
- * Tell the master about the current storage status of a block. This will send a heartbeat
+ * Tell the master about the current storage status of a block. This will send a block update
* message reflecting the current status, *not* the desired storage level in its block info.
* For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
*/
def reportBlockStatus(blockId: String) {
+    // A false reply from the master means this block manager has to reregister.
+    val reported = tryToReportBlockStatus(blockId)
+    if (!reported) {
+      logInfo("Got told to reregister updating block " + blockId)
+      // reregister() reports all blocks again, so this block is covered as well.
+      reregister()
+    }
+    logDebug("Told master about block " + blockId)
+  }
+ /**
+ * Actually send a BlockUpdate message. Returns the master's response, which will be true if the
+ * block was successfully recorded and false if the slave needs to reregister.
+ */
+ private def tryToReportBlockStatus(blockId: String): Boolean = {
val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match {
case null =>
(StorageLevel.NONE, 0L, 0L)
@@ -159,10 +223,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
}
- master.mustHeartBeat(HeartBeat(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
- logDebug("Told master about block " + blockId)
+ return master.mustBlockUpdate(
+ BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
}
+
/**
* Get locations of the block.
*/
@@ -840,6 +905,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
def stop() {
+ if (heartBeatTask != null) {
+ heartBeatTask.cancel()
+ }
connectionManager.stop()
blockInfo.clear()
memoryStore.clear()
@@ -855,6 +923,12 @@ object BlockManager extends Logging {
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
}
+  def getHeartBeatFrequencyFromSystemProperties: Long = {
+    // Heart beat interval in milliseconds; falls back to 5000 when the property is not set.
+    val intervalMs = System.getProperty("spark.storage.blockManagerHeartBeatMs", "5000")
+    intervalMs.toLong
+  }
+
+  def getDisableHeartBeatsForTesting: Boolean = {
+    // Test-only switch; defaults to false, in which case initialize() schedules heart beats.
+    val flag = System.getProperty("spark.test.disableBlockManagerHeartBeat", "false")
+    flag.toBoolean
+  }
+
/**
* Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
* might cause errors if one attempts to read from the unmapped buffer, but it's better than