diff options
author | Imran Rashid <imran@quantifind.com> | 2013-01-22 09:27:33 -0800 |
---|---|---|
committer | Imran Rashid <imran@quantifind.com> | 2013-01-22 11:46:01 -0800 |
commit | 50e2b23927956c14db40093d31bc80892764006a (patch) | |
tree | b96f7effa1b429e49e934713dc8ebcb1673f2f62 /core/src/main | |
parent | d98caa0fa0e502203289431502722b74ba367ddb (diff) | |
download | spark-50e2b23927956c14db40093d31bc80892764006a.tar.gz spark-50e2b23927956c14db40093d31bc80892764006a.tar.bz2 spark-50e2b23927956c14db40093d31bc80892764006a.zip |
Fix up some problems from the merge
Diffstat (limited to 'core/src/main')
3 files changed, 18 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala index f4d026da33..c945c34c71 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala @@ -68,6 +68,9 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { case GetMemoryStatus => getMemoryStatus + case GetStorageStatus => + getStorageStatus + case RemoveBlock(blockId) => removeBlock(blockId) @@ -177,6 +180,14 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { sender ! res } + private def getStorageStatus() { + val res = blockManagerInfo.map { case(blockManagerId, info) => + import collection.JavaConverters._ + StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala.toMap) + } + sender ! res + } + private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { val startTimeMs = System.currentTimeMillis() val tmp = " " + blockManagerId + " " diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala index d73a9b790f..3a381fd385 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala @@ -100,3 +100,6 @@ case object GetMemoryStatus extends ToBlockManagerMaster private[spark] case object ExpireDeadHosts extends ToBlockManagerMaster + +private[spark] +case object GetStorageStatus extends ToBlockManagerMaster
\ No newline at end of file diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala index ebc7390ee5..63ad5c125b 100644 --- a/core/src/main/scala/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/spark/storage/StorageUtils.scala @@ -1,6 +1,7 @@ package spark.storage import spark.SparkContext +import BlockManagerMasterActor.BlockStatus private[spark] case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long, @@ -20,8 +21,8 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long, } -case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, - numPartitions: Int, memSize: Long, diskSize: Long, locations: Array[BlockManagerId]) +case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, + numPartitions: Int, memSize: Long, diskSize: Long) /* Helper methods for storage-related objects */ @@ -58,8 +59,7 @@ object StorageUtils { val rddName = Option(sc.persistentRdds.get(rddId).name).getOrElse(rddKey) val rddStorageLevel = sc.persistentRdds.get(rddId).getStorageLevel - RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize, - rddBlocks.map(_.blockManagerId)) + RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize) }.toArray } |