From a423ee546c389b5ce0d2117299456712370d7ad1 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 22 Jan 2013 18:48:43 -0800 Subject: expose RDD & storage info directly via SparkContext --- core/src/main/scala/spark/SparkContext.scala | 16 +++++++++ .../scala/spark/storage/BlockManagerMaster.scala | 4 +++ .../main/scala/spark/storage/BlockManagerUI.scala | 39 +++++++++------------- .../main/scala/spark/storage/StorageUtils.scala | 10 +++--- 4 files changed, 41 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 77036c1275..be992250a9 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -46,6 +46,7 @@ import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, C import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import storage.BlockManagerUI import util.{MetadataCleaner, TimeStampedHashMap} +import storage.{StorageStatus, StorageUtils, RDDInfo} /** * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark @@ -473,6 +474,21 @@ class SparkContext( } } + /** + * Return information about what RDDs are cached, if they are in mem or on disk, how much space + * they take, etc. + */ + def getRDDStorageInfo : Array[RDDInfo] = { + StorageUtils.rddInfoFromStorageStatus(getSlavesStorageStatus, this) + } + + /** + * Return information about blocks stored in all of the slaves + */ + def getSlavesStorageStatus : Array[StorageStatus] = { + env.blockManager.master.getStorageStatus + } + /** * Clear the job's list of files added by `addFile` so that they do not get downloaded to * any new nodes. diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 55ff1dde9c..c7ee76f0b7 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -118,6 +118,10 @@ private[spark] class BlockManagerMaster( askMasterWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus) } + def getStorageStatus: Array[StorageStatus] = { + askMasterWithRetry[ArrayBuffer[StorageStatus]](GetStorageStatus).toArray + } + /** Stop the master actor, called only on the Spark master node */ def stop() { if (masterActor != null) { diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala index eda320fa47..52f6d1b657 100644 --- a/core/src/main/scala/spark/storage/BlockManagerUI.scala +++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala @@ -1,13 +1,10 @@ package spark.storage import akka.actor.{ActorRef, ActorSystem} -import akka.pattern.ask import akka.util.Timeout import akka.util.duration._ -import cc.spray.directives._ import cc.spray.typeconversion.TwirlSupport._ import cc.spray.Directives -import scala.collection.mutable.ArrayBuffer import spark.{Logging, SparkContext} import spark.util.AkkaUtils import spark.Utils @@ -48,32 +45,26 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, path("") { completeWith { // Request the current storage status from the Master - val future = blockManagerMaster ? GetStorageStatus - future.map { status => - // Calculate macro-level statistics - val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray - val maxMem = storageStatusList.map(_.maxMem).reduce(_+_) - val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_) - val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)) - .reduceOption(_+_).getOrElse(0L) - val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc) - spark.storage.html.index. - render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList) - } + val storageStatusList = sc.getSlavesStorageStatus + // Calculate macro-level statistics + val maxMem = storageStatusList.map(_.maxMem).reduce(_+_) + val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_) + val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)) + .reduceOption(_+_).getOrElse(0L) + val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc) + spark.storage.html.index. + render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList) } } ~ path("rdd") { parameter("id") { id => completeWith { - val future = blockManagerMaster ? GetStorageStatus - future.map { status => - val prefix = "rdd_" + id.toString - val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray - val filteredStorageStatusList = StorageUtils. - filterStorageStatusByPrefix(storageStatusList, prefix) - val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head - spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList) - } + val prefix = "rdd_" + id.toString + val storageStatusList = sc.getSlavesStorageStatus + val filteredStorageStatusList = StorageUtils. + filterStorageStatusByPrefix(storageStatusList, prefix) + val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head + spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList) } } } ~ diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala index a10e3a95c6..d6e33c8619 100644 --- a/core/src/main/scala/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/spark/storage/StorageUtils.scala @@ -56,9 +56,11 @@ object StorageUtils { // Find the id of the RDD, e.g. rdd_1 => 1 val rddId = rddKey.split("_").last.toInt // Get the friendly name for the rdd, if available. - val rddName = Option(sc.persistentRdds(rddId).name).getOrElse(rddKey) - val rddStorageLevel = sc.persistentRdds(rddId).getStorageLevel - + val rdd = sc.persistentRdds(rddId) + val rddName = Option(rdd.name).getOrElse(rddKey) + val rddStorageLevel = rdd.getStorageLevel + //TODO get total number of partitions in rdd + RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize) }.toArray } @@ -75,4 +77,4 @@ object StorageUtils { } -} \ No newline at end of file +} -- cgit v1.2.3