From 21b271f5bdfca63a9925c578c8e53bee1890adeb Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 10 Dec 2012 20:36:03 -0800 Subject: Suppress shuffle block updates when a slave node comes back. --- .../main/scala/spark/storage/BlockManager.scala | 23 +++++++++++++--------- .../scala/spark/storage/BlockManagerMaster.scala | 6 +++--- 2 files changed, 17 insertions(+), 12 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 4e7d11996f..df295b1820 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -47,7 +47,7 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter } } -private[spark] +private[spark] case class BlockException(blockId: String, message: String, ex: Exception = null) extends Exception(message) @@ -200,31 +200,36 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster, } /** - * Actually send a BlockUpdate message. Returns the mater's repsonse, which will be true if theo - * block was successfully recorded and false if the slave needs to reregister. + * Actually send a BlockUpdate message. Returns the master's response, which will be true if the + * block was successfully recorded and false if the slave needs to re-register. 
*/ private def tryToReportBlockStatus(blockId: String): Boolean = { - val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match { + val (curLevel, inMemSize, onDiskSize, tellMaster) = blockInfo.get(blockId) match { case null => - (StorageLevel.NONE, 0L, 0L) + (StorageLevel.NONE, 0L, 0L, false) case info => info.synchronized { info.level match { case null => - (StorageLevel.NONE, 0L, 0L) + (StorageLevel.NONE, 0L, 0L, false) case level => val inMem = level.useMemory && memoryStore.contains(blockId) val onDisk = level.useDisk && diskStore.contains(blockId) ( new StorageLevel(onDisk, inMem, level.deserialized, level.replication), if (inMem) memoryStore.getSize(blockId) else 0L, - if (onDisk) diskStore.getSize(blockId) else 0L + if (onDisk) diskStore.getSize(blockId) else 0L, + info.tellMaster ) } } } - return master.mustBlockUpdate( - BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)) + + if (tellMaster) { + master.mustBlockUpdate(BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)) + } else { + true + } } diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index a7b60fc2cf..0a4e68f437 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -215,7 +215,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor val toRemove = new HashSet[BlockManagerId] for (info <- blockManagerInfo.values) { if (info.lastSeenMs < minSeenTime) { - logInfo("Removing BlockManager " + info.blockManagerId + " with no recent heart beats") + logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats") toRemove += info.blockManagerId } } @@ -279,7 +279,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor case ExpireDeadHosts => expireDeadHosts() - case HeartBeat(blockManagerId) => + case 
HeartBeat(blockManagerId) => heartBeat(blockManagerId) case other => @@ -538,7 +538,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool val answer = askMaster(msg).asInstanceOf[Boolean] return Some(answer) } catch { - case e: Exception => + case e: Exception => logError("Failed in syncHeartBeat", e) return None } -- cgit v1.2.3