aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@cs.berkeley.edu>2012-12-10 20:36:03 -0800
committerReynold Xin <rxin@cs.berkeley.edu>2012-12-10 20:36:03 -0800
commit21b271f5bdfca63a9925c578c8e53bee1890adeb (patch)
tree7ea6b5ec91468abe059f2ed25d4f6a9d3d2645e1 /core
parentc10b229992ac5f3a14e86d14f2e6c11d9004e5e2 (diff)
downloadspark-21b271f5bdfca63a9925c578c8e53bee1890adeb.tar.gz
spark-21b271f5bdfca63a9925c578c8e53bee1890adeb.tar.bz2
spark-21b271f5bdfca63a9925c578c8e53bee1890adeb.zip
Suppress shuffle block updates when a slave node comes back.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala23
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMaster.scala6
2 files changed, 17 insertions, 12 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 4e7d11996f..df295b1820 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -47,7 +47,7 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter
}
}
-private[spark]
+private[spark]
case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message)
@@ -200,31 +200,36 @@ class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
}
/**
- * Actually send a BlockUpdate message. Returns the mater's repsonse, which will be true if theo
- * block was successfully recorded and false if the slave needs to reregister.
+ * Actually send a BlockUpdate message. Returns the mater's response, which will be true if the
+ * block was successfully recorded and false if the slave needs to re-register.
*/
private def tryToReportBlockStatus(blockId: String): Boolean = {
- val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match {
+ val (curLevel, inMemSize, onDiskSize, tellMaster) = blockInfo.get(blockId) match {
case null =>
- (StorageLevel.NONE, 0L, 0L)
+ (StorageLevel.NONE, 0L, 0L, false)
case info =>
info.synchronized {
info.level match {
case null =>
- (StorageLevel.NONE, 0L, 0L)
+ (StorageLevel.NONE, 0L, 0L, false)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
(
new StorageLevel(onDisk, inMem, level.deserialized, level.replication),
if (inMem) memoryStore.getSize(blockId) else 0L,
- if (onDisk) diskStore.getSize(blockId) else 0L
+ if (onDisk) diskStore.getSize(blockId) else 0L,
+ info.tellMaster
)
}
}
}
- return master.mustBlockUpdate(
- BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
+
+ if (tellMaster) {
+ master.mustBlockUpdate(BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
+ } else {
+ true
+ }
}
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index a7b60fc2cf..0a4e68f437 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -215,7 +215,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
val toRemove = new HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime) {
- logInfo("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
+ logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
toRemove += info.blockManagerId
}
}
@@ -279,7 +279,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
case ExpireDeadHosts =>
expireDeadHosts()
- case HeartBeat(blockManagerId) =>
+ case HeartBeat(blockManagerId) =>
heartBeat(blockManagerId)
case other =>
@@ -538,7 +538,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
val answer = askMaster(msg).asInstanceOf[Boolean]
return Some(answer)
} catch {
- case e: Exception =>
+ case e: Exception =>
logError("Failed in syncHeartBeat", e)
return None
}