aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/SparkEnv.scala2
-rw-r--r--core/src/main/scala/spark/deploy/LocalSparkCluster.scala8
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala90
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMaster.scala168
-rw-r--r--core/src/main/scala/spark/storage/ThreadingTest.scala2
-rw-r--r--core/src/test/scala/spark/storage/BlockManagerSuite.scala110
6 files changed, 318 insertions, 62 deletions
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 9f2b0c42c7..272d7cdad3 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -88,7 +88,7 @@ object SparkEnv extends Logging {
val serializer = instantiateClass[Serializer]("spark.serializer", "spark.JavaSerializer")
val blockManagerMaster = new BlockManagerMaster(actorSystem, isMaster, isLocal)
- val blockManager = new BlockManager(blockManagerMaster, serializer)
+ val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer)
val connectionManager = blockManager.connectionManager
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index 8b2a71add5..4211d80596 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -35,11 +35,15 @@ class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int)
/* Start the Slaves */
for (slaveNum <- 1 to numSlaves) {
+ /* We can pretend to test distributed stuff by giving the slaves distinct hostnames.
+ All of 127/8 should be a loopback, we use 127.100.*.* in hopes that it is
+ sufficiently distinctive. */
+ val slaveIpAddress = "127.100.0." + (slaveNum % 256)
val (actorSystem, boundPort) =
- AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
+ AkkaUtils.createActorSystem("sparkWorker" + slaveNum, slaveIpAddress, 0)
slaveActorSystems += actorSystem
val actor = actorSystem.actorOf(
- Props(new Worker(localIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)),
+ Props(new Worker(slaveIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)),
name = "Worker")
slaveActors += actor
}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index bf52b510b4..4e7d11996f 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -1,7 +1,9 @@
package spark.storage
+import akka.actor.{ActorSystem, Cancellable}
import akka.dispatch.{Await, Future}
import akka.util.Duration
+import akka.util.duration._
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
@@ -12,7 +14,7 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
import scala.collection.JavaConversions._
-import spark.{CacheTracker, Logging, SizeEstimator, SparkException, Utils}
+import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils}
import spark.network._
import spark.serializer.Serializer
import spark.util.ByteBufferInputStream
@@ -45,13 +47,13 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter
}
}
-
private[spark]
case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message)
private[spark]
-class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
+class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
+ val serializer: Serializer, maxMemory: Long)
extends Logging {
class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
@@ -104,15 +106,27 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// Whether to compress RDD partitions that are stored serialized
val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
+ val heartBeatFrequency = BlockManager.getHeartBeatFrequencyFromSystemProperties
+
val host = System.getProperty("spark.hostname", Utils.localHostName())
+ @volatile private var shuttingDown = false
+
+ private def heartBeat() {
+ if (!master.mustHeartBeat(HeartBeat(blockManagerId))) {
+ reregister()
+ }
+ }
+
+ var heartBeatTask: Cancellable = null
+
initialize()
/**
* Construct a BlockManager with a memory limit set based on system properties.
*/
- def this(master: BlockManagerMaster, serializer: Serializer) = {
- this(master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
+ def this(actorSystem: ActorSystem, master: BlockManagerMaster, serializer: Serializer) = {
+ this(actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
}
/**
@@ -123,6 +137,43 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
master.mustRegisterBlockManager(
RegisterBlockManager(blockManagerId, maxMemory))
BlockManagerWorker.startBlockManagerWorker(this)
+ if (!BlockManager.getDisableHeartBeatsForTesting) {
+ heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
+ heartBeat()
+ }
+ }
+ }
+
+ /**
+ * Report all blocks to the BlockManager again. This may be necessary if we are dropped
+ * by the BlockManager and come back or if we become capable of recovering blocks on disk after
+ * an executor crash.
+ *
+ * This function deliberately fails silently if the master returns false (indicating that
+ * the slave needs to reregister). The error condition will be detected again by the next
+ * heart beat attempt or new block registration and another try to reregister all blocks
+ * will be made then.
+ */
+ private def reportAllBlocks() {
+ logInfo("Reporting " + blockInfo.size + " blocks to the master.")
+ for (blockId <- blockInfo.keys) {
+ if (!tryToReportBlockStatus(blockId)) {
+ logError("Failed to report " + blockId + " to master; giving up.")
+ return
+ }
+ }
+ }
+
+ /**
+ * Reregister with the master and report all blocks to it. This will be called by the heart beat
+ * thread if our heartbeat to the block amnager indicates that we were not registered.
+ */
+ def reregister() {
+ // TODO: We might need to rate limit reregistering.
+ logInfo("BlockManager reregistering with master")
+ master.mustRegisterBlockManager(
+ RegisterBlockManager(blockManagerId, maxMemory))
+ reportAllBlocks()
}
/**
@@ -134,12 +185,25 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
/**
- * Tell the master about the current storage status of a block. This will send a heartbeat
+ * Tell the master about the current storage status of a block. This will send a block update
* message reflecting the current status, *not* the desired storage level in its block info.
* For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
*/
def reportBlockStatus(blockId: String) {
+ val needReregister = !tryToReportBlockStatus(blockId)
+ if (needReregister) {
+ logInfo("Got told to reregister updating block " + blockId)
+ // Reregistering will report our new block for free.
+ reregister()
+ }
+ logDebug("Told master about block " + blockId)
+ }
+ /**
+ * Actually send a BlockUpdate message. Returns the mater's repsonse, which will be true if theo
+ * block was successfully recorded and false if the slave needs to reregister.
+ */
+ private def tryToReportBlockStatus(blockId: String): Boolean = {
val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match {
case null =>
(StorageLevel.NONE, 0L, 0L)
@@ -159,10 +223,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
}
- master.mustHeartBeat(HeartBeat(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
- logDebug("Told master about block " + blockId)
+ return master.mustBlockUpdate(
+ BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
}
+
/**
* Get locations of the block.
*/
@@ -840,6 +905,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
def stop() {
+ if (heartBeatTask != null) {
+ heartBeatTask.cancel()
+ }
connectionManager.stop()
blockInfo.clear()
memoryStore.clear()
@@ -855,6 +923,12 @@ object BlockManager extends Logging {
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
}
+ def getHeartBeatFrequencyFromSystemProperties: Long =
+ System.getProperty("spark.storage.blockManagerHeartBeatMs", "5000").toLong
+
+ def getDisableHeartBeatsForTesting: Boolean =
+ System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
+
/**
* Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
* might cause errors if one attempts to read from the unmapped buffer, but it's better than
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index ace27e758c..a7b60fc2cf 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -26,7 +26,10 @@ case class RegisterBlockManager(
extends ToBlockManagerMaster
private[spark]
-class HeartBeat(
+case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
+
+private[spark]
+class BlockUpdate(
var blockManagerId: BlockManagerId,
var blockId: String,
var storageLevel: StorageLevel,
@@ -57,17 +60,17 @@ class HeartBeat(
}
private[spark]
-object HeartBeat {
+object BlockUpdate {
def apply(blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
memSize: Long,
- diskSize: Long): HeartBeat = {
- new HeartBeat(blockManagerId, blockId, storageLevel, memSize, diskSize)
+ diskSize: Long): BlockUpdate = {
+ new BlockUpdate(blockManagerId, blockId, storageLevel, memSize, diskSize)
}
// For pattern-matching
- def unapply(h: HeartBeat): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
+ def unapply(h: BlockUpdate): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
}
}
@@ -90,6 +93,9 @@ case object StopBlockManagerMaster extends ToBlockManagerMaster
private[spark]
case object GetMemoryStatus extends ToBlockManagerMaster
+private[spark]
+case object ExpireDeadHosts extends ToBlockManagerMaster
+
private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
@@ -105,7 +111,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
def updateLastSeenMs() {
- _lastSeenMs = System.currentTimeMillis() / 1000
+ _lastSeenMs = System.currentTimeMillis()
}
def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
@@ -156,6 +162,8 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
def lastSeenMs: Long = _lastSeenMs
+ def blocks: JHashMap[String, StorageLevel] = _blocks
+
override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
def clear() {
@@ -164,26 +172,84 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
}
private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo]
+ private val blockManagerIdByHost = new HashMap[String, BlockManagerId]
private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
initLogging()
+ val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
+ "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
+
+ val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
+ "5000").toLong
+
+ var timeoutCheckingTask: Cancellable = null
+
+ override def preStart() {
+ if (!BlockManager.getDisableHeartBeatsForTesting) {
+ timeoutCheckingTask = context.system.scheduler.schedule(
+ 0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
+ }
+ super.preStart()
+ }
+
+ def removeBlockManager(blockManagerId: BlockManagerId) {
+ val info = blockManagerInfo(blockManagerId)
+ blockManagerIdByHost.remove(blockManagerId.ip)
+ blockManagerInfo.remove(blockManagerId)
+ var iterator = info.blocks.keySet.iterator
+ while (iterator.hasNext) {
+ val blockId = iterator.next
+ val locations = blockInfo.get(blockId)._2
+ locations -= blockManagerId
+ if (locations.size == 0) {
+ blockInfo.remove(locations)
+ }
+ }
+ }
+
+ def expireDeadHosts() {
+ logDebug("Checking for hosts with no recent heart beats in BlockManagerMaster.")
+ val now = System.currentTimeMillis()
+ val minSeenTime = now - slaveTimeout
+ val toRemove = new HashSet[BlockManagerId]
+ for (info <- blockManagerInfo.values) {
+ if (info.lastSeenMs < minSeenTime) {
+ logInfo("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
+ toRemove += info.blockManagerId
+ }
+ }
+ // TODO: Remove corresponding block infos
+ toRemove.foreach(removeBlockManager)
+ }
+
def removeHost(host: String) {
logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
- val ip = host.split(":")(0)
- val port = host.split(":")(1)
- blockManagerInfo.remove(new BlockManagerId(ip, port.toInt))
+ blockManagerIdByHost.get(host).foreach(removeBlockManager)
logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
sender ! true
}
+ def heartBeat(blockManagerId: BlockManagerId) {
+ if (!blockManagerInfo.contains(blockManagerId)) {
+ if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ sender ! true
+ } else {
+ sender ! false
+ }
+ } else {
+ blockManagerInfo(blockManagerId).updateLastSeenMs()
+ sender ! true
+ }
+ }
+
def receive = {
case RegisterBlockManager(blockManagerId, maxMemSize) =>
register(blockManagerId, maxMemSize)
- case HeartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
- heartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size)
+ case BlockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
+ blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size)
case GetLocations(blockId) =>
getLocations(blockId)
@@ -205,8 +271,17 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
case StopBlockManagerMaster =>
logInfo("Stopping BlockManagerMaster")
sender ! true
+ if (timeoutCheckingTask != null) {
+ timeoutCheckingTask.cancel
+ }
context.stop(self)
+ case ExpireDeadHosts =>
+ expireDeadHosts()
+
+ case HeartBeat(blockManagerId) =>
+ heartBeat(blockManagerId)
+
case other =>
logInfo("Got unknown message: " + other)
}
@@ -223,17 +298,25 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " "
logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
+ if (blockManagerIdByHost.contains(blockManagerId.ip) &&
+ blockManagerIdByHost(blockManagerId.ip) != blockManagerId) {
+ val oldId = blockManagerIdByHost(blockManagerId.ip)
+ logInfo("Got second registration for host " + blockManagerId +
+ "; removing old slave " + oldId)
+ removeBlockManager(oldId)
+ }
if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
logInfo("Got Register Msg from master node, don't register it")
} else {
blockManagerInfo += (blockManagerId -> new BlockManagerInfo(
- blockManagerId, System.currentTimeMillis() / 1000, maxMemSize))
+ blockManagerId, System.currentTimeMillis(), maxMemSize))
}
+ blockManagerIdByHost += (blockManagerId.ip -> blockManagerId)
logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
sender ! true
}
- private def heartBeat(
+ private def blockUpdate(
blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
@@ -244,15 +327,21 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
val tmp = " " + blockManagerId + " " + blockId + " "
if (!blockManagerInfo.contains(blockManagerId)) {
- // Can happen if this is from a locally cached partition on the master
- sender ! true
+ if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
+ // We intentionally do not register the master (except in local mode),
+ // so we should not indicate failure.
+ sender ! true
+ } else {
+ sender ! false
+ }
return
}
if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
- logDebug("Got in heartBeat 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
+ logDebug("Got in block update 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
sender ! true
+ return
}
blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
@@ -361,7 +450,6 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
val DEFAULT_MASTER_IP: String = System.getProperty("spark.master.host", "localhost")
val DEFAULT_MASTER_PORT: Int = System.getProperty("spark.master.port", "7077").toInt
val DEFAULT_MANAGER_IP: String = Utils.localHostName()
- val DEFAULT_MANAGER_PORT: String = "10902"
val timeout = 10.seconds
var masterActor: ActorRef = null
@@ -405,7 +493,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
}
def notifyADeadHost(host: String) {
- communicate(RemoveHost(host + ":" + DEFAULT_MANAGER_PORT))
+ communicate(RemoveHost(host))
logInfo("Removed " + host + " successfully in notifyADeadHost")
}
@@ -436,27 +524,49 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
}
}
- def mustHeartBeat(msg: HeartBeat) {
- while (! syncHeartBeat(msg)) {
- logWarning("Failed to send heartbeat" + msg)
+ def mustHeartBeat(msg: HeartBeat): Boolean = {
+ var res = syncHeartBeat(msg)
+ while (!res.isDefined) {
+ logWarning("Failed to send heart beat " + msg)
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
}
+ return res.get
}
- def syncHeartBeat(msg: HeartBeat): Boolean = {
+ def syncHeartBeat(msg: HeartBeat): Option[Boolean] = {
+ try {
+ val answer = askMaster(msg).asInstanceOf[Boolean]
+ return Some(answer)
+ } catch {
+ case e: Exception =>
+ logError("Failed in syncHeartBeat", e)
+ return None
+ }
+ }
+
+ def mustBlockUpdate(msg: BlockUpdate): Boolean = {
+ var res = syncBlockUpdate(msg)
+ while (!res.isDefined) {
+ logWarning("Failed to send block update " + msg)
+ Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
+ }
+ return res.get
+ }
+
+ def syncBlockUpdate(msg: BlockUpdate): Option[Boolean] = {
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
- logDebug("Got in syncHeartBeat " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
+ logDebug("Got in syncBlockUpdate " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
try {
- communicate(msg)
- logDebug("Heartbeat sent successfully")
- logDebug("Got in syncHeartBeat 1 " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
- return true
+ val answer = askMaster(msg).asInstanceOf[Boolean]
+ logDebug("Block update sent successfully")
+ logDebug("Got in synbBlockUpdate " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
+ return Some(answer)
} catch {
case e: Exception =>
- logError("Failed in syncHeartBeat", e)
- return false
+ logError("Failed in syncBlockUpdate", e)
+ return None
}
}
diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala
index e4a5b8ffdf..5bb5a29cc4 100644
--- a/core/src/main/scala/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/spark/storage/ThreadingTest.scala
@@ -74,7 +74,7 @@ private[spark] object ThreadingTest {
val actorSystem = ActorSystem("test")
val serializer = new KryoSerializer
val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true)
- val blockManager = new BlockManager(blockManagerMaster, serializer, 1024 * 1024)
+ val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer, 1024 * 1024)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
producers.foreach(_.start)
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index b9c19e61cd..ad2253596d 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -14,10 +14,12 @@ import spark.util.ByteBufferInputStream
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
var store: BlockManager = null
+ var store2: BlockManager = null
var actorSystem: ActorSystem = null
var master: BlockManagerMaster = null
var oldArch: String = null
var oldOops: String = null
+ var oldHeartBeat: String = null
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
val serializer = new KryoSerializer
@@ -29,6 +31,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
oldOops = System.setProperty("spark.test.useCompressedOops", "true")
+ oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
}
@@ -36,6 +39,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
after {
if (store != null) {
store.stop()
+ store = null
+ }
+ if (store2 != null) {
+ store2.stop()
+ store2 = null
}
actorSystem.shutdown()
actorSystem.awaitTermination()
@@ -56,7 +64,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("manager-master interaction") {
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -85,8 +93,68 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
}
+ test("reregistration on heart beat") {
+ val heartBeat = PrivateMethod[Unit]('heartBeat)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
+ val a1 = new Array[Byte](400)
+
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+
+ assert(store.getSingle("a1") != None, "a1 was not in store")
+ assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+
+ master.notifyADeadHost(store.blockManagerId.ip)
+ assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
+
+ store invokePrivate heartBeat()
+ assert(master.mustGetLocations(GetLocations("a1")).size > 0,
+ "a1 was not reregistered with master")
+ }
+
+ test("reregistration on block update") {
+ store = new BlockManager(actorSystem, master, serializer, 2000)
+ val a1 = new Array[Byte](400)
+ val a2 = new Array[Byte](400)
+
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+
+ assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+
+ master.notifyADeadHost(store.blockManagerId.ip)
+ assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
+
+ store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
+
+ assert(master.mustGetLocations(GetLocations("a1")).size > 0,
+ "a1 was not reregistered with master")
+ assert(master.mustGetLocations(GetLocations("a2")).size > 0,
+ "master was not told about a2")
+ }
+
+ test("deregistration on duplicate") {
+ val heartBeat = PrivateMethod[Unit]('heartBeat)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
+ val a1 = new Array[Byte](400)
+
+ store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+
+ assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+
+ store2 = new BlockManager(actorSystem, master, serializer, 2000)
+
+ assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
+
+ store invokePrivate heartBeat()
+
+ assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+
+ store2 invokePrivate heartBeat()
+
+ assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a2 was not removed from master")
+ }
+
test("in-memory LRU storage") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -105,7 +173,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU storage with serialization") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -124,7 +192,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of same RDD") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -143,7 +211,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of multiple RDDs") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
@@ -166,7 +234,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("on-disk storage") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -179,7 +247,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -194,7 +262,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with getLocalBytes") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -209,7 +277,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -224,7 +292,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization and getLocalBytes") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -239,7 +307,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -264,7 +332,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU with streams") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -288,7 +356,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels and streams") {
- store = new BlockManager(master, serializer, 1200)
+ store = new BlockManager(actorSystem, master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -334,7 +402,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("overly large block") {
- store = new BlockManager(master, serializer, 500)
+ store = new BlockManager(actorSystem, master, serializer, 500)
store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") === None, "a1 was in store")
store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
@@ -345,49 +413,49 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("block compression") {
try {
System.setProperty("spark.shuffle.compress", "true")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.shuffle.compress", "false")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed")
store.stop()
store = null
System.setProperty("spark.broadcast.compress", "true")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.broadcast.compress", "false")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed")
store.stop()
store = null
System.setProperty("spark.rdd.compress", "true")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.rdd.compress", "false")
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed")
store.stop()
store = null
// Check that any other block types are also kept uncompressed
- store = new BlockManager(master, serializer, 2000)
+ store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
store.stop()