diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-06-20 10:45:52 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-06-22 10:31:36 -0700 |
commit | ce81c320acfd265b8de7c30a46c6a3adc4f39a74 (patch) | |
tree | 99468ce2bbaaa052db15eff17b97f13b892cb224 /core | |
parent | 9fd5dc3ea9016b59667fd903992843605087e43b (diff) | |
download | spark-ce81c320acfd265b8de7c30a46c6a3adc4f39a74.tar.gz spark-ce81c320acfd265b8de7c30a46c6a3adc4f39a74.tar.bz2 spark-ce81c320acfd265b8de7c30a46c6a3adc4f39a74.zip |
Adding helper function to make listing tables
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/ui/BlockManagerUI.scala | 155 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/JobProgressUI.scala | 71 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/WebUI.scala | 12 |
3 files changed, 97 insertions, 141 deletions
diff --git a/core/src/main/scala/spark/ui/BlockManagerUI.scala b/core/src/main/scala/spark/ui/BlockManagerUI.scala index f319751590..3be5064837 100644 --- a/core/src/main/scala/spark/ui/BlockManagerUI.scala +++ b/core/src/main/scala/spark/ui/BlockManagerUI.scala @@ -1,14 +1,15 @@ package spark.ui -import akka.actor.{ActorRef, ActorSystem} import akka.util.Duration import javax.servlet.http.HttpServletRequest import org.eclipse.jetty.server.Handler -import spark.{Logging, SparkContext} -import spark.Utils -import WebUI._ +import spark.{RDD, Logging, SparkContext, Utils} +import spark.ui.WebUI._ import xml.Node import spark.storage.StorageUtils +import spark.storage.StorageStatus +import spark.storage.RDDInfo +import spark.storage.BlockManagerMasterActor.BlockStatus /** * Web UI server for the BlockManager inside each SparkContext. @@ -33,6 +34,14 @@ class BlockManagerUI(sc: SparkContext) filterStorageStatusByPrefix(storageStatusList, prefix) val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head + val workerHeaders = Seq("Host", "Memory Usage", "Disk Usage") + val workers = filteredStorageStatusList.map((prefix, _)) + val workerTable = listingTable(workerHeaders, workerRow, workers) + + val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk") + val blocks = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1) + val blockTable = listingTable(blockHeaders, blockRow, blocks) + val content = <div class="row"> <div class="span12"> @@ -64,67 +73,38 @@ class BlockManagerUI(sc: SparkContext) <div class="row"> <div class="span12"> <h3> RDD Summary </h3> - <br/> - <table class="table table-bordered table-striped table-condensed sortable"> - <thead> - <tr> - <th>Block Name</th> - <th>Storage Level</th> - <th>Size in Memory</th> - <th>Size on Disk</th> - </tr> - </thead> - <tbody> - {filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1).map { - case (k,v) => - <tr> - <td>{k}</td> - <td> - {v.storageLevel.description} - </td> - <td>{Utils.memoryBytesToString(v.memSize)}</td> - <td>{Utils.memoryBytesToString(v.diskSize)}</td> - </tr> - } - } - </tbody> - </table> + <br/> {blockTable} </div> </div> - <hr/> - <div class="row"> - <div class="span12"> - <h3> Worker Summary </h3> - <br/> - <table class="table table-bordered table-striped table-condensed sortable"> - <thead> - <tr> - <th>Host</th> - <th>Memory Usage</th> - <th>Disk Usage</th> - </tr> - </thead> - <tbody> - {filteredStorageStatusList.map { - status => - <tr> - <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td> - <td> - {Utils.memoryBytesToString(status.memUsed(prefix))} - ({Utils.memoryBytesToString(status.memRemaining)} Total Available) - </td> - <td>{Utils.memoryBytesToString(status.diskUsed(prefix))}</td> - </tr> - } - } - </tbody> - </table> - </div> - </div>; + <hr/> ++ {workerTable}; WebUI.headerSparkPage(content, "RDD Info: " + id) } + def blockRow(blk: (String, BlockStatus)): Seq[Node] = { + val (id, block) = blk + <tr> + <td>{id}</td> + <td> + {block.storageLevel.description} + </td> + <td>{Utils.memoryBytesToString(block.memSize)}</td> + <td>{Utils.memoryBytesToString(block.diskSize)}</td> + </tr> + } + + def workerRow(worker: (String, StorageStatus)): Seq[Node] = { + val (prefix, status) = worker + <tr> + <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td> + <td> + {Utils.memoryBytesToString(status.memUsed(prefix))} + ({Utils.memoryBytesToString(status.memRemaining)} Total Available) + </td> + <td>{Utils.memoryBytesToString(status.diskUsed(prefix))}</td> + </tr> + } + def indexPage: Seq[Node] = { val storageStatusList = sc.getExecutorStorageStatus // Calculate macro-level statistics @@ -132,7 +112,16 @@ class BlockManagerUI(sc: SparkContext) val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_) val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)) .reduceOption(_+_).getOrElse(0L) + + val rddHeaders = Seq( + "RDD Name", + "Storage Level", + "Cached Partitions", + "Fraction Partitions Cached", + "Size in Memory", + "Size on Disk") val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc) + val rddTable = listingTable(rddHeaders, rddRow, rdds) val content = <div class="row"> @@ -144,38 +133,24 @@ class BlockManagerUI(sc: SparkContext) <li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li> </ul> </div> - </div> - <hr/> - <table class="table table-bordered table-striped table-condensed sortable"> - <thead> - <tr> - <th>RDD Name</th> - <th>Storage Level</th> - <th>Cached Partitions</th> - <th>Fraction Partitions Cached</th> - <th>Size in Memory</th> - <th>Size on Disk</th> - </tr> - </thead> - <tbody> - {for (rdd <- rdds) yield - <tr> - <td> - <a href={"/storage/rdd?id=%s".format(rdd.id)}> - {rdd.name} - </a> - </td> - <td>{rdd.storageLevel.description} - </td> - <td>{rdd.numCachedPartitions}</td> - <td>{rdd.numCachedPartitions / rdd.numPartitions.toDouble}</td> - <td>{Utils.memoryBytesToString(rdd.memSize)}</td> - <td>{Utils.memoryBytesToString(rdd.diskSize)}</td> - </tr> - } - </tbody> - </table>; + </div> ++ {rddTable}; WebUI.headerSparkPage(content, "Spark Storage ") } + + def rddRow(rdd: RDDInfo): Seq[Node] = { + <tr> + <td> + <a href={"/storage/rdd?id=%s".format(rdd.id)}> + {rdd.name} + </a> + </td> + <td>{rdd.storageLevel.description} + </td> + <td>{rdd.numCachedPartitions}</td> + <td>{rdd.numCachedPartitions / rdd.numPartitions.toDouble}</td> + <td>{Utils.memoryBytesToString(rdd.memSize)}</td> + <td>{Utils.memoryBytesToString(rdd.diskSize)}</td> + </tr> + } } diff --git a/core/src/main/scala/spark/ui/JobProgressUI.scala b/core/src/main/scala/spark/ui/JobProgressUI.scala index 1f12df10b8..b51e62bb1a 100644 --- a/core/src/main/scala/spark/ui/JobProgressUI.scala +++ b/core/src/main/scala/spark/ui/JobProgressUI.scala @@ -39,6 +39,11 @@ class JobProgressUI(sc: SparkContext) extends UIComponent { def stagePage(request: HttpServletRequest): Seq[Node] = { val stageId = request.getParameter("id").toInt + + val taskHeaders = Seq("Task ID", "Service Time (ms)", "Locality Level", "Worker", "Launch Time") + val tasks = listener.stageToTaskInfos(stageId) + val taskTable = listingTable(taskHeaders, taskRow, tasks) + val content = <h2>Percentile Metrics</h2> <table class="table table-bordered table-striped table-condensed sortable"> @@ -53,27 +58,13 @@ class JobProgressUI(sc: SparkContext) extends UIComponent { {listener.stageToTaskInfos(stageId).map{ case(i, m) => taskRow(i, m) }} </tbody> </table> - <h2>Tasks</h2> - <table class="table table-bordered table-striped table-condensed sortable"> - <thead> - <tr> - <th>Task ID</th> - <th>Service Time (ms)</th> - <th>Locality level</th> - <th>Worker</th> - <th>Launch Time</th> - </tr> - </thead> - <tbody> - {listener.stageToTaskInfos(stageId).map{ case(i, m) => taskRow(i, m) }} - </tbody> - </table>; + <h2>Tasks</h2> ++ {taskTable}; + WebUI.headerSparkPage(content, "Stage Details: %s".format(stageId)) } - - - def taskRow(info: TaskInfo, metrics: TaskMetrics): Seq[Node] = { + def taskRow(taskData: (TaskInfo, TaskMetrics)): Seq[Node] = { + val (info, metrics) = taskData <tr> <td>{info.taskId}</td> <td>{metrics.executorRunTime}</td> @@ -84,37 +75,16 @@ class JobProgressUI(sc: SparkContext) extends UIComponent { } def indexPage: Seq[Node] = { + val stageHeaders = Seq("Stage ID", "Origin", "Submitted", "Duration", "Tasks: Complete/Total") + val activeStages = listener.activeStages.toSeq + val completedStages = listener.completedStages.toSeq + + val activeStageTable = listingTable(stageHeaders, stageRow, activeStages) + val completedStageTable = listingTable(stageHeaders, stageRow, completedStages) + val content = - <h2>Active Stages</h2> - <table class="table table-bordered table-striped table-condensed sortable"> - <thead> - <tr> - <th>Stage ID</th> - <th>Origin</th> - <th>Submitted</th> - <th>Duration</th> - <th>Tasks: Complete/Total</th> - </tr> - </thead> - <tbody> - {listener.activeStages.map(stageRow)} - </tbody> - </table> - <h2>Completed Stages</h2> - <table class="table table-bordered table-striped table-condensed sortable"> - <thead> - <tr> - <th>Stage ID</th> - <th>Origin</th> - <th>Submitted</th> - <th>Duration</th> - <th>Tasks: Complete/Total</th> - </tr> - </thead> - <tbody> - {listener.completedStages.map(stageRow)} - </tbody> - </table>; + <h2>Active Stages</h2> ++ {activeStageTable} + <h2>Completed Stages</h2> ++ {completedStageTable} WebUI.headerSparkPage(content, "Spark Stages") } @@ -139,7 +109,7 @@ class JobProgressUI(sc: SparkContext) extends UIComponent { <td>{listener.stageToTasksComplete.getOrElse(s.id, 0)} / {s.numPartitions} {listener.stageToTasksFailed.getOrElse(s.id, 0) match { case f if f > 0 => "(%s failed)".format(f) - case _ => + case _ => }} </td> </tr> @@ -178,5 +148,4 @@ private[spark] class JobProgressListener extends SparkListener { } override def onJobEnd(jobEnd: SparkListenerEvents) { } -} - +}
\ No newline at end of file diff --git a/core/src/main/scala/spark/ui/WebUI.scala b/core/src/main/scala/spark/ui/WebUI.scala index e80d48dd4a..265624a726 100644 --- a/core/src/main/scala/spark/ui/WebUI.scala +++ b/core/src/main/scala/spark/ui/WebUI.scala @@ -14,6 +14,7 @@ abstract class UIComponent { } object WebUI extends Logging { + // CORE WEB UI COMPONENTS type Responder[T] = HttpServletRequest => T implicit def jsonResponderToHandler(responder: Responder[JValue]): Handler = @@ -80,6 +81,7 @@ object WebUI extends Logging { connect(port) } + // HELPER FUNCTIONS AND SHORTCUTS /** Page with Spark logo, title, and Spark UI headers */ def headerSparkPage(content: => Seq[Node], title: String): Seq[Node] = { val newContent = @@ -117,4 +119,14 @@ object WebUI extends Logging { </body> </html> } + + /** Shortcut for making a table derived from a sequence of objects. */ + def listingTable[T](headers: Seq[String], makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = { + <table class="table table-bordered table-striped table-condensed sortable"> + <thead>{headers.map(h => "<th>%s</th>".format(h))}</thead> + <tbody> + {rows.map(r => makeRow(r))} + </tbody> + </table> + } } |