diff options
author | Zhang, Liye <liye.zhang@intel.com> | 2014-12-05 12:00:32 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2014-12-05 12:00:32 -0800 |
commit | 98a7d09978eeb775600ff41f9cc6ae8622026b71 (patch) | |
tree | 352550e1e81deb04f524ae4aad84fb8221ab5eb4 /core | |
parent | 6f61e1f961826a6c9e98a66d10b271b7e3c7dd55 (diff) | |
download | spark-98a7d09978eeb775600ff41f9cc6ae8622026b71.tar.gz spark-98a7d09978eeb775600ff41f9cc6ae8622026b71.tar.bz2 spark-98a7d09978eeb775600ff41f9cc6ae8622026b71.zip |
[SPARK-4005][CORE] handle message replies in receive instead of in the individual private methods
In BlockManagermasterActor, when handling message type UpdateBlockInfo, the message replies is in handled in individual private methods, should handle it in receive of Akka.
Author: Zhang, Liye <liye.zhang@intel.com>
Closes #2853 from liyezhang556520/akkaRecv and squashes the following commits:
9b06f0a [Zhang, Liye] remove the unreachable code
bf518cd [Zhang, Liye] change the indent
242166b [Zhang, Liye] modified accroding to the comments
d4b929b [Zhang, Liye] [SPARK-4005][CORE] handle message replies in receive instead of in the individual private methods
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala | 17 |
1 files changed, 7 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 685b2e1144..9cbda41223 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -73,9 +73,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus case UpdateBlockInfo( blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) => - // TODO: Ideally we want to handle all the message replies in receive instead of in the - // individual private methods. - updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) + sender ! updateBlockInfo( + blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) case GetLocations(blockId) => sender ! getLocations(blockId) @@ -355,23 +354,21 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus storageLevel: StorageLevel, memSize: Long, diskSize: Long, - tachyonSize: Long) { + tachyonSize: Long): Boolean = { if (!blockManagerInfo.contains(blockManagerId)) { if (blockManagerId.isDriver && !isLocal) { // We intentionally do not register the master (except in local mode), // so we should not indicate failure. - sender ! true + return true } else { - sender ! false + return false } - return } if (blockId == null) { blockManagerInfo(blockManagerId).updateLastSeenMs() - sender ! true - return + return true } blockManagerInfo(blockManagerId).updateBlockInfo( @@ -395,7 +392,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus if (locations.size == 0) { blockLocations.remove(blockId) } - sender ! true + true } private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { |