From 6855338e1400638188358a7d7926eb86f668c160 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Fri, 12 Jul 2013 19:24:16 -0700 Subject: Show block locations in Web UI. This fixes SPARK-769. Support is added for enumerating the locations of blocks in the UI. There is also some minor cleanup in StorageUtils. --- core/src/main/scala/spark/storage/StorageUtils.scala | 11 +++++++++-- core/src/main/scala/spark/ui/storage/RDDPage.scala | 17 +++++++++++++---- 2 files changed, 22 insertions(+), 6 deletions(-) (limited to 'core/src/main') diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala index 950c0cdf35..3e7fa287e5 100644 --- a/core/src/main/scala/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/spark/storage/StorageUtils.scala @@ -39,12 +39,19 @@ case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel, private[spark] object StorageUtils { - /* Given the current storage status of the BlockManager, returns information for each RDD */ - def rddInfoFromStorageStatus(storageStatusList: Array[StorageStatus], + /* Returns RDD-level information, compiled from a list of StorageStatus objects */ + def rddInfoFromStorageStatus(storageStatusList: Seq[StorageStatus], sc: SparkContext) : Array[RDDInfo] = { rddInfoFromBlockStatusList(storageStatusList.flatMap(_.blocks).toMap, sc) } + /* Returns a map of blocks to their locations, compiled from a list of StorageStatus objects */ + def blockLocationsFromStorageStatus(storageStatusList: Seq[StorageStatus]) = { + val blockLocationPairs = storageStatusList + .flatMap(s => s.blocks.map(b => (b._1, s.blockManagerId.hostPort))) + blockLocationPairs.groupBy(_._1).map{case (k, v) => (k, v.unzip._2)}.toMap + } + /* Given a list of BlockStatus objets, returns information for each RDD */ def rddInfoFromBlockStatusList(infos: Map[String, BlockStatus], sc: SparkContext) : Array[RDDInfo] = { diff --git a/core/src/main/scala/spark/ui/storage/RDDPage.scala b/core/src/main/scala/spark/ui/storage/RDDPage.scala index 0cb1e47ea5..428db6fa95 100644 --- a/core/src/main/scala/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/spark/ui/storage/RDDPage.scala @@ -26,8 +26,14 @@ private[spark] class RDDPage(parent: BlockManagerUI) { val workers = filteredStorageStatusList.map((prefix, _)) val workerTable = listingTable(workerHeaders, workerRow, workers) - val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk") - val blocks = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1) + val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk", + "Locations") + + val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1) + val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList) + val blocks = blockStatuses.map { + case(id, status) => (id, status, blockLocations.get(id).getOrElse(Seq("UNKNOWN"))) + } val blockTable = listingTable(blockHeaders, blockRow, blocks) val content = @@ -74,8 +80,8 @@ private[spark] class RDDPage(parent: BlockManagerUI) { headerSparkPage(content, parent.sc, "RDD Info: " + rddInfo.name, Jobs) } - def blockRow(blk: (String, BlockStatus)): Seq[Node] = { - val (id, block) = blk + def blockRow(row: (String, BlockStatus, Seq[String])): Seq[Node] = { + val (id, block, locations) = row {id} @@ -87,6 +93,9 @@ private[spark] class RDDPage(parent: BlockManagerUI) { {Utils.memoryBytesToString(block.diskSize)} + + {locations.map(l => {l}
)} + } -- cgit v1.2.3