author     Patrick Wendell <pwendell@gmail.com>  2013-06-20 12:05:49 -0700
committer  Patrick Wendell <pwendell@gmail.com>  2013-06-22 10:31:36 -0700
commit     dcf6a68177912ffa69ee75345c8172fd2db9e0ea (patch)
tree       2d6892d6f709a3f65616db3e7b41bd6070c0e74e
parent     ce81c320acfd265b8de7c30a46c6a3adc4f39a74 (diff)
Refactoring into different modules
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                2
-rw-r--r--  core/src/main/scala/spark/ui/BlockManagerUI.scala         156
-rw-r--r--  core/src/main/scala/spark/ui/JobProgressUI.scala          151
-rw-r--r--  core/src/main/scala/spark/ui/SparkUI.scala                  2
-rw-r--r--  core/src/main/scala/spark/ui/WebUI.scala                    7
-rw-r--r--  core/src/main/scala/spark/ui/jobs/IndexPage.scala          55
-rw-r--r--  core/src/main/scala/spark/ui/jobs/JobProgressUI.scala      82
-rw-r--r--  core/src/main/scala/spark/ui/jobs/StagePage.scala          63
-rw-r--r--  core/src/main/scala/spark/ui/storage/BlockManagerUI.scala  26
-rw-r--r--  core/src/main/scala/spark/ui/storage/IndexPage.scala       61
-rw-r--r--  core/src/main/scala/spark/ui/storage/RDDPage.scala         94
11 files changed, 390 insertions(+), 309 deletions(-)
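
The refactoring splits each web UI into a thin UIComponent that only registers URL handlers, plus one View[Seq[Node]] class per page (IndexPage, StagePage, RDDPage) that renders the XHTML for that page from shared state. A minimal sketch of that pattern follows; ExampleUI and ExamplePage are illustrative names only (not part of this commit), and it assumes the implicit Responder-to-Handler conversion imported via spark.ui.WebUI._, as the classes added in this commit do.

package spark.ui.example

import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.Handler
import xml.Node
import spark.ui.{UIComponent, View, WebUI}
import spark.ui.WebUI._

// Illustrative page: one View per URL, returning the page's XHTML nodes.
class ExamplePage(parent: ExampleUI) extends View[Seq[Node]] {
  def render(request: HttpServletRequest): Seq[Node] =
    WebUI.headerSparkPage(<p>Hello from {parent.title}</p>, parent.title)
}

// Illustrative component: owns its pages and only wires paths to their render methods.
private[spark] class ExampleUI(val title: String) extends UIComponent {
  val indexPage = new ExamplePage(this)

  def getHandlers = Seq[(String, Handler)](
    ("/example", (request: HttpServletRequest) => indexPage.render(request))
  )
}

The same shape appears twice below: spark.ui.storage.BlockManagerUI wires /storage and /storage/rdd to its IndexPage and RDDPage, and spark.ui.jobs.JobProgressUI wires /stages and /stages/stage to its IndexPage and StagePage.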
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 155a5ee721..901cda4174 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -48,7 +48,7 @@ import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
-import ui.{SparkUI, BlockManagerUI}
+import ui.{SparkUI}
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
diff --git a/core/src/main/scala/spark/ui/BlockManagerUI.scala b/core/src/main/scala/spark/ui/BlockManagerUI.scala
deleted file mode 100644
index 3be5064837..0000000000
--- a/core/src/main/scala/spark/ui/BlockManagerUI.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-package spark.ui
-
-import akka.util.Duration
-import javax.servlet.http.HttpServletRequest
-import org.eclipse.jetty.server.Handler
-import spark.{RDD, Logging, SparkContext, Utils}
-import spark.ui.WebUI._
-import xml.Node
-import spark.storage.StorageUtils
-import spark.storage.StorageStatus
-import spark.storage.RDDInfo
-import spark.storage.BlockManagerMasterActor.BlockStatus
-
-/**
- * Web UI server for the BlockManager inside each SparkContext.
- */
-private[spark]
-class BlockManagerUI(sc: SparkContext)
- extends UIComponent with Logging {
- implicit val timeout = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-
-
- def getHandlers = Seq[(String, Handler)](
- ("/storage/rdd", (request: HttpServletRequest) => rddPage(request)),
- ("/storage", (request: HttpServletRequest) => indexPage)
- )
-
- def rddPage(request: HttpServletRequest): Seq[Node] = {
- val id = request.getParameter("id")
- val prefix = "rdd_" + id.toString
- val storageStatusList = sc.getExecutorStorageStatus
- val filteredStorageStatusList = StorageUtils.
- filterStorageStatusByPrefix(storageStatusList, prefix)
- val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
-
- val workerHeaders = Seq("Host", "Memory Usage", "Disk Usage")
- val workers = filteredStorageStatusList.map((prefix, _))
- val workerTable = listingTable(workerHeaders, workerRow, workers)
-
- val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk")
- val blocks = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1)
- val blockTable = listingTable(blockHeaders, blockRow, blocks)
-
- val content =
- <div class="row">
- <div class="span12">
- <ul class="unstyled">
- <li>
- <strong>Storage Level:</strong>
- {rddInfo.storageLevel.description}
- </li>
- <li>
- <strong>Cached Partitions:</strong>
- {rddInfo.numCachedPartitions}
- </li>
- <li>
- <strong>Total Partitions:</strong>
- {rddInfo.numPartitions}
- </li>
- <li>
- <strong>Memory Size:</strong>
- {Utils.memoryBytesToString(rddInfo.memSize)}
- </li>
- <li>
- <strong>Disk Size:</strong>
- {Utils.memoryBytesToString(rddInfo.diskSize)}
- </li>
- </ul>
- </div>
- </div>
- <hr/>
- <div class="row">
- <div class="span12">
- <h3> RDD Summary </h3>
- <br/> {blockTable}
- </div>
- </div>
- <hr/> ++ {workerTable};
-
- WebUI.headerSparkPage(content, "RDD Info: " + id)
- }
-
- def blockRow(blk: (String, BlockStatus)): Seq[Node] = {
- val (id, block) = blk
- <tr>
- <td>{id}</td>
- <td>
- {block.storageLevel.description}
- </td>
- <td>{Utils.memoryBytesToString(block.memSize)}</td>
- <td>{Utils.memoryBytesToString(block.diskSize)}</td>
- </tr>
- }
-
- def workerRow(worker: (String, StorageStatus)): Seq[Node] = {
- val (prefix, status) = worker
- <tr>
- <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
- <td>
- {Utils.memoryBytesToString(status.memUsed(prefix))}
- ({Utils.memoryBytesToString(status.memRemaining)} Total Available)
- </td>
- <td>{Utils.memoryBytesToString(status.diskUsed(prefix))}</td>
- </tr>
- }
-
- def indexPage: Seq[Node] = {
- val storageStatusList = sc.getExecutorStorageStatus
- // Calculate macro-level statistics
- val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
- val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
- val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
- .reduceOption(_+_).getOrElse(0L)
-
- val rddHeaders = Seq(
- "RDD Name",
- "Storage Level",
- "Cached Partitions",
- "Fraction Partitions Cached",
- "Size in Memory",
- "Size on Disk")
- val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
- val rddTable = listingTable(rddHeaders, rddRow, rdds)
-
- val content =
- <div class="row">
- <div class="span12">
- <ul class="unstyled">
- <li><strong>Memory:</strong>
- {Utils.memoryBytesToString(maxMem - remainingMem)} Used
- ({Utils.memoryBytesToString(remainingMem)} Available) </li>
- <li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li>
- </ul>
- </div>
- </div> ++ {rddTable};
-
- WebUI.headerSparkPage(content, "Spark Storage ")
- }
-
- def rddRow(rdd: RDDInfo): Seq[Node] = {
- <tr>
- <td>
- <a href={"/storage/rdd?id=%s".format(rdd.id)}>
- {rdd.name}
- </a>
- </td>
- <td>{rdd.storageLevel.description}
- </td>
- <td>{rdd.numCachedPartitions}</td>
- <td>{rdd.numCachedPartitions / rdd.numPartitions.toDouble}</td>
- <td>{Utils.memoryBytesToString(rdd.memSize)}</td>
- <td>{Utils.memoryBytesToString(rdd.diskSize)}</td>
- </tr>
- }
-}
diff --git a/core/src/main/scala/spark/ui/JobProgressUI.scala b/core/src/main/scala/spark/ui/JobProgressUI.scala
deleted file mode 100644
index b51e62bb1a..0000000000
--- a/core/src/main/scala/spark/ui/JobProgressUI.scala
+++ /dev/null
@@ -1,151 +0,0 @@
-package spark.ui
-
-import spark.{Utils, SparkContext}
-import spark.scheduler._
-import spark.scheduler.SparkListenerTaskEnd
-import spark.scheduler.StageCompleted
-import spark.scheduler.SparkListenerStageSubmitted
-import org.eclipse.jetty.server.Handler
-import javax.servlet.http.HttpServletRequest
-import xml.Node
-import WebUI._
-import collection.mutable._
-import spark.Success
-import akka.util.Duration
-import java.text.SimpleDateFormat
-import java.util.Date
-import spark.scheduler.cluster.TaskInfo
-import collection.mutable
-import org.hsqldb.lib.HashMappedList
-import spark.executor.TaskMetrics
-import spark.scheduler.SparkListenerTaskEnd
-import scala.Some
-import spark.scheduler.SparkListenerStageSubmitted
-import scala.Seq
-import spark.scheduler.StageCompleted
-import spark.scheduler.SparkListenerJobStart
-
-private[spark]
-class JobProgressUI(sc: SparkContext) extends UIComponent {
- val listener = new JobProgressListener
- val fmt = new SimpleDateFormat("EEE, MMM d yyyy HH:mm:ss")
-
- sc.addSparkListener(listener)
-
- def getHandlers = Seq[(String, Handler)](
- ("/stages/stage", (request: HttpServletRequest) => stagePage(request)),
- ("/stages", (request: HttpServletRequest) => indexPage)
- )
-
- def stagePage(request: HttpServletRequest): Seq[Node] = {
- val stageId = request.getParameter("id").toInt
-
- val taskHeaders = Seq("Task ID", "Service Time (ms)", "Locality Level", "Worker", "Launch Time")
- val tasks = listener.stageToTaskInfos(stageId)
- val taskTable = listingTable(taskHeaders, taskRow, tasks)
-
- val content =
- <h2>Percentile Metrics</h2>
- <table class="table table-bordered table-striped table-condensed sortable">
- <thead>
- <tr>
- <th>Service Time</th>
- <th>Remote Bytes Read</th>
- <th>Shuffle Bytes Written</th>
- </tr>
- </thead>
- <tbody>
- {listener.stageToTaskInfos(stageId).map{ case(i, m) => taskRow(i, m) }}
- </tbody>
- </table>
- <h2>Tasks</h2> ++ {taskTable};
-
- WebUI.headerSparkPage(content, "Stage Details: %s".format(stageId))
- }
-
- def taskRow(taskData: (TaskInfo, TaskMetrics)): Seq[Node] = {
- val (info, metrics) = taskData
- <tr>
- <td>{info.taskId}</td>
- <td>{metrics.executorRunTime}</td>
- <td>{info.taskLocality}</td>
- <td>{info.hostPort}</td>
- <td>{fmt.format(new Date(info.launchTime))}</td>
- </tr>
- }
-
- def indexPage: Seq[Node] = {
- val stageHeaders = Seq("Stage ID", "Origin", "Submitted", "Duration", "Tasks: Complete/Total")
- val activeStages = listener.activeStages.toSeq
- val completedStages = listener.completedStages.toSeq
-
- val activeStageTable = listingTable(stageHeaders, stageRow, activeStages)
- val completedStageTable = listingTable(stageHeaders, stageRow, completedStages)
-
- val content =
- <h2>Active Stages</h2> ++ {activeStageTable}
- <h2>Completed Stages</h2> ++ {completedStageTable}
-
- WebUI.headerSparkPage(content, "Spark Stages")
- }
-
- def getElapsedTime(submitted: Option[Long], completed: Long): String = {
- submitted match {
- case Some(t) => Duration(completed - t, "milliseconds").printHMS
- case _ => "Unknown"
- }
- }
-
- def stageRow(s: Stage): Seq[Node] = {
- val submissionTime = s.submissionTime match {
- case Some(t) => fmt.format(new Date(t))
- case None => "Unknown"
- }
- <tr>
- <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.id}</a></td>
- <td>{s.origin}</td>
- <td>{submissionTime}</td>
- <td>{getElapsedTime(s.submissionTime, s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
- <td>{listener.stageToTasksComplete.getOrElse(s.id, 0)} / {s.numPartitions}
- {listener.stageToTasksFailed.getOrElse(s.id, 0) match {
- case f if f > 0 => "(%s failed)".format(f)
- case _ =>
- }}
- </td>
- </tr>
- }
-}
-
-private[spark] class JobProgressListener extends SparkListener {
- val activeStages = HashSet[Stage]()
- val stageToTasksComplete = HashMap[Int, Int]()
- val stageToTasksFailed = HashMap[Int, Int]()
- val stageToTaskInfos = HashMap[Int, ArrayBuffer[(TaskInfo, TaskMetrics)]]()
- val completedStages = ListBuffer[Stage]() // Todo (pwendell): Evict these over time
-
- override def onJobStart(jobStart: SparkListenerJobStart) { }
-
- override def onStageCompleted(stageCompleted: StageCompleted) = {
- val stage = stageCompleted.stageInfo.stage
- activeStages -= stage
- stage +=: completedStages
- }
-
- override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
- activeStages += stageSubmitted.stage
-
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- val sid = taskEnd.event.task.stageId
- taskEnd.event.reason match {
- case Success =>
- stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
- case _ =>
- stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
- }
- val taskList = stageToTaskInfos.getOrElse(sid, ArrayBuffer[(TaskInfo, TaskMetrics)]())
- taskList += ((taskEnd.event.taskInfo, taskEnd.event.taskMetrics))
- stageToTaskInfos(sid) = taskList
- }
-
- override def onJobEnd(jobEnd: SparkListenerEvents) { }
-}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
index 63b75df36f..a1f6cc60ec 100644
--- a/core/src/main/scala/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -1,8 +1,10 @@
package spark.ui
+import jobs.JobProgressUI
import spark.{Logging, SparkContext, Utils}
import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.Handler
+import storage.BlockManagerUI
import WebUI._
private[spark] class SparkUI(sc: SparkContext) extends Logging {
diff --git a/core/src/main/scala/spark/ui/WebUI.scala b/core/src/main/scala/spark/ui/WebUI.scala
index 265624a726..6dedd28fc1 100644
--- a/core/src/main/scala/spark/ui/WebUI.scala
+++ b/core/src/main/scala/spark/ui/WebUI.scala
@@ -13,6 +13,10 @@ abstract class UIComponent {
def getHandlers(): Seq[(String, Handler)]
}
+abstract class View[T] {
+ def render(request: HttpServletRequest): T
+}
+
object WebUI extends Logging {
// CORE WEB UI COMPONENTS
type Responder[T] = HttpServletRequest => T
@@ -82,6 +86,7 @@ object WebUI extends Logging {
}
// HELPER FUNCTIONS AND SHORTCUTS
+
/** Page with Spark logo, title, and Spark UI headers */
def headerSparkPage(content: => Seq[Node], title: String): Seq[Node] = {
val newContent =
@@ -123,7 +128,7 @@ object WebUI extends Logging {
/** Shortcut for making a table derived from a sequence of objects. */
def listingTable[T](headers: Seq[String], makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed sortable">
- <thead>{headers.map(h => "<th>%s</th>".format(h))}</thead>
+ <thead>{headers.map(h => <th>{h}</th>)}</thead>
<tbody>
{rows.map(r => makeRow(r))}
</tbody>
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
new file mode 100644
index 0000000000..1740524f49
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -0,0 +1,55 @@
+package spark.ui.jobs
+
+import spark.ui.{WebUI, View}
+import xml.{NodeSeq, Node}
+import spark.ui.WebUI._
+import scala.Some
+import akka.util.Duration
+import spark.scheduler.Stage
+import java.util.Date
+import javax.servlet.http.HttpServletRequest
+
+class IndexPage(parent: JobProgressUI) extends View[Seq[Node]] {
+ val listener = parent.listener
+ val dateFmt = parent.dateFmt
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val stageHeaders = Seq("Stage ID", "Origin", "Submitted", "Duration", "Tasks: Complete/Total")
+ val activeStages = listener.activeStages.toSeq
+ val completedStages = listener.completedStages.toSeq
+
+ val activeStageTable: NodeSeq = listingTable(stageHeaders, stageRow, activeStages)
+ val completedStageTable = listingTable(stageHeaders, stageRow, completedStages)
+
+ val content = <h2>Active Stages</h2> ++ activeStageTable ++
+ <h2>Completed Stages</h2> ++ completedStageTable
+
+ WebUI.headerSparkPage(content, "Spark Stages")
+ }
+
+ def getElapsedTime(submitted: Option[Long], completed: Long): String = {
+ submitted match {
+ case Some(t) => Duration(completed - t, "milliseconds").printHMS
+ case _ => "Unknown"
+ }
+ }
+
+ def stageRow(s: Stage): Seq[Node] = {
+ val submissionTime = s.submissionTime match {
+ case Some(t) => dateFmt.format(new Date(t))
+ case None => "Unknown"
+ }
+ <tr>
+ <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.id}</a></td>
+ <td>{s.origin}</td>
+ <td>{submissionTime}</td>
+ <td>{getElapsedTime(s.submissionTime, s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
+ <td>{listener.stageToTasksComplete.getOrElse(s.id, 0)} / {s.numPartitions}
+ {listener.stageToTasksFailed.getOrElse(s.id, 0) match {
+ case f if f > 0 => "(%s failed)".format(f)
+ case _ =>
+ }}
+ </td>
+ </tr>
+ }
+}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
new file mode 100644
index 0000000000..9ee962bf2e
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -0,0 +1,82 @@
+package spark.ui.jobs
+
+import spark.{Utils, SparkContext}
+import spark.scheduler._
+import spark.scheduler.SparkListenerTaskEnd
+import spark.scheduler.StageCompleted
+import spark.scheduler.SparkListenerStageSubmitted
+import org.eclipse.jetty.server.Handler
+import javax.servlet.http.HttpServletRequest
+import xml.Node
+import collection.mutable._
+import spark.Success
+import akka.util.Duration
+import java.text.SimpleDateFormat
+import java.util.Date
+import spark.scheduler.cluster.TaskInfo
+import collection.mutable
+import org.hsqldb.lib.HashMappedList
+import spark.executor.TaskMetrics
+import spark.scheduler.SparkListenerTaskEnd
+import scala.Some
+import spark.scheduler.SparkListenerStageSubmitted
+import scala.Seq
+import spark.scheduler.StageCompleted
+import spark.scheduler.SparkListenerJobStart
+import spark.ui.{WebUI, UIComponent}
+import spark.ui.WebUI._
+import spark.scheduler.SparkListenerTaskEnd
+import scala.Some
+import spark.scheduler.SparkListenerStageSubmitted
+import spark.scheduler.StageCompleted
+import spark.scheduler.SparkListenerJobStart
+
+private[spark]
+class JobProgressUI(sc: SparkContext) extends UIComponent {
+ val listener = new JobProgressListener
+ val dateFmt = new SimpleDateFormat("EEE, MMM d yyyy HH:mm:ss")
+
+ sc.addSparkListener(listener)
+
+ val indexPage = new IndexPage(this)
+ val stagePage = new StagePage(this)
+
+ def getHandlers = Seq[(String, Handler)](
+ ("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)),
+ ("/stages", (request: HttpServletRequest) => indexPage.render(request))
+ )
+}
+
+private[spark] class JobProgressListener extends SparkListener {
+ val activeStages = HashSet[Stage]()
+ val stageToTasksComplete = HashMap[Int, Int]()
+ val stageToTasksFailed = HashMap[Int, Int]()
+ val stageToTaskInfos = HashMap[Int, ArrayBuffer[(TaskInfo, TaskMetrics)]]()
+ val completedStages = ListBuffer[Stage]() // Todo (pwendell): Evict these over time
+
+ override def onJobStart(jobStart: SparkListenerJobStart) { }
+
+ override def onStageCompleted(stageCompleted: StageCompleted) = {
+ val stage = stageCompleted.stageInfo.stage
+ activeStages -= stage
+ stage +=: completedStages
+ }
+
+ override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
+ activeStages += stageSubmitted.stage
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ val sid = taskEnd.event.task.stageId
+ taskEnd.event.reason match {
+ case Success =>
+ stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
+ case _ =>
+ stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
+ }
+ val taskList = stageToTaskInfos.getOrElse(sid, ArrayBuffer[(TaskInfo, TaskMetrics)]())
+ taskList += ((taskEnd.event.taskInfo, taskEnd.event.taskMetrics))
+ stageToTaskInfos(sid) = taskList
+ }
+
+ override def onJobEnd(jobEnd: SparkListenerEvents) { }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
new file mode 100644
index 0000000000..49f5b1c73c
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -0,0 +1,63 @@
+package spark.ui.jobs
+
+import javax.servlet.http.HttpServletRequest
+import xml.Node
+import spark.ui.WebUI._
+import spark.ui.WebUI
+import spark.ui.View
+import spark.util.Distribution
+import spark.scheduler.cluster.TaskInfo
+import spark.executor.TaskMetrics
+import java.util.Date
+
+class StagePage(parent: JobProgressUI) extends View[Seq[Node]] {
+ val listener = parent.listener
+ val dateFmt = parent.dateFmt
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val stageId = request.getParameter("id").toInt
+
+ val taskHeaders = Seq("Task ID", "Service Time (ms)", "Locality Level", "Worker", "Launch Time")
+ val tasks = listener.stageToTaskInfos(stageId)
+ val taskTable = listingTable(taskHeaders, taskRow, tasks)
+
+ val serviceTimes = tasks.map{case (info, metrics) => metrics.executorRunTime.toDouble}
+ val serviceQuantiles = "Service Time " ++ Distribution(serviceTimes).get.getQuantiles().map(_.toString)
+ val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max")
+
+ val quantileTable = listingTable(quantileHeaders, Seq(serviceQuantiles), quantileRow)
+
+ val content =
+ <h2>Percentile Metrics</h2>
+ <table class="table table-bordered table-striped table-condensed sortable">
+ <thead>
+ <tr>
+ <th>Service Time</th>
+ <th>Remote Bytes Read</th>
+ <th>Shuffle Bytes Written</th>
+ </tr>
+ </thead>
+ <tbody>
+ {listener.stageToTaskInfos(stageId).map{ case(i, m) => taskRow(i, m) }}
+ </tbody>
+ </table>
+ <h2>Tasks</h2> ++ {taskTable};
+
+ WebUI.headerSparkPage(content, "Stage Details: %s".format(stageId))
+ }
+
+ def quantileRow(data: Seq[String]) = <tr> {data.map(d => <td>d</td>)} </tr>
+
+ def taskRow(taskData: (TaskInfo, TaskMetrics)): Seq[Node] = {
+ val (info, metrics) = taskData
+ <tr>
+ <td>{info.taskId}</td>
+ <td>{metrics.executorRunTime}</td>
+ <td>{info.taskLocality}</td>
+ <td>{info.hostPort}</td>
+ <td>{dateFmt.format(new Date(info.launchTime))}</td>
+ </tr>
+ }
+
+
+}
diff --git a/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala
new file mode 100644
index 0000000000..21555384dd
--- /dev/null
+++ b/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala
@@ -0,0 +1,26 @@
+package spark.ui.storage
+
+import akka.util.Duration
+import javax.servlet.http.HttpServletRequest
+import org.eclipse.jetty.server.Handler
+import spark.{Logging, SparkContext}
+import spark.ui.WebUI._
+import spark.ui.{UIComponent}
+
+/**
+ * Web UI server for the BlockManager inside each SparkContext.
+ */
+private[spark]
+class BlockManagerUI(val sc: SparkContext)
+ extends UIComponent with Logging {
+ implicit val timeout = Duration.create(
+ System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+
+ val indexPage = new IndexPage(this)
+ val rddPage = new RDDPage(this)
+
+ def getHandlers = Seq[(String, Handler)](
+ ("/storage/rdd", (request: HttpServletRequest) => rddPage.render(request)),
+ ("/storage", (request: HttpServletRequest) => indexPage.render(request))
+ )
+}
diff --git a/core/src/main/scala/spark/ui/storage/IndexPage.scala b/core/src/main/scala/spark/ui/storage/IndexPage.scala
new file mode 100644
index 0000000000..2f4857d48a
--- /dev/null
+++ b/core/src/main/scala/spark/ui/storage/IndexPage.scala
@@ -0,0 +1,61 @@
+package spark.ui.storage
+
+import xml.Node
+import spark.storage.{RDDInfo, StorageUtils}
+import spark.ui.WebUI._
+import spark.Utils
+import spark.ui.{View, WebUI}
+import javax.servlet.http.HttpServletRequest
+
+class IndexPage(parent: BlockManagerUI) extends View[Seq[Node]] {
+ val sc = parent.sc
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val storageStatusList = sc.getExecutorStorageStatus
+ // Calculate macro-level statistics
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+ val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+ .reduceOption(_+_).getOrElse(0L)
+
+ val rddHeaders = Seq(
+ "RDD Name",
+ "Storage Level",
+ "Cached Partitions",
+ "Fraction Partitions Cached",
+ "Size in Memory",
+ "Size on Disk")
+ val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+ val rddTable = listingTable(rddHeaders, rddRow, rdds)
+
+ val content =
+ <div class="row">
+ <div class="span12">
+ <ul class="unstyled">
+ <li><strong>Memory:</strong>
+ {Utils.memoryBytesToString(maxMem - remainingMem)} Used
+ ({Utils.memoryBytesToString(remainingMem)} Available) </li>
+ <li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li>
+ </ul>
+ </div>
+ </div> ++ {rddTable};
+
+ WebUI.headerSparkPage(content, "Spark Storage ")
+ }
+
+ def rddRow(rdd: RDDInfo): Seq[Node] = {
+ <tr>
+ <td>
+ <a href={"/storage/rdd?id=%s".format(rdd.id)}>
+ {rdd.name}
+ </a>
+ </td>
+ <td>{rdd.storageLevel.description}
+ </td>
+ <td>{rdd.numCachedPartitions}</td>
+ <td>{rdd.numCachedPartitions / rdd.numPartitions.toDouble}</td>
+ <td>{Utils.memoryBytesToString(rdd.memSize)}</td>
+ <td>{Utils.memoryBytesToString(rdd.diskSize)}</td>
+ </tr>
+ }
+}
diff --git a/core/src/main/scala/spark/ui/storage/RDDPage.scala b/core/src/main/scala/spark/ui/storage/RDDPage.scala
new file mode 100644
index 0000000000..957c400080
--- /dev/null
+++ b/core/src/main/scala/spark/ui/storage/RDDPage.scala
@@ -0,0 +1,94 @@
+package spark.ui.storage
+
+import javax.servlet.http.HttpServletRequest
+import xml.Node
+import spark.storage.{StorageStatus, StorageUtils}
+import spark.ui.WebUI._
+import spark.Utils
+import spark.ui.WebUI
+import spark.ui.View
+import spark.storage.BlockManagerMasterActor.BlockStatus
+
+
+class RDDPage(parent: BlockManagerUI) extends View[Seq[Node]] {
+ val sc = parent.sc
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val id = request.getParameter("id")
+ val prefix = "rdd_" + id.toString
+ val storageStatusList = sc.getExecutorStorageStatus
+ val filteredStorageStatusList = StorageUtils.
+ filterStorageStatusByPrefix(storageStatusList, prefix)
+ val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+
+ val workerHeaders = Seq("Host", "Memory Usage", "Disk Usage")
+ val workers = filteredStorageStatusList.map((prefix, _))
+ val workerTable = listingTable(workerHeaders, workerRow, workers)
+
+ val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk")
+ val blocks = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1)
+ val blockTable = listingTable(blockHeaders, blockRow, blocks)
+
+ val content =
+ <div class="row">
+ <div class="span12">
+ <ul class="unstyled">
+ <li>
+ <strong>Storage Level:</strong>
+ {rddInfo.storageLevel.description}
+ </li>
+ <li>
+ <strong>Cached Partitions:</strong>
+ {rddInfo.numCachedPartitions}
+ </li>
+ <li>
+ <strong>Total Partitions:</strong>
+ {rddInfo.numPartitions}
+ </li>
+ <li>
+ <strong>Memory Size:</strong>
+ {Utils.memoryBytesToString(rddInfo.memSize)}
+ </li>
+ <li>
+ <strong>Disk Size:</strong>
+ {Utils.memoryBytesToString(rddInfo.diskSize)}
+ </li>
+ </ul>
+ </div>
+ </div>
+ <hr/>
+ <div class="row">
+ <div class="span12">
+ <h3> RDD Summary </h3>
+ <br/> {blockTable}
+ </div>
+ </div>
+ <hr/> ++ {workerTable};
+
+ WebUI.headerSparkPage(content, "RDD Info: " + id)
+ }
+
+ def blockRow(blk: (String, BlockStatus)): Seq[Node] = {
+ val (id, block) = blk
+ <tr>
+ <td>{id}</td>
+ <td>
+ {block.storageLevel.description}
+ </td>
+ <td>{Utils.memoryBytesToString(block.memSize)}</td>
+ <td>{Utils.memoryBytesToString(block.diskSize)}</td>
+ </tr>
+ }
+
+ def workerRow(worker: (String, StorageStatus)): Seq[Node] = {
+ val (prefix, status) = worker
+ <tr>
+ <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
+ <td>
+ {Utils.memoryBytesToString(status.memUsed(prefix))}
+ ({Utils.memoryBytesToString(status.memRemaining)} Total Available)
+ </td>
+ <td>{Utils.memoryBytesToString(status.diskUsed(prefix))}</td>
+ </tr>
+ }
+}