aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@cs.berkeley.edu>2012-12-20 01:37:09 -0800
committerReynold Xin <rxin@cs.berkeley.edu>2012-12-20 01:37:09 -0800
commit9397c5014e17a96c3cf24661c0edb40e524589e7 (patch)
treecc9dbe8129efcb42a2d589ce016e1fa6ae33003b /core
parent68c52d80ecd5dd173f755bedc813fdc1a52100aa (diff)
downloadspark-9397c5014e17a96c3cf24661c0edb40e524589e7.tar.gz
spark-9397c5014e17a96c3cf24661c0edb40e524589e7.tar.bz2
spark-9397c5014e17a96c3cf24661c0edb40e524589e7.zip
Let the slave notify the master block removal.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala65
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMaster.scala17
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMasterActor.scala34
-rw-r--r--core/src/test/scala/spark/storage/BlockManagerSuite.scala59
4 files changed, 83 insertions, 92 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 682ea7baff..7a8ac10cdd 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -59,7 +59,7 @@ class BlockManager(
}
}
- private val blockInfo = new TimeStampedHashMap[String, BlockInfo]()
+ private val blockInfo = new TimeStampedHashMap[String, BlockInfo]
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore: BlockStore =
@@ -139,8 +139,8 @@ class BlockManager(
*/
private def reportAllBlocks() {
logInfo("Reporting " + blockInfo.size + " blocks to the master.")
- for (blockId <- blockInfo.keys) {
- if (!tryToReportBlockStatus(blockId)) {
+ for ((blockId, info) <- blockInfo) {
+ if (!tryToReportBlockStatus(blockId, info)) {
logError("Failed to report " + blockId + " to master; giving up.")
return
}
@@ -168,8 +168,8 @@ class BlockManager(
* message reflecting the current status, *not* the desired storage level in its block info.
* For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
*/
- def reportBlockStatus(blockId: String) {
- val needReregister = !tryToReportBlockStatus(blockId)
+ def reportBlockStatus(blockId: String, info: BlockInfo) {
+ val needReregister = !tryToReportBlockStatus(blockId, info)
if (needReregister) {
logInfo("Got told to reregister updating block " + blockId)
// Reregistering will report our new block for free.
@@ -179,29 +179,23 @@ class BlockManager(
}
/**
- * Actually send a BlockUpdate message. Returns the mater's response, which will be true if the
- * block was successfully recorded and false if the slave needs to re-register.
+ * Actually send an UpdateBlockInfo message. Returns the master's response,
+ * which will be true if the block was successfully recorded and false if
+ * the slave needs to re-register.
*/
- private def tryToReportBlockStatus(blockId: String): Boolean = {
- val (curLevel, inMemSize, onDiskSize, tellMaster) = blockInfo.get(blockId) match {
- case None =>
- (StorageLevel.NONE, 0L, 0L, false)
- case Some(info) =>
- info.synchronized {
- info.level match {
- case null =>
- (StorageLevel.NONE, 0L, 0L, false)
- case level =>
- val inMem = level.useMemory && memoryStore.contains(blockId)
- val onDisk = level.useDisk && diskStore.contains(blockId)
- (
- new StorageLevel(onDisk, inMem, level.deserialized, level.replication),
- if (inMem) memoryStore.getSize(blockId) else 0L,
- if (onDisk) diskStore.getSize(blockId) else 0L,
- info.tellMaster
- )
- }
- }
+ private def tryToReportBlockStatus(blockId: String, info: BlockInfo): Boolean = {
+ val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized {
+ info.level match {
+ case null =>
+ (StorageLevel.NONE, 0L, 0L, false)
+ case level =>
+ val inMem = level.useMemory && memoryStore.contains(blockId)
+ val onDisk = level.useDisk && diskStore.contains(blockId)
+ val storageLevel = new StorageLevel(onDisk, inMem, level.deserialized, level.replication)
+ val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
+ val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
+ (storageLevel, memSize, diskSize, info.tellMaster)
+ }
}
if (tellMaster) {
@@ -648,7 +642,7 @@ class BlockManager(
// and tell the master about it.
myInfo.markReady(size)
if (tellMaster) {
- reportBlockStatus(blockId)
+ reportBlockStatus(blockId, myInfo)
}
}
logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
@@ -735,7 +729,7 @@ class BlockManager(
// and tell the master about it.
myInfo.markReady(bytes.limit)
if (tellMaster) {
- reportBlockStatus(blockId)
+ reportBlockStatus(blockId, myInfo)
}
}
@@ -834,7 +828,7 @@ class BlockManager(
logWarning("Block " + blockId + " could not be dropped from memory as it does not exist")
}
if (info.tellMaster) {
- reportBlockStatus(blockId)
+ reportBlockStatus(blockId, info)
}
if (!level.useDisk) {
// The block is completely gone from this node; forget it so we can put() it again later.
@@ -847,9 +841,7 @@ class BlockManager(
}
/**
- * Remove a block from both memory and disk. This one doesn't report to the master
- * because it expects the master to initiate the original block removal command, and
- * then the master can update the block tracking itself.
+ * Remove a block from both memory and disk, then report its status to the master if tellMaster is set.
*/
def removeBlock(blockId: String) {
logInfo("Removing block " + blockId)
@@ -863,6 +855,9 @@ class BlockManager(
"the disk or memory store")
}
blockInfo.remove(blockId)
+ if (info.tellMaster) {
+ reportBlockStatus(blockId, info)
+ }
} else {
// The block has already been removed; do nothing.
logWarning("Asked to remove block " + blockId + ", which does not exist")
@@ -872,7 +867,7 @@ class BlockManager(
def dropOldBlocks(cleanupTime: Long) {
logInfo("Dropping blocks older than " + cleanupTime)
val iterator = blockInfo.internalMap.entrySet().iterator()
- while(iterator.hasNext) {
+ while (iterator.hasNext) {
val entry = iterator.next()
val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2)
if (time < cleanupTime) {
@@ -887,7 +882,7 @@ class BlockManager(
iterator.remove()
logInfo("Dropped block " + id)
}
- reportBlockStatus(id)
+ reportBlockStatus(id, info)
}
}
}
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index cb582633c4..a3d8671834 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -101,7 +101,7 @@ private[spark] class BlockManagerMaster(
* blocks that the master knows about.
*/
def removeBlock(blockId: String) {
- askMaster(RemoveBlock(blockId))
+ askMasterWithRetry(RemoveBlock(blockId))
}
/**
@@ -132,21 +132,6 @@ private[spark] class BlockManagerMaster(
/**
* Send a message to the master actor and get its result within a default timeout, or
- * throw a SparkException if this fails. There is no retry logic here so if the Akka
- * message is lost, the master actor won't get the command.
- */
- private def askMaster[T](message: Any): Any = {
- try {
- val future = masterActor.ask(message)(timeout)
- return Await.result(future, timeout).asInstanceOf[T]
- } catch {
- case e: Exception =>
- throw new SparkException("Error communicating with BlockManagerMaster", e)
- }
- }
-
- /**
- * Send a message to the master actor and get its result within a default timeout, or
* throw a SparkException if this fails.
*/
private def askMasterWithRetry[T](message: Any): T = {
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index 0a1be98d83..f4d026da33 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -28,7 +28,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
private val blockManagerIdByHost = new HashMap[String, ArrayBuffer[BlockManagerId]]
// Mapping from block id to the set of block managers that have the block.
- private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
+ private val blockLocations = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
initLogging()
@@ -53,7 +53,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
register(blockManagerId, maxMemSize, slaveActor)
case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
- blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size)
+ updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
case GetLocations(blockId) =>
getLocations(blockId)
@@ -108,10 +108,10 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
var iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
- val locations = blockInfo.get(blockId)._2
+ val locations = blockLocations.get(blockId)._2
locations -= blockManagerId
if (locations.size == 0) {
- blockInfo.remove(locations)
+ blockLocations.remove(locations)
}
}
}
@@ -154,7 +154,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
// Remove a block from the slaves that have it. This can only be used to remove
// blocks that the master knows about.
private def removeBlock(blockId: String) {
- val block = blockInfo.get(blockId)
+ val block = blockLocations.get(blockId)
if (block != null) {
block._2.foreach { blockManagerId: BlockManagerId =>
val blockManager = blockManagerInfo.get(blockManagerId)
@@ -163,11 +163,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
// Doesn't actually wait for a confirmation and the message might get lost.
// If message loss becomes frequent, we should add retry logic here.
blockManager.get.slaveActor ! RemoveBlock(blockId)
- // Remove the block from the master's BlockManagerInfo.
- blockManager.get.updateBlockInfo(blockId, StorageLevel.NONE, 0, 0)
}
}
- blockInfo.remove(blockId)
}
sender ! true
}
@@ -202,7 +199,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
sender ! true
}
- private def blockUpdate(
+ private def updateBlockInfo(
blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
@@ -232,21 +229,22 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
var locations: HashSet[BlockManagerId] = null
- if (blockInfo.containsKey(blockId)) {
- locations = blockInfo.get(blockId)._2
+ if (blockLocations.containsKey(blockId)) {
+ locations = blockLocations.get(blockId)._2
} else {
locations = new HashSet[BlockManagerId]
- blockInfo.put(blockId, (storageLevel.replication, locations))
+ blockLocations.put(blockId, (storageLevel.replication, locations))
}
if (storageLevel.isValid) {
- locations += blockManagerId
+ locations.add(blockManagerId)
} else {
locations.remove(blockManagerId)
}
+ // Remove the block from master tracking if it has been removed on all slaves.
if (locations.size == 0) {
- blockInfo.remove(blockId)
+ blockLocations.remove(blockId)
}
sender ! true
}
@@ -254,9 +252,9 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
private def getLocations(blockId: String) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockId + " "
- if (blockInfo.containsKey(blockId)) {
+ if (blockLocations.containsKey(blockId)) {
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- res.appendAll(blockInfo.get(blockId)._2)
+ res.appendAll(blockLocations.get(blockId)._2)
sender ! res.toSeq
} else {
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
@@ -267,9 +265,9 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
def getLocations(blockId: String): Seq[BlockManagerId] = {
val tmp = blockId
- if (blockInfo.containsKey(blockId)) {
+ if (blockLocations.containsKey(blockId)) {
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- res.appendAll(blockInfo.get(blockId)._2)
+ res.appendAll(blockLocations.get(blockId)._2)
return res.toSeq
} else {
var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 4e28a7e2bc..8f86e3170e 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -7,6 +7,10 @@ import akka.actor._
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import org.scalatest.PrivateMethodTester
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.Timeouts._
+import org.scalatest.matchers.ShouldMatchers._
+import org.scalatest.time.SpanSugar._
import spark.KryoSerializer
import spark.SizeEstimator
@@ -142,37 +146,46 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
- // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
- store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
- store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
- store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false)
+ // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
+ store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, false)
// Checking whether blocks are in memory and memory size
- var memStatus = master.getMemoryStatus.head._2
+ val memStatus = master.getMemoryStatus.head._2
assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should <= 1200")
- assert(store.getSingle("a1") != None, "a1 was not in store")
- assert(store.getSingle("a2") != None, "a2 was not in store")
- assert(store.getSingle("a3") != None, "a3 was not in store")
+ assert(store.getSingle("a1-to-remove") != None, "a1 was not in store")
+ assert(store.getSingle("a2-to-remove") != None, "a2 was not in store")
+ assert(store.getSingle("a3-to-remove") != None, "a3 was not in store")
// Checking whether master knows about the blocks or not
- assert(master.getLocations("a1").size > 0, "master was not told about a1")
- assert(master.getLocations("a2").size > 0, "master was not told about a2")
- assert(master.getLocations("a3").size === 0, "master was told about a3")
+ assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1")
+ assert(master.getLocations("a2-to-remove").size > 0, "master was not told about a2")
+ assert(master.getLocations("a3-to-remove").size === 0, "master was told about a3")
// Remove a1 and a2 and a3. Should be no-op for a3.
- master.removeBlock("a1")
- master.removeBlock("a2")
- master.removeBlock("a3")
- assert(store.getSingle("a1") === None, "a1 not removed from store")
- assert(store.getSingle("a2") === None, "a2 not removed from store")
- assert(master.getLocations("a1").size === 0, "master did not remove a1")
- assert(master.getLocations("a2").size === 0, "master did not remove a2")
- assert(store.getSingle("a3") != None, "a3 was not in store")
- assert(master.getLocations("a3").size === 0, "master was told about a3")
- memStatus = master.getMemoryStatus.head._2
- assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
- assert(memStatus._2 == 2000L, "remaining memory " + memStatus._1 + " should equal 2000")
+ master.removeBlock("a1-to-remove")
+ master.removeBlock("a2-to-remove")
+ master.removeBlock("a3-to-remove")
+
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ store.getSingle("a1-to-remove") should be (None)
+ master.getLocations("a1-to-remove") should have size 0
+ }
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ store.getSingle("a2-to-remove") should be (None)
+ master.getLocations("a2-to-remove") should have size 0
+ }
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ store.getSingle("a3-to-remove") should not be (None)
+ master.getLocations("a3-to-remove") should have size 0
+ }
+ eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ val memStatus = master.getMemoryStatus.head._2
+ memStatus._1 should equal (2000L)
+ memStatus._2 should equal (2000L)
+ }
}
test("reregistration on heart beat") {