author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-02 23:44:13 -0800
---|---|---
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-02 23:44:13 -0800
commit | 88ee6163a1b5d6fccfbd6c7e9ae6fd4968b0695e (patch) |
tree | 67c2a8a58503dfa644a29a4e2f567a7408b4b40f |
parent | cd4ca936798f327357eff1aa27c238dbce18cb8d (diff) |
parent | c6190067ae40cf457b7f2e58619904b6fd2b1cb6 (diff) |
Merge pull request #422 from squito/blockmanager_info
RDDInfo available from SparkContext
6 files changed, 59 insertions, 38 deletions
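The user-facing change is three public methods on `SparkContext`: `getExecutorMemoryStatus` (renamed from `getSlavesMemoryStatus`), `getRDDStorageInfo`, and `getExecutorStorageStatus`. Below is a minimal driver-side sketch of how they might be used; the context setup and sample RDD are illustrative scaffolding, not part of the commit.

```scala
import spark.SparkContext

object StorageInfoExample {
  def main(args: Array[String]) {
    // Illustrative setup; only the three get* calls below come from this commit.
    val sc = new SparkContext("local", "storage-info-example")

    // Cache and materialize an RDD so there is something to report on.
    val rdd = sc.parallelize(1 to 1000, 10).cache()
    rdd.count()

    // Per-executor (maxMem, remainingMem) pairs, keyed by "ip:port".
    sc.getExecutorMemoryStatus.foreach { case (executor, (maxMem, remaining)) =>
      println(executor + ": " + remaining + " of " + maxMem + " bytes free")
    }

    // One RDDInfo per cached RDD; the new toString gives a one-line summary.
    sc.getRDDStorageInfo.foreach(println)

    // Raw per-executor, per-block detail for anything finer grained.
    val storageStatus = sc.getExecutorStorageStatus

    sc.stop()
  }
}
```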
```diff
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 2ed458c6fe..b95049da0c 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -46,6 +46,7 @@ import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, C
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import storage.BlockManagerUI
 import util.{MetadataCleaner, TimeStampedHashMap}
+import storage.{StorageStatus, StorageUtils, RDDInfo}
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -468,13 +469,28 @@ class SparkContext(
    * Return a map from the slave to the max memory available for caching and the remaining
    * memory available for caching.
    */
-  def getSlavesMemoryStatus: Map[String, (Long, Long)] = {
+  def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
     env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
       (blockManagerId.ip + ":" + blockManagerId.port, mem)
     }
   }
 
   /**
+   * Return information about what RDDs are cached, if they are in mem or on disk, how much space
+   * they take, etc.
+   */
+  def getRDDStorageInfo : Array[RDDInfo] = {
+    StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
+  }
+
+  /**
+   * Return information about blocks stored in all of the slaves
+   */
+  def getExecutorStorageStatus : Array[StorageStatus] = {
+    env.blockManager.master.getStorageStatus
+  }
+
+  /**
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
    */
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 7be6b9fa87..7389bee150 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -115,6 +115,10 @@ private[spark] class BlockManagerMaster(
     askDriverWithReply[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
   }
 
+  def getStorageStatus: Array[StorageStatus] = {
+    askDriverWithReply[ArrayBuffer[StorageStatus]](GetStorageStatus).toArray
+  }
+
   /** Stop the driver actor, called only on the Spark driver node */
   def stop() {
     if (driverActor != null) {
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
index eda320fa47..9e6721ec17 100644
--- a/core/src/main/scala/spark/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -1,13 +1,10 @@
 package spark.storage
 
 import akka.actor.{ActorRef, ActorSystem}
-import akka.pattern.ask
 import akka.util.Timeout
 import akka.util.duration._
-import cc.spray.directives._
 import cc.spray.typeconversion.TwirlSupport._
 import cc.spray.Directives
-import scala.collection.mutable.ArrayBuffer
 import spark.{Logging, SparkContext}
 import spark.util.AkkaUtils
 import spark.Utils
@@ -48,32 +45,26 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
       path("") {
         completeWith {
           // Request the current storage status from the Master
-          val future = blockManagerMaster ? GetStorageStatus
-          future.map { status =>
-            // Calculate macro-level statistics
-            val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
-            val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
-            val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
-            val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
-              .reduceOption(_+_).getOrElse(0L)
-            val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
-            spark.storage.html.index.
-              render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
-          }
+          val storageStatusList = sc.getExecutorStorageStatus
+          // Calculate macro-level statistics
+          val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+          val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+          val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+            .reduceOption(_+_).getOrElse(0L)
+          val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+          spark.storage.html.index.
+            render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
         }
       } ~
       path("rdd") {
         parameter("id") { id =>
           completeWith {
-            val future = blockManagerMaster ? GetStorageStatus
-            future.map { status =>
-              val prefix = "rdd_" + id.toString
-              val storageStatusList = status.asInstanceOf[ArrayBuffer[StorageStatus]].toArray
-              val filteredStorageStatusList = StorageUtils.
-                filterStorageStatusByPrefix(storageStatusList, prefix)
-              val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
-              spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
-            }
+            val prefix = "rdd_" + id.toString
+            val storageStatusList = sc.getExecutorStorageStatus
+            val filteredStorageStatusList = StorageUtils.
+              filterStorageStatusByPrefix(storageStatusList, prefix)
+            val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+            spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
           }
         }
       } ~
diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
index a10e3a95c6..5f72b67b2c 100644
--- a/core/src/main/scala/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -1,6 +1,6 @@
 package spark.storage
 
-import spark.SparkContext
+import spark.{Utils, SparkContext}
 import BlockManagerMasterActor.BlockStatus
 
 private[spark]
@@ -22,8 +22,13 @@ case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,
 }
 
 case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
-  numPartitions: Int, memSize: Long, diskSize: Long)
-
+  numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long) {
+  override def toString = {
+    import Utils.memoryBytesToString
+    "RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; DiskSize: %s".format(name, id,
+      storageLevel.toString, numCachedPartitions, numPartitions, memoryBytesToString(memSize), memoryBytesToString(diskSize))
+  }
+}
 
 /* Helper methods for storage-related objects */
 private[spark]
@@ -38,8 +43,6 @@ object StorageUtils {
   /* Given a list of BlockStatus objets, returns information for each RDD */
   def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus],
     sc: SparkContext) : Array[RDDInfo] = {
-    // Find all RDD Blocks (ignore broadcast variables)
-    val rddBlocks = infos.filterKeys(_.startsWith("rdd"))
 
     // Group by rddId, ignore the partition name
     val groupedRddBlocks = infos.groupBy { case(k, v) =>
@@ -56,10 +59,11 @@ object StorageUtils {
 
       // Find the id of the RDD, e.g. rdd_1 => 1
       val rddId = rddKey.split("_").last.toInt
       // Get the friendly name for the rdd, if available.
-      val rddName = Option(sc.persistentRdds(rddId).name).getOrElse(rddKey)
-      val rddStorageLevel = sc.persistentRdds(rddId).getStorageLevel
-
-      RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, memSize, diskSize)
+      val rdd = sc.persistentRdds(rddId)
+      val rddName = Option(rdd.name).getOrElse(rddKey)
+      val rddStorageLevel = rdd.getStorageLevel
+
+      RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, rdd.splits.size, memSize, diskSize)
     }.toArray
   }
 
@@ -75,4 +79,4 @@ object StorageUtils {
 
   }
 
-}
\ No newline at end of file
+}
diff --git a/core/src/main/twirl/spark/storage/rdd.scala.html b/core/src/main/twirl/spark/storage/rdd.scala.html
index ac7f8c981f..d85addeb17 100644
--- a/core/src/main/twirl/spark/storage/rdd.scala.html
+++ b/core/src/main/twirl/spark/storage/rdd.scala.html
@@ -11,7 +11,11 @@
           <strong>Storage Level:</strong>
           @(rddInfo.storageLevel.description)
         <li>
-          <strong>Partitions:</strong>
+          <strong>Cached Partitions:</strong>
+          @(rddInfo.numCachedPartitions)
+        </li>
+        <li>
+          <strong>Total Partitions:</strong>
           @(rddInfo.numPartitions)
         </li>
         <li>
diff --git a/core/src/main/twirl/spark/storage/rdd_table.scala.html b/core/src/main/twirl/spark/storage/rdd_table.scala.html
index af801cf229..a51e64aed0 100644
--- a/core/src/main/twirl/spark/storage/rdd_table.scala.html
+++ b/core/src/main/twirl/spark/storage/rdd_table.scala.html
@@ -6,7 +6,8 @@
     <tr>
       <th>RDD Name</th>
       <th>Storage Level</th>
-      <th>Partitions</th>
+      <th>Cached Partitions</th>
+      <th>Fraction Partitions Cached</th>
       <th>Size in Memory</th>
       <th>Size on Disk</th>
     </tr>
@@ -21,7 +22,8 @@
       </td>
       <td>@(rdd.storageLevel.description)
       </td>
-      <td>@rdd.numPartitions</td>
+      <td>@rdd.numCachedPartitions</td>
+      <td>@(rdd.numCachedPartitions / rdd.numPartitions.toDouble)</td>
       <td>@{Utils.memoryBytesToString(rdd.memSize)}</td>
       <td>@{Utils.memoryBytesToString(rdd.diskSize)}</td>
     </tr>
```
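One detail worth noting in the `rdd_table.scala.html` change: the new "Fraction Partitions Cached" column divides by `rdd.numPartitions.toDouble`. The `.toDouble` is what keeps this from being integer division, as this small illustrative snippet shows:

```scala
// With 6 of 10 partitions cached:
val cached = 6
val total = 10
println(cached / total)          // prints 0   (Int division truncates)
println(cached / total.toDouble) // prints 0.6 (the fraction the table shows)
```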