aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-06-20 10:45:52 -0700
committerPatrick Wendell <pwendell@gmail.com>2013-06-22 10:31:36 -0700
commitce81c320acfd265b8de7c30a46c6a3adc4f39a74 (patch)
tree99468ce2bbaaa052db15eff17b97f13b892cb224 /core/src/main
parent9fd5dc3ea9016b59667fd903992843605087e43b (diff)
downloadspark-ce81c320acfd265b8de7c30a46c6a3adc4f39a74.tar.gz
spark-ce81c320acfd265b8de7c30a46c6a3adc4f39a74.tar.bz2
spark-ce81c320acfd265b8de7c30a46c6a3adc4f39a74.zip
Adding helper function to make listing tables
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/ui/BlockManagerUI.scala155
-rw-r--r--core/src/main/scala/spark/ui/JobProgressUI.scala71
-rw-r--r--core/src/main/scala/spark/ui/WebUI.scala12
3 files changed, 97 insertions, 141 deletions
diff --git a/core/src/main/scala/spark/ui/BlockManagerUI.scala b/core/src/main/scala/spark/ui/BlockManagerUI.scala
index f319751590..3be5064837 100644
--- a/core/src/main/scala/spark/ui/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/ui/BlockManagerUI.scala
@@ -1,14 +1,15 @@
package spark.ui
-import akka.actor.{ActorRef, ActorSystem}
import akka.util.Duration
import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.Handler
-import spark.{Logging, SparkContext}
-import spark.Utils
-import WebUI._
+import spark.{RDD, Logging, SparkContext, Utils}
+import spark.ui.WebUI._
import xml.Node
import spark.storage.StorageUtils
+import spark.storage.StorageStatus
+import spark.storage.RDDInfo
+import spark.storage.BlockManagerMasterActor.BlockStatus
/**
* Web UI server for the BlockManager inside each SparkContext.
@@ -33,6 +34,14 @@ class BlockManagerUI(sc: SparkContext)
filterStorageStatusByPrefix(storageStatusList, prefix)
val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+ val workerHeaders = Seq("Host", "Memory Usage", "Disk Usage")
+ val workers = filteredStorageStatusList.map((prefix, _))
+ val workerTable = listingTable(workerHeaders, workerRow, workers)
+
+ val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk")
+ val blocks = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1)
+ val blockTable = listingTable(blockHeaders, blockRow, blocks)
+
val content =
<div class="row">
<div class="span12">
@@ -64,67 +73,38 @@ class BlockManagerUI(sc: SparkContext)
<div class="row">
<div class="span12">
<h3> RDD Summary </h3>
- <br/>
- <table class="table table-bordered table-striped table-condensed sortable">
- <thead>
- <tr>
- <th>Block Name</th>
- <th>Storage Level</th>
- <th>Size in Memory</th>
- <th>Size on Disk</th>
- </tr>
- </thead>
- <tbody>
- {filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1).map {
- case (k,v) =>
- <tr>
- <td>{k}</td>
- <td>
- {v.storageLevel.description}
- </td>
- <td>{Utils.memoryBytesToString(v.memSize)}</td>
- <td>{Utils.memoryBytesToString(v.diskSize)}</td>
- </tr>
- }
- }
- </tbody>
- </table>
+ <br/> {blockTable}
</div>
</div>
- <hr/>
- <div class="row">
- <div class="span12">
- <h3> Worker Summary </h3>
- <br/>
- <table class="table table-bordered table-striped table-condensed sortable">
- <thead>
- <tr>
- <th>Host</th>
- <th>Memory Usage</th>
- <th>Disk Usage</th>
- </tr>
- </thead>
- <tbody>
- {filteredStorageStatusList.map {
- status =>
- <tr>
- <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
- <td>
- {Utils.memoryBytesToString(status.memUsed(prefix))}
- ({Utils.memoryBytesToString(status.memRemaining)} Total Available)
- </td>
- <td>{Utils.memoryBytesToString(status.diskUsed(prefix))}</td>
- </tr>
- }
- }
- </tbody>
- </table>
- </div>
- </div>;
+ <hr/> ++ {workerTable};
WebUI.headerSparkPage(content, "RDD Info: " + id)
}
+ def blockRow(blk: (String, BlockStatus)): Seq[Node] = {
+ val (id, block) = blk
+ <tr>
+ <td>{id}</td>
+ <td>
+ {block.storageLevel.description}
+ </td>
+ <td>{Utils.memoryBytesToString(block.memSize)}</td>
+ <td>{Utils.memoryBytesToString(block.diskSize)}</td>
+ </tr>
+ }
+
+ def workerRow(worker: (String, StorageStatus)): Seq[Node] = {
+ val (prefix, status) = worker
+ <tr>
+ <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
+ <td>
+ {Utils.memoryBytesToString(status.memUsed(prefix))}
+ ({Utils.memoryBytesToString(status.memRemaining)} Total Available)
+ </td>
+ <td>{Utils.memoryBytesToString(status.diskUsed(prefix))}</td>
+ </tr>
+ }
+
def indexPage: Seq[Node] = {
val storageStatusList = sc.getExecutorStorageStatus
// Calculate macro-level statistics
@@ -132,7 +112,16 @@ class BlockManagerUI(sc: SparkContext)
val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
.reduceOption(_+_).getOrElse(0L)
+
+ val rddHeaders = Seq(
+ "RDD Name",
+ "Storage Level",
+ "Cached Partitions",
+ "Fraction Partitions Cached",
+ "Size in Memory",
+ "Size on Disk")
val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+ val rddTable = listingTable(rddHeaders, rddRow, rdds)
val content =
<div class="row">
@@ -144,38 +133,24 @@ class BlockManagerUI(sc: SparkContext)
<li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li>
</ul>
</div>
- </div>
- <hr/>
- <table class="table table-bordered table-striped table-condensed sortable">
- <thead>
- <tr>
- <th>RDD Name</th>
- <th>Storage Level</th>
- <th>Cached Partitions</th>
- <th>Fraction Partitions Cached</th>
- <th>Size in Memory</th>
- <th>Size on Disk</th>
- </tr>
- </thead>
- <tbody>
- {for (rdd <- rdds) yield
- <tr>
- <td>
- <a href={"/storage/rdd?id=%s".format(rdd.id)}>
- {rdd.name}
- </a>
- </td>
- <td>{rdd.storageLevel.description}
- </td>
- <td>{rdd.numCachedPartitions}</td>
- <td>{rdd.numCachedPartitions / rdd.numPartitions.toDouble}</td>
- <td>{Utils.memoryBytesToString(rdd.memSize)}</td>
- <td>{Utils.memoryBytesToString(rdd.diskSize)}</td>
- </tr>
- }
- </tbody>
- </table>;
+ </div> ++ {rddTable};
WebUI.headerSparkPage(content, "Spark Storage ")
}
+
+ def rddRow(rdd: RDDInfo): Seq[Node] = {
+ <tr>
+ <td>
+ <a href={"/storage/rdd?id=%s".format(rdd.id)}>
+ {rdd.name}
+ </a>
+ </td>
+ <td>{rdd.storageLevel.description}
+ </td>
+ <td>{rdd.numCachedPartitions}</td>
+ <td>{rdd.numCachedPartitions / rdd.numPartitions.toDouble}</td>
+ <td>{Utils.memoryBytesToString(rdd.memSize)}</td>
+ <td>{Utils.memoryBytesToString(rdd.diskSize)}</td>
+ </tr>
+ }
}
diff --git a/core/src/main/scala/spark/ui/JobProgressUI.scala b/core/src/main/scala/spark/ui/JobProgressUI.scala
index 1f12df10b8..b51e62bb1a 100644
--- a/core/src/main/scala/spark/ui/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/JobProgressUI.scala
@@ -39,6 +39,11 @@ class JobProgressUI(sc: SparkContext) extends UIComponent {
def stagePage(request: HttpServletRequest): Seq[Node] = {
val stageId = request.getParameter("id").toInt
+
+ val taskHeaders = Seq("Task ID", "Service Time (ms)", "Locality Level", "Worker", "Launch Time")
+ val tasks = listener.stageToTaskInfos(stageId)
+ val taskTable = listingTable(taskHeaders, taskRow, tasks)
+
val content =
<h2>Percentile Metrics</h2>
<table class="table table-bordered table-striped table-condensed sortable">
@@ -53,27 +58,13 @@ class JobProgressUI(sc: SparkContext) extends UIComponent {
{listener.stageToTaskInfos(stageId).map{ case(i, m) => taskRow(i, m) }}
</tbody>
</table>
- <h2>Tasks</h2>
- <table class="table table-bordered table-striped table-condensed sortable">
- <thead>
- <tr>
- <th>Task ID</th>
- <th>Service Time (ms)</th>
- <th>Locality level</th>
- <th>Worker</th>
- <th>Launch Time</th>
- </tr>
- </thead>
- <tbody>
- {listener.stageToTaskInfos(stageId).map{ case(i, m) => taskRow(i, m) }}
- </tbody>
- </table>;
+ <h2>Tasks</h2> ++ {taskTable};
+
WebUI.headerSparkPage(content, "Stage Details: %s".format(stageId))
}
-
-
- def taskRow(info: TaskInfo, metrics: TaskMetrics): Seq[Node] = {
+ def taskRow(taskData: (TaskInfo, TaskMetrics)): Seq[Node] = {
+ val (info, metrics) = taskData
<tr>
<td>{info.taskId}</td>
<td>{metrics.executorRunTime}</td>
@@ -84,37 +75,16 @@ class JobProgressUI(sc: SparkContext) extends UIComponent {
}
def indexPage: Seq[Node] = {
+ val stageHeaders = Seq("Stage ID", "Origin", "Submitted", "Duration", "Tasks: Complete/Total")
+ val activeStages = listener.activeStages.toSeq
+ val completedStages = listener.completedStages.toSeq
+
+ val activeStageTable = listingTable(stageHeaders, stageRow, activeStages)
+ val completedStageTable = listingTable(stageHeaders, stageRow, completedStages)
+
val content =
- <h2>Active Stages</h2>
- <table class="table table-bordered table-striped table-condensed sortable">
- <thead>
- <tr>
- <th>Stage ID</th>
- <th>Origin</th>
- <th>Submitted</th>
- <th>Duration</th>
- <th>Tasks: Complete/Total</th>
- </tr>
- </thead>
- <tbody>
- {listener.activeStages.map(stageRow)}
- </tbody>
- </table>
- <h2>Completed Stages</h2>
- <table class="table table-bordered table-striped table-condensed sortable">
- <thead>
- <tr>
- <th>Stage ID</th>
- <th>Origin</th>
- <th>Submitted</th>
- <th>Duration</th>
- <th>Tasks: Complete/Total</th>
- </tr>
- </thead>
- <tbody>
- {listener.completedStages.map(stageRow)}
- </tbody>
- </table>;
+ <h2>Active Stages</h2> ++ {activeStageTable}
+ <h2>Completed Stages</h2> ++ {completedStageTable}
WebUI.headerSparkPage(content, "Spark Stages")
}
@@ -139,7 +109,7 @@ class JobProgressUI(sc: SparkContext) extends UIComponent {
<td>{listener.stageToTasksComplete.getOrElse(s.id, 0)} / {s.numPartitions}
{listener.stageToTasksFailed.getOrElse(s.id, 0) match {
case f if f > 0 => "(%s failed)".format(f)
- case _ =>
+ case _ =>
}}
</td>
</tr>
@@ -178,5 +148,4 @@ private[spark] class JobProgressListener extends SparkListener {
}
override def onJobEnd(jobEnd: SparkListenerEvents) { }
-}
-
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/ui/WebUI.scala b/core/src/main/scala/spark/ui/WebUI.scala
index e80d48dd4a..265624a726 100644
--- a/core/src/main/scala/spark/ui/WebUI.scala
+++ b/core/src/main/scala/spark/ui/WebUI.scala
@@ -14,6 +14,7 @@ abstract class UIComponent {
}
object WebUI extends Logging {
+ // CORE WEB UI COMPONENTS
type Responder[T] = HttpServletRequest => T
implicit def jsonResponderToHandler(responder: Responder[JValue]): Handler =
@@ -80,6 +81,7 @@ object WebUI extends Logging {
connect(port)
}
+ // HELPER FUNCTIONS AND SHORTCUTS
/** Page with Spark logo, title, and Spark UI headers */
def headerSparkPage(content: => Seq[Node], title: String): Seq[Node] = {
val newContent =
@@ -117,4 +119,14 @@ object WebUI extends Logging {
</body>
</html>
}
+
+ /** Shortcut for making a table derived from a sequence of objects. */
+ def listingTable[T](headers: Seq[String], makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
+ <table class="table table-bordered table-striped table-condensed sortable">
+ <thead>{headers.map(h => "<th>%s</th>".format(h))}</thead>
+ <tbody>
+ {rows.map(r => makeRow(r))}
+ </tbody>
+ </table>
+ }
}