aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorZhang, Liye <liye.zhang@intel.com>2014-12-05 12:00:32 -0800
committerJosh Rosen <joshrosen@databricks.com>2014-12-05 12:00:32 -0800
commit98a7d09978eeb775600ff41f9cc6ae8622026b71 (patch)
tree352550e1e81deb04f524ae4aad84fb8221ab5eb4 /core
parent6f61e1f961826a6c9e98a66d10b271b7e3c7dd55 (diff)
downloadspark-98a7d09978eeb775600ff41f9cc6ae8622026b71.tar.gz
spark-98a7d09978eeb775600ff41f9cc6ae8622026b71.tar.bz2
spark-98a7d09978eeb775600ff41f9cc6ae8622026b71.zip
[SPARK-4005][CORE] handle message replies in receive instead of in the individual private methods
In BlockManagermasterActor, when handling message type UpdateBlockInfo, the message replies is in handled in individual private methods, should handle it in receive of Akka. Author: Zhang, Liye <liye.zhang@intel.com> Closes #2853 from liyezhang556520/akkaRecv and squashes the following commits: 9b06f0a [Zhang, Liye] remove the unreachable code bf518cd [Zhang, Liye] change the indent 242166b [Zhang, Liye] modified accroding to the comments d4b929b [Zhang, Liye] [SPARK-4005][CORE] handle message replies in receive instead of in the individual private methods
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala17
1 files changed, 7 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 685b2e1144..9cbda41223 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -73,9 +73,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
case UpdateBlockInfo(
blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) =>
- // TODO: Ideally we want to handle all the message replies in receive instead of in the
- // individual private methods.
- updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize)
+ sender ! updateBlockInfo(
+ blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize)
case GetLocations(blockId) =>
sender ! getLocations(blockId)
@@ -355,23 +354,21 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
- tachyonSize: Long) {
+ tachyonSize: Long): Boolean = {
if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.isDriver && !isLocal) {
// We intentionally do not register the master (except in local mode),
// so we should not indicate failure.
- sender ! true
+ return true
} else {
- sender ! false
+ return false
}
- return
}
if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
- sender ! true
- return
+ return true
}
blockManagerInfo(blockManagerId).updateBlockInfo(
@@ -395,7 +392,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
if (locations.size == 0) {
blockLocations.remove(blockId)
}
- sender ! true
+ true
}
private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {