From a423ee546c389b5ce0d2117299456712370d7ad1 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Tue, 22 Jan 2013 18:48:43 -0800
Subject: expose RDD & storage info directly via SparkContext

---
 core/src/main/scala/spark/SparkContext.scala       | 16 +++++++++
 .../scala/spark/storage/BlockManagerMaster.scala   |  4 +++
 .../main/scala/spark/storage/BlockManagerUI.scala  | 39 +++++++++-------------
 .../main/scala/spark/storage/StorageUtils.scala    | 10 +++---
 4 files changed, 41 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 77036c1275..be992250a9 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -46,6 +46,7 @@ import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, C
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import storage.BlockManagerUI
 import util.{MetadataCleaner, TimeStampedHashMap}
+import storage.{StorageStatus, StorageUtils, RDDInfo}
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -473,6 +474,21 @@ class SparkContext(
     }
   }
 
+  /**
+   * Return information about what RDDs are cached, whether they are in memory or on disk,
+   * how much space they take, etc.
+   */
+  def getRDDStorageInfo : Array[RDDInfo] = {
+    StorageUtils.rddInfoFromStorageStatus(getSlavesStorageStatus, this)
+  }
+
+  /**
+   * Return information about blocks stored in all of the slaves
+   */
+  def getSlavesStorageStatus : Array[StorageStatus] = {
+    env.blockManager.master.getStorageStatus
+  }
+
   /**
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 55ff1dde9c..c7ee76f0b7 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -118,6 +118,10 @@ private[spark] class BlockManagerMaster(
     askMasterWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
   }
 
+  def getStorageStatus: Array[StorageStatus] = {
+    askMasterWithRetry[ArrayBuffer[StorageStatus]](GetStorageStatus).toArray
+  }
+
   /** Stop the master actor, called only on the Spark master node */
   def stop() {
     if (masterActor != null) {
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
index eda320fa47..52f6d1b657 100644
--- a/core/src/main/scala/spark/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -1,13 +1,10 @@
 package spark.storage
 
 import akka.actor.{ActorRef, ActorSystem}
-import akka.pattern.ask
 import akka.util.Timeout
 import akka.util.duration._
-import cc.spray.directives._
 import cc.spray.typeconversion.TwirlSupport._
 import cc.spray.Directives
-import scala.collection.mutable.ArrayBuffer
 import spark.{Logging, SparkContext}
 import spark.util.AkkaUtils
 import spark.Utils
@@ -48,32 +45,26 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
       path("") {
         completeWith {
           // Request the current storage status from the Master
-          val future = blockManagerMaster ? GetStorageStatus
-          future.map { status =>
-            // Calculate macro-level statistics
-            val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
-            val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
-            val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
-            val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
-              .reduceOption(_+_).getOrElse(0L)
-            val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
-            spark.storage.html.index.
-              render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
-          }
+          val storageStatusList = sc.getSlavesStorageStatus
+          // Calculate macro-level statistics
+          val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+          val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+          val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+            .reduceOption(_+_).getOrElse(0L)
+          val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+          spark.storage.html.index.
+            render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
         }
       } ~
       path("rdd") {
         parameter("id") { id =>
           completeWith {
-            val future = blockManagerMaster ? GetStorageStatus
-            future.map { status =>
-              val prefix = "rdd_" + id.toString
-              val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
-              val filteredStorageStatusList = StorageUtils.
-                filterStorageStatusByPrefix(storageStatusList, prefix)
-              val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
-              spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
-            }
+            val prefix = "rdd_" + id.toString
+            val storageStatusList = sc.getSlavesStorageStatus
+            val filteredStorageStatusList = StorageUtils.
+              filterStorageStatusByPrefix(storageStatusList, prefix)
+            val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+            spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
           }
         }
       } ~
diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
index a10e3a95c6..d6e33c8619 100644
--- a/core/src/main/scala/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -56,9 +56,11 @@ object StorageUtils {
       // Find the id of the RDD, e.g. rdd_1 => 1
       val rddId = rddKey.split("_").last.toInt
       // Get the friendly name for the rdd, if available.
-      val rddName = Option(sc.persistentRdds(rddId).name).getOrElse(rddKey)
-      val rddStorageLevel = sc.persistentRdds(rddId).getStorageLevel
-
+      val rdd = sc.persistentRdds(rddId)
+      val rddName = Option(rdd.name).getOrElse(rddKey)
+      val rddStorageLevel = rdd.getStorageLevel
+      //TODO get total number of partitions in rdd
+
       RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize)
     }.toArray
   }
@@ -75,4 +77,4 @@
   }
 
-}
\ No newline at end of file
+}
--
cgit v1.2.3
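
Usage sketch (illustrative, not part of the patch): the accessors added above expose through
SparkContext what was previously only reachable through the web UI. A minimal example,
assuming a live SparkContext named `sc` (getSlavesStorageStatus is renamed in a later commit
in this series):

    val rdd = sc.parallelize(1 to 1000).cache()
    rdd.count()  // force computation so the blocks actually get cached

    // Per-RDD summary: id, name, storage level, cached sizes in memory and on disk
    sc.getRDDStorageInfo.foreach(info => println(info))  // default case-class toString;
                                                         // the next commit adds a nicer format

    // Per-slave view, using only the StorageStatus fields shown in this patch
    sc.getSlavesStorageStatus.foreach { s =>
      val used = s.maxMem - s.memRemaining
      println(s.blockManagerId + ": " + used + " of " + s.maxMem + " bytes used, " +
        s.blocks.size + " blocks")
    }
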
From 0f22c4207f27bc8d1675af82f873141dda754f5c Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Mon, 28 Jan 2013 10:08:59 -0800
Subject: better formatting for RDDInfo

---
 core/src/main/scala/spark/storage/StorageUtils.scala | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
index d6e33c8619..ce7c067eea 100644
--- a/core/src/main/scala/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -1,6 +1,6 @@
 package spark.storage
 
-import spark.SparkContext
+import spark.{Utils, SparkContext}
 import BlockManagerMasterActor.BlockStatus
 
 private[spark]
@@ -22,8 +22,14 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
 }
 
 case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
-  numPartitions: Int, memSize: Long, diskSize: Long)
-
+  numPartitions: Int, memSize: Long, diskSize: Long) {
+  override def toString = {
+    import Utils.memoryBytesToString
+    import java.lang.{Integer => JInt}
+    String.format("RDD \"%s\" (%d) Storage: %s; Partitions: %d; MemorySize: %s; DiskSize: %s", name, id.asInstanceOf[JInt],
+      storageLevel.toString, numPartitions.asInstanceOf[JInt], memoryBytesToString(memSize), memoryBytesToString(diskSize))
+  }
+}
 
 /* Helper methods for storage-related objects */
 private[spark]
--
cgit v1.2.3
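
A note on the asInstanceOf[JInt] casts above (illustrative, not part of the patch):
java.lang.String.format takes Object varargs, and a Scala Int is not an AnyRef, so it must be
boxed explicitly. Scala's own StringLike.format boxes primitives by itself, which is what a
later commit in this series switches to:

    import java.lang.{Integer => JInt}

    val id = 3
    // String.format("(%d)", id)                 // rejected: a Scala Int is not an Object
    String.format("(%d)", id.asInstanceOf[JInt]) // works, at the cost of an explicit box
    "(%d)".format(id)                            // idiomatic Scala; boxing is handled for you
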
From c1df24d0850b0ac89f35f1a47ce6b2fb5b95df0a Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Wed, 30 Jan 2013 18:51:14 -0800
Subject: rename Slaves --> Executor

---
 core/src/main/scala/spark/SparkContext.scala           | 6 +++---
 core/src/main/scala/spark/storage/BlockManagerUI.scala | 4 ++--
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index a09eca1dd0..39e3555de8 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -468,7 +468,7 @@ class SparkContext(
    * Return a map from the slave to the max memory available for caching and the remaining
    * memory available for caching.
    */
-  def getSlavesMemoryStatus: Map[String, (Long, Long)] = {
+  def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
     env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
       (blockManagerId.ip + ":" + blockManagerId.port, mem)
     }
@@ -479,13 +479,13 @@ class SparkContext(
    * how much space they take, etc.
    */
   def getRDDStorageInfo : Array[RDDInfo] = {
-    StorageUtils.rddInfoFromStorageStatus(getSlavesStorageStatus, this)
+    StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
   }
 
   /**
    * Return information about blocks stored in all of the slaves
    */
-  def getSlavesStorageStatus : Array[StorageStatus] = {
+  def getExecutorStorageStatus : Array[StorageStatus] = {
     env.blockManager.master.getStorageStatus
   }
 
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
index 52f6d1b657..9e6721ec17 100644
--- a/core/src/main/scala/spark/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -45,7 +45,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
       path("") {
         completeWith {
           // Request the current storage status from the Master
-          val storageStatusList = sc.getSlavesStorageStatus
+          val storageStatusList = sc.getExecutorStorageStatus
           // Calculate macro-level statistics
           val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
           val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
@@ -60,7 +60,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
         parameter("id") { id =>
           completeWith {
             val prefix = "rdd_" + id.toString
-            val storageStatusList = sc.getSlavesStorageStatus
+            val storageStatusList = sc.getExecutorStorageStatus
             val filteredStorageStatusList = StorageUtils.
               filterStorageStatusByPrefix(storageStatusList, prefix)
             val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
--
cgit v1.2.3
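
Sketch of the renamed accessors in use (illustrative, not part of the patch; assumes a live
SparkContext named `sc`). The cluster-wide reductions mirror the ones BlockManagerUI performs
above:

    // Total vs. used memory across all executors
    val statuses = sc.getExecutorStorageStatus
    val maxMem = statuses.map(_.maxMem).reduce(_ + _)
    val remainingMem = statuses.map(_.memRemaining).reduce(_ + _)
    println("cluster memory in use: " + (maxMem - remainingMem) + " of " + maxMem + " bytes")

    // Per-executor max and remaining memory, keyed by "ip:port"
    sc.getExecutorMemoryStatus.foreach { case (executor, (max, remaining)) =>
      println(executor + " -> " + remaining + " of " + max + " bytes free")
    }
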
From f127f2ae76692b189d86b5a47293579d5657c6d5 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Fri, 1 Feb 2013 00:20:49 -0800
Subject: fixup merge (master -> driver renaming)

---
 core/src/main/scala/spark/storage/BlockManagerMaster.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 99324445ca..0372cb080a 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -118,7 +118,7 @@ private[spark] class BlockManagerMaster(
   }
 
   def getStorageStatus: Array[StorageStatus] = {
-    askMasterWithRetry[ArrayBuffer[StorageStatus]](GetStorageStatus).toArray
+    askDriverWithReply[ArrayBuffer[StorageStatus]](GetStorageStatus).toArray
   }
 
   /** Stop the driver actor, called only on the Spark driver node */
--
cgit v1.2.3

From 8a0a5ed53353ad6aa5656eb729d55ca7af2ab096 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Fri, 1 Feb 2013 00:23:38 -0800
Subject: track total partitions, in addition to cached partitions; use scala string formatting

---
 core/src/main/scala/spark/storage/StorageUtils.scala   | 10 ++++------
 core/src/main/twirl/spark/storage/rdd.scala.html       |  6 +++++-
 core/src/main/twirl/spark/storage/rdd_table.scala.html |  6 ++++--
 3 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
index ce7c067eea..5367b74bb6 100644
--- a/core/src/main/scala/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -22,12 +22,11 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
 }
 
 case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
-  numPartitions: Int, memSize: Long, diskSize: Long) {
+  numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long) {
   override def toString = {
     import Utils.memoryBytesToString
-    import java.lang.{Integer => JInt}
-    String.format("RDD \"%s\" (%d) Storage: %s; Partitions: %d; MemorySize: %s; DiskSize: %s", name, id.asInstanceOf[JInt],
-      storageLevel.toString, numPartitions.asInstanceOf[JInt], memoryBytesToString(memSize), memoryBytesToString(diskSize))
+    "RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; DiskSize: %s".format(name, id,
+      storageLevel.toString, numCachedPartitions, numPartitions, memoryBytesToString(memSize), memoryBytesToString(diskSize))
   }
 }
@@ -65,9 +64,8 @@ object StorageUtils {
       val rdd = sc.persistentRdds(rddId)
       val rddName = Option(rdd.name).getOrElse(rddKey)
       val rddStorageLevel = rdd.getStorageLevel
-      //TODO get total number of partitions in rdd
 
-      RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize)
+      RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, rdd.splits.size, memSize, diskSize)
     }.toArray
   }
diff --git a/core/src/main/twirl/spark/storage/rdd.scala.html b/core/src/main/twirl/spark/storage/rdd.scala.html
index ac7f8c981f..d85addeb17 100644
--- a/core/src/main/twirl/spark/storage/rdd.scala.html
+++ b/core/src/main/twirl/spark/storage/rdd.scala.html
@@ -11,7 +11,11 @@
           <strong>Storage Level:</strong>
           @(rddInfo.storageLevel.description)
         <li>
-          <strong>Partitions:</strong>
+          <strong>Cached Partitions:</strong>
+          @(rddInfo.numCachedPartitions)
+        </li>
+        <li>
+          <strong>Total Partitions:</strong>
           @(rddInfo.numPartitions)
         </li>
         <li>
diff --git a/core/src/main/twirl/spark/storage/rdd_table.scala.html b/core/src/main/twirl/spark/storage/rdd_table.scala.html
index af801cf229..a51e64aed0 100644
--- a/core/src/main/twirl/spark/storage/rdd_table.scala.html
+++ b/core/src/main/twirl/spark/storage/rdd_table.scala.html
@@ -6,7 +6,8 @@
       <th>RDD Name</th>
       <th>Storage Level</th>
-      <th>Partitions</th>
+      <th>Cached Partitions</th>
+      <th>Fraction Partitions Cached</th>
       <th>Size in Memory</th>
       <th>Size on Disk</th>
@@ -21,7 +22,8 @@
       <td>
         @(rdd.storageLevel.description)
       </td>
-      <td>@rdd.numPartitions</td>
+      <td>@rdd.numCachedPartitions</td>
+      <td>@(rdd.numCachedPartitions / rdd.numPartitions.toDouble)</td>
       <td>@{Utils.memoryBytesToString(rdd.memSize)}</td>
      <td>@{Utils.memoryBytesToString(rdd.diskSize)}</td>
--
cgit v1.2.3

From c6190067ae40cf457b7f2e58619904b6fd2b1cb6 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Fri, 1 Feb 2013 09:55:25 -0800
Subject: remove unneeded (and unused) filter on block info

---
 core/src/main/scala/spark/storage/StorageUtils.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
index 5367b74bb6..5f72b67b2c 100644
--- a/core/src/main/scala/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -43,8 +43,6 @@ object StorageUtils {
   /* Given a list of BlockStatus objects, returns information for each RDD */
   def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus],
     sc: SparkContext) : Array[RDDInfo] = {
-    // Find all RDD Blocks (ignore broadcast variables)
-    val rddBlocks = infos.filterKeys(_.startsWith("rdd"))
 
     // Group by rddId, ignore the partition name
     val groupedRddBlocks = infos.groupBy { case(k, v) =>
--
cgit v1.2.3
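
End-to-end sketch of what the series provides (illustrative, not part of the patches; assumes
a live SparkContext named `sc`): programmatic cache inspection, including the cached-vs-total
partition fraction that rdd_table.scala.html now renders:

    for (info <- sc.getRDDStorageInfo) {
      println(info)  // uses the RDDInfo.toString format defined above
      val fractionCached = info.numCachedPartitions / info.numPartitions.toDouble
      println("  fraction of partitions cached: " + "%.1f%%".format(fractionCached * 100))
    }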