path: root/core/src/main
author     Imran Rashid <imran@quantifind.com>    2013-01-22 09:27:33 -0800
committer  Imran Rashid <imran@quantifind.com>    2013-01-22 11:46:01 -0800
commit     50e2b23927956c14db40093d31bc80892764006a (patch)
tree       b96f7effa1b429e49e934713dc8ebcb1673f2f62 /core/src/main
parent     d98caa0fa0e502203289431502722b74ba367ddb (diff)
Fix up some problems from the merge
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMasterActor.scala | 11
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMessages.scala    |  3
-rw-r--r--  core/src/main/scala/spark/storage/StorageUtils.scala            |  8
3 files changed, 18 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index f4d026da33..c945c34c71 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -68,6 +68,9 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     case GetMemoryStatus =>
       getMemoryStatus
 
+    case GetStorageStatus =>
+      getStorageStatus
+
     case RemoveBlock(blockId) =>
       removeBlock(blockId)
@@ -177,6 +180,14 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     sender ! res
   }
 
+  private def getStorageStatus() {
+    val res = blockManagerInfo.map { case(blockManagerId, info) =>
+      import collection.JavaConverters._
+      StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala.toMap)
+    }
+    sender ! res
+  }
+
   private def register(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
     val startTimeMs = System.currentTimeMillis()
     val tmp = " " + blockManagerId + " "
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
index d73a9b790f..3a381fd385 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -100,3 +100,6 @@ case object GetMemoryStatus extends ToBlockManagerMaster
 private[spark]
 case object ExpireDeadHosts extends ToBlockManagerMaster
+
+private[spark]
+case object GetStorageStatus extends ToBlockManagerMaster
\ No newline at end of file
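
GetStorageStatus follows the same convention as the other master messages: a parameterless case object matched in the actor's receive. A tiny illustrative sketch of that pattern, with a hypothetical trait and handler (not Spark's actual names):

// Hypothetical miniature of the message-protocol style used here.
sealed trait ToMaster
case object GetMemoryStatus extends ToMaster
case object GetStorageStatus extends ToMaster
case class RemoveBlock(blockId: String) extends ToMaster

object MasterSketch {
  def handle(msg: ToMaster): String = msg match {
    case GetMemoryStatus  => "memory status requested"
    case GetStorageStatus => "storage status requested"
    case RemoveBlock(id)  => "remove block " + id
  }
}
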
diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
index ebc7390ee5..63ad5c125b 100644
--- a/core/src/main/scala/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -1,6 +1,7 @@
 package spark.storage
 
 import spark.SparkContext
+import BlockManagerMasterActor.BlockStatus
 
 private[spark]
 case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
@@ -20,8 +21,8 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
 }
 
-case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
-  numPartitions: Int, memSize: Long, diskSize: Long, locations: Array[BlockManagerId])
+case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
+  numPartitions: Int, memSize: Long, diskSize: Long)
 
 /* Helper methods for storage-related objects */
@@ -58,8 +59,7 @@ object StorageUtils {
       val rddName = Option(sc.persistentRdds.get(rddId).name).getOrElse(rddKey)
       val rddStorageLevel = sc.persistentRdds.get(rddId).getStorageLevel
 
-      RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize,
-        rddBlocks.map(_.blockManagerId))
+      RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize)
 
     }.toArray
   }
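
With the locations field gone, RDDInfo now carries only aggregate sizes per RDD. A rough, self-contained sketch of building the slimmer RDDInfo from per-block sizes, with simplified types and a hypothetical helper standing in for the real StorageUtils code:

// Simplified stand-ins; the real code uses spark.storage.StorageLevel and SparkContext.
case class RDDInfo(id: Int, name: String, storageLevel: String,
  numPartitions: Int, memSize: Long, diskSize: Long)

object RDDInfoSketch {
  // blocks: blockId -> (memSize, diskSize); assumes every key is an RDD block id
  // of the form "rdd_<rddId>_<partition>".
  def rddInfoFromBlocks(blocks: Map[String, (Long, Long)]): Array[RDDInfo] =
    blocks.groupBy { case (blockId, _) => blockId.split('_')(1).toInt }
      .map { case (rddId, rddBlocks) =>
        RDDInfo(rddId, "rdd_" + rddId, "MEMORY_ONLY", rddBlocks.size,
          rddBlocks.values.map(_._1).sum, rddBlocks.values.map(_._2).sum)
      }.toArray

  def main(args: Array[String]): Unit =
    // Example: two cached partitions of RDD 3, memory only.
    rddInfoFromBlocks(Map("rdd_3_0" -> (100L, 0L), "rdd_3_1" -> (120L, 0L)))
      .foreach(println)
}
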