diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-06-20 12:05:49 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-06-22 10:31:36 -0700 |
commit | dcf6a68177912ffa69ee75345c8172fd2db9e0ea (patch) | |
tree | 2d6892d6f709a3f65616db3e7b41bd6070c0e74e | |
parent | ce81c320acfd265b8de7c30a46c6a3adc4f39a74 (diff) | |
download | spark-dcf6a68177912ffa69ee75345c8172fd2db9e0ea.tar.gz spark-dcf6a68177912ffa69ee75345c8172fd2db9e0ea.tar.bz2 spark-dcf6a68177912ffa69ee75345c8172fd2db9e0ea.zip |
Refactoring into different modules
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/BlockManagerUI.scala | 156 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/JobProgressUI.scala | 151 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/SparkUI.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/WebUI.scala | 7 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/IndexPage.scala | 55 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/JobProgressUI.scala | 82 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/StagePage.scala | 63 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/storage/BlockManagerUI.scala | 26 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/storage/IndexPage.scala | 61 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/storage/RDDPage.scala | 94 |
11 files changed, 390 insertions, 309 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 155a5ee721..901cda4174 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -48,7 +48,7 @@ import spark.scheduler.local.LocalScheduler import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import spark.storage.{StorageStatus, StorageUtils, RDDInfo} import spark.util.{MetadataCleaner, TimeStampedHashMap} -import ui.{SparkUI, BlockManagerUI} +import ui.{SparkUI} /** * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark diff --git a/core/src/main/scala/spark/ui/BlockManagerUI.scala b/core/src/main/scala/spark/ui/BlockManagerUI.scala deleted file mode 100644 index 3be5064837..0000000000 --- a/core/src/main/scala/spark/ui/BlockManagerUI.scala +++ /dev/null @@ -1,156 +0,0 @@ -package spark.ui - -import akka.util.Duration -import javax.servlet.http.HttpServletRequest -import org.eclipse.jetty.server.Handler -import spark.{RDD, Logging, SparkContext, Utils} -import spark.ui.WebUI._ -import xml.Node -import spark.storage.StorageUtils -import spark.storage.StorageStatus -import spark.storage.RDDInfo -import spark.storage.BlockManagerMasterActor.BlockStatus - -/** - * Web UI server for the BlockManager inside each SparkContext. - */ -private[spark] -class BlockManagerUI(sc: SparkContext) - extends UIComponent with Logging { - implicit val timeout = Duration.create( - System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") - - - def getHandlers = Seq[(String, Handler)]( - ("/storage/rdd", (request: HttpServletRequest) => rddPage(request)), - ("/storage", (request: HttpServletRequest) => indexPage) - ) - - def rddPage(request: HttpServletRequest): Seq[Node] = { - val id = request.getParameter("id") - val prefix = "rdd_" + id.toString - val storageStatusList = sc.getExecutorStorageStatus - val filteredStorageStatusList = StorageUtils. - filterStorageStatusByPrefix(storageStatusList, prefix) - val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head - - val workerHeaders = Seq("Host", "Memory Usage", "Disk Usage") - val workers = filteredStorageStatusList.map((prefix, _)) - val workerTable = listingTable(workerHeaders, workerRow, workers) - - val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk") - val blocks = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1) - val blockTable = listingTable(blockHeaders, blockRow, blocks) - - val content = - <div class="row"> - <div class="span12"> - <ul class="unstyled"> - <li> - <strong>Storage Level:</strong> - {rddInfo.storageLevel.description} - </li> - <li> - <strong>Cached Partitions:</strong> - {rddInfo.numCachedPartitions} - </li> - <li> - <strong>Total Partitions:</strong> - {rddInfo.numPartitions} - </li> - <li> - <strong>Memory Size:</strong> - {Utils.memoryBytesToString(rddInfo.memSize)} - </li> - <li> - <strong>Disk Size:</strong> - {Utils.memoryBytesToString(rddInfo.diskSize)} - </li> - </ul> - </div> - </div> - <hr/> - <div class="row"> - <div class="span12"> - <h3> RDD Summary </h3> - <br/> {blockTable} - </div> - </div> - <hr/> ++ {workerTable}; - - WebUI.headerSparkPage(content, "RDD Info: " + id) - } - - def blockRow(blk: (String, BlockStatus)): Seq[Node] = { - val (id, block) = blk - <tr> - <td>{id}</td> - <td> - {block.storageLevel.description} - </td> - <td>{Utils.memoryBytesToString(block.memSize)}</td> - <td>{Utils.memoryBytesToString(block.diskSize)}</td> - </tr> - } - - def workerRow(worker: (String, StorageStatus)): Seq[Node] = { - val (prefix, status) = worker - <tr> - <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td> - <td> - {Utils.memoryBytesToString(status.memUsed(prefix))} - ({Utils.memoryBytesToString(status.memRemaining)} Total Available) - </td> - <td>{Utils.memoryBytesToString(status.diskUsed(prefix))}</td> - </tr> - } - - def indexPage: Seq[Node] = { - val storageStatusList = sc.getExecutorStorageStatus - // Calculate macro-level statistics - val maxMem = storageStatusList.map(_.maxMem).reduce(_+_) - val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_) - val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)) - .reduceOption(_+_).getOrElse(0L) - - val rddHeaders = Seq( - "RDD Name", - "Storage Level", - "Cached Partitions", - "Fraction Partitions Cached", - "Size in Memory", - "Size on Disk") - val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc) - val rddTable = listingTable(rddHeaders, rddRow, rdds) - - val content = - <div class="row"> - <div class="span12"> - <ul class="unstyled"> - <li><strong>Memory:</strong> - {Utils.memoryBytesToString(maxMem - remainingMem)} Used - ({Utils.memoryBytesToString(remainingMem)} Available) </li> - <li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li> - </ul> - </div> - </div> ++ {rddTable}; - - WebUI.headerSparkPage(content, "Spark Storage ") - } - - def rddRow(rdd: RDDInfo): Seq[Node] = { - <tr> - <td> - <a href={"/storage/rdd?id=%s".format(rdd.id)}> - {rdd.name} - </a> - </td> - <td>{rdd.storageLevel.description} - </td> - <td>{rdd.numCachedPartitions}</td> - <td>{rdd.numCachedPartitions / rdd.numPartitions.toDouble}</td> - <td>{Utils.memoryBytesToString(rdd.memSize)}</td> - <td>{Utils.memoryBytesToString(rdd.diskSize)}</td> - </tr> - } -} diff --git a/core/src/main/scala/spark/ui/JobProgressUI.scala b/core/src/main/scala/spark/ui/JobProgressUI.scala deleted file mode 100644 index b51e62bb1a..0000000000 --- a/core/src/main/scala/spark/ui/JobProgressUI.scala +++ /dev/null @@ -1,151 +0,0 @@ -package spark.ui - -import spark.{Utils, SparkContext} -import spark.scheduler._ -import spark.scheduler.SparkListenerTaskEnd -import spark.scheduler.StageCompleted -import spark.scheduler.SparkListenerStageSubmitted -import org.eclipse.jetty.server.Handler -import javax.servlet.http.HttpServletRequest -import xml.Node -import WebUI._ -import collection.mutable._ -import spark.Success -import akka.util.Duration -import java.text.SimpleDateFormat -import java.util.Date -import spark.scheduler.cluster.TaskInfo -import collection.mutable -import org.hsqldb.lib.HashMappedList -import spark.executor.TaskMetrics -import spark.scheduler.SparkListenerTaskEnd -import scala.Some -import spark.scheduler.SparkListenerStageSubmitted -import scala.Seq -import spark.scheduler.StageCompleted -import spark.scheduler.SparkListenerJobStart - -private[spark] -class JobProgressUI(sc: SparkContext) extends UIComponent { - val listener = new JobProgressListener - val fmt = new SimpleDateFormat("EEE, MMM d yyyy HH:mm:ss") - - sc.addSparkListener(listener) - - def getHandlers = Seq[(String, Handler)]( - ("/stages/stage", (request: HttpServletRequest) => stagePage(request)), - ("/stages", (request: HttpServletRequest) => indexPage) - ) - - def stagePage(request: HttpServletRequest): Seq[Node] = { - val stageId = request.getParameter("id").toInt - - val taskHeaders = Seq("Task ID", "Service Time (ms)", "Locality Level", "Worker", "Launch Time") - val tasks = listener.stageToTaskInfos(stageId) - val taskTable = listingTable(taskHeaders, taskRow, tasks) - - val content = - <h2>Percentile Metrics</h2> - <table class="table table-bordered table-striped table-condensed sortable"> - <thead> - <tr> - <th>Service Time</th> - <th>Remote Bytes Read</th> - <th>Shuffle Bytes Written</th> - </tr> - </thead> - <tbody> - {listener.stageToTaskInfos(stageId).map{ case(i, m) => taskRow(i, m) }} - </tbody> - </table> - <h2>Tasks</h2> ++ {taskTable}; - - WebUI.headerSparkPage(content, "Stage Details: %s".format(stageId)) - } - - def taskRow(taskData: (TaskInfo, TaskMetrics)): Seq[Node] = { - val (info, metrics) = taskData - <tr> - <td>{info.taskId}</td> - <td>{metrics.executorRunTime}</td> - <td>{info.taskLocality}</td> - <td>{info.hostPort}</td> - <td>{fmt.format(new Date(info.launchTime))}</td> - </tr> - } - - def indexPage: Seq[Node] = { - val stageHeaders = Seq("Stage ID", "Origin", "Submitted", "Duration", "Tasks: Complete/Total") - val activeStages = listener.activeStages.toSeq - val completedStages = listener.completedStages.toSeq - - val activeStageTable = listingTable(stageHeaders, stageRow, activeStages) - val completedStageTable = listingTable(stageHeaders, stageRow, completedStages) - - val content = - <h2>Active Stages</h2> ++ {activeStageTable} - <h2>Completed Stages</h2> ++ {completedStageTable} - - WebUI.headerSparkPage(content, "Spark Stages") - } - - def getElapsedTime(submitted: Option[Long], completed: Long): String = { - submitted match { - case Some(t) => Duration(completed - t, "milliseconds").printHMS - case _ => "Unknown" - } - } - - def stageRow(s: Stage): Seq[Node] = { - val submissionTime = s.submissionTime match { - case Some(t) => fmt.format(new Date(t)) - case None => "Unknown" - } - <tr> - <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.id}</a></td> - <td>{s.origin}</td> - <td>{submissionTime}</td> - <td>{getElapsedTime(s.submissionTime, s.completionTime.getOrElse(System.currentTimeMillis()))}</td> - <td>{listener.stageToTasksComplete.getOrElse(s.id, 0)} / {s.numPartitions} - {listener.stageToTasksFailed.getOrElse(s.id, 0) match { - case f if f > 0 => "(%s failed)".format(f) - case _ => - }} - </td> - </tr> - } -} - -private[spark] class JobProgressListener extends SparkListener { - val activeStages = HashSet[Stage]() - val stageToTasksComplete = HashMap[Int, Int]() - val stageToTasksFailed = HashMap[Int, Int]() - val stageToTaskInfos = HashMap[Int, ArrayBuffer[(TaskInfo, TaskMetrics)]]() - val completedStages = ListBuffer[Stage]() // Todo (pwendell): Evict these over time - - override def onJobStart(jobStart: SparkListenerJobStart) { } - - override def onStageCompleted(stageCompleted: StageCompleted) = { - val stage = stageCompleted.stageInfo.stage - activeStages -= stage - stage +=: completedStages - } - - override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = - activeStages += stageSubmitted.stage - - override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { - val sid = taskEnd.event.task.stageId - taskEnd.event.reason match { - case Success => - stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1 - case _ => - stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1 - } - val taskList = stageToTaskInfos.getOrElse(sid, ArrayBuffer[(TaskInfo, TaskMetrics)]()) - taskList += ((taskEnd.event.taskInfo, taskEnd.event.taskMetrics)) - stageToTaskInfos(sid) = taskList - } - - override def onJobEnd(jobEnd: SparkListenerEvents) { } -}
\ No newline at end of file diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala index 63b75df36f..a1f6cc60ec 100644 --- a/core/src/main/scala/spark/ui/SparkUI.scala +++ b/core/src/main/scala/spark/ui/SparkUI.scala @@ -1,8 +1,10 @@ package spark.ui +import jobs.JobProgressUI import spark.{Logging, SparkContext, Utils} import javax.servlet.http.HttpServletRequest import org.eclipse.jetty.server.Handler +import storage.BlockManagerUI import WebUI._ private[spark] class SparkUI(sc: SparkContext) extends Logging { diff --git a/core/src/main/scala/spark/ui/WebUI.scala b/core/src/main/scala/spark/ui/WebUI.scala index 265624a726..6dedd28fc1 100644 --- a/core/src/main/scala/spark/ui/WebUI.scala +++ b/core/src/main/scala/spark/ui/WebUI.scala @@ -13,6 +13,10 @@ abstract class UIComponent { def getHandlers(): Seq[(String, Handler)] } +abstract class View[T] { + def render(request: HttpServletRequest): T +} + object WebUI extends Logging { // CORE WEB UI COMPONENTS type Responder[T] = HttpServletRequest => T @@ -82,6 +86,7 @@ object WebUI extends Logging { } // HELPER FUNCTIONS AND SHORTCUTS + /** Page with Spark logo, title, and Spark UI headers */ def headerSparkPage(content: => Seq[Node], title: String): Seq[Node] = { val newContent = @@ -123,7 +128,7 @@ object WebUI extends Logging { /** Shortcut for making a table derived from a sequence of objects. */ def listingTable[T](headers: Seq[String], makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = { <table class="table table-bordered table-striped table-condensed sortable"> - <thead>{headers.map(h => "<th>%s</th>".format(h))}</thead> + <thead>{headers.map(h => <th>{h}</th>)}</thead> <tbody> {rows.map(r => makeRow(r))} </tbody> diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala new file mode 100644 index 0000000000..1740524f49 --- /dev/null +++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala @@ -0,0 +1,55 @@ +package spark.ui.jobs + +import spark.ui.{WebUI, View} +import xml.{NodeSeq, Node} +import spark.ui.WebUI._ +import scala.Some +import akka.util.Duration +import spark.scheduler.Stage +import java.util.Date +import javax.servlet.http.HttpServletRequest + +class IndexPage(parent: JobProgressUI) extends View[Seq[Node]] { + val listener = parent.listener + val dateFmt = parent.dateFmt + + def render(request: HttpServletRequest): Seq[Node] = { + val stageHeaders = Seq("Stage ID", "Origin", "Submitted", "Duration", "Tasks: Complete/Total") + val activeStages = listener.activeStages.toSeq + val completedStages = listener.completedStages.toSeq + + val activeStageTable: NodeSeq = listingTable(stageHeaders, stageRow, activeStages) + val completedStageTable = listingTable(stageHeaders, stageRow, completedStages) + + val content = <h2>Active Stages</h2> ++ activeStageTable ++ + <h2>Completed Stages</h2> ++ completedStageTable + + WebUI.headerSparkPage(content, "Spark Stages") + } + + def getElapsedTime(submitted: Option[Long], completed: Long): String = { + submitted match { + case Some(t) => Duration(completed - t, "milliseconds").printHMS + case _ => "Unknown" + } + } + + def stageRow(s: Stage): Seq[Node] = { + val submissionTime = s.submissionTime match { + case Some(t) => dateFmt.format(new Date(t)) + case None => "Unknown" + } + <tr> + <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.id}</a></td> + <td>{s.origin}</td> + <td>{submissionTime}</td> + <td>{getElapsedTime(s.submissionTime, s.completionTime.getOrElse(System.currentTimeMillis()))}</td> + <td>{listener.stageToTasksComplete.getOrElse(s.id, 0)} / {s.numPartitions} + {listener.stageToTasksFailed.getOrElse(s.id, 0) match { + case f if f > 0 => "(%s failed)".format(f) + case _ => + }} + </td> + </tr> + } +} diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala new file mode 100644 index 0000000000..9ee962bf2e --- /dev/null +++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala @@ -0,0 +1,82 @@ +package spark.ui.jobs + +import spark.{Utils, SparkContext} +import spark.scheduler._ +import spark.scheduler.SparkListenerTaskEnd +import spark.scheduler.StageCompleted +import spark.scheduler.SparkListenerStageSubmitted +import org.eclipse.jetty.server.Handler +import javax.servlet.http.HttpServletRequest +import xml.Node +import collection.mutable._ +import spark.Success +import akka.util.Duration +import java.text.SimpleDateFormat +import java.util.Date +import spark.scheduler.cluster.TaskInfo +import collection.mutable +import org.hsqldb.lib.HashMappedList +import spark.executor.TaskMetrics +import spark.scheduler.SparkListenerTaskEnd +import scala.Some +import spark.scheduler.SparkListenerStageSubmitted +import scala.Seq +import spark.scheduler.StageCompleted +import spark.scheduler.SparkListenerJobStart +import spark.ui.{WebUI, UIComponent} +import spark.ui.WebUI._ +import spark.scheduler.SparkListenerTaskEnd +import scala.Some +import spark.scheduler.SparkListenerStageSubmitted +import spark.scheduler.StageCompleted +import spark.scheduler.SparkListenerJobStart + +private[spark] +class JobProgressUI(sc: SparkContext) extends UIComponent { + val listener = new JobProgressListener + val dateFmt = new SimpleDateFormat("EEE, MMM d yyyy HH:mm:ss") + + sc.addSparkListener(listener) + + val indexPage = new IndexPage(this) + val stagePage = new StagePage(this) + + def getHandlers = Seq[(String, Handler)]( + ("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)), + ("/stages", (request: HttpServletRequest) => indexPage.render(request)) + ) +} + +private[spark] class JobProgressListener extends SparkListener { + val activeStages = HashSet[Stage]() + val stageToTasksComplete = HashMap[Int, Int]() + val stageToTasksFailed = HashMap[Int, Int]() + val stageToTaskInfos = HashMap[Int, ArrayBuffer[(TaskInfo, TaskMetrics)]]() + val completedStages = ListBuffer[Stage]() // Todo (pwendell): Evict these over time + + override def onJobStart(jobStart: SparkListenerJobStart) { } + + override def onStageCompleted(stageCompleted: StageCompleted) = { + val stage = stageCompleted.stageInfo.stage + activeStages -= stage + stage +=: completedStages + } + + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = + activeStages += stageSubmitted.stage + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + val sid = taskEnd.event.task.stageId + taskEnd.event.reason match { + case Success => + stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1 + case _ => + stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1 + } + val taskList = stageToTaskInfos.getOrElse(sid, ArrayBuffer[(TaskInfo, TaskMetrics)]()) + taskList += ((taskEnd.event.taskInfo, taskEnd.event.taskMetrics)) + stageToTaskInfos(sid) = taskList + } + + override def onJobEnd(jobEnd: SparkListenerEvents) { } +}
\ No newline at end of file diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala new file mode 100644 index 0000000000..49f5b1c73c --- /dev/null +++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala @@ -0,0 +1,63 @@ +package spark.ui.jobs + +import javax.servlet.http.HttpServletRequest +import xml.Node +import spark.ui.WebUI._ +import spark.ui.WebUI +import spark.ui.View +import spark.util.Distribution +import spark.scheduler.cluster.TaskInfo +import spark.executor.TaskMetrics +import java.util.Date + +class StagePage(parent: JobProgressUI) extends View[Seq[Node]] { + val listener = parent.listener + val dateFmt = parent.dateFmt + + def render(request: HttpServletRequest): Seq[Node] = { + val stageId = request.getParameter("id").toInt + + val taskHeaders = Seq("Task ID", "Service Time (ms)", "Locality Level", "Worker", "Launch Time") + val tasks = listener.stageToTaskInfos(stageId) + val taskTable = listingTable(taskHeaders, taskRow, tasks) + + val serviceTimes = tasks.map{case (info, metrics) => metrics.executorRunTime.toDouble} + val serviceQuantiles = "Service Time " ++ Distribution(serviceTimes).get.getQuantiles().map(_.toString) + val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max") + + val quantileTable = listingTable(quantileHeaders, Seq(serviceQuantiles), quantileRow) + + val content = + <h2>Percentile Metrics</h2> + <table class="table table-bordered table-striped table-condensed sortable"> + <thead> + <tr> + <th>Service Time</th> + <th>Remote Bytes Read</th> + <th>Shuffle Bytes Written</th> + </tr> + </thead> + <tbody> + {listener.stageToTaskInfos(stageId).map{ case(i, m) => taskRow(i, m) }} + </tbody> + </table> + <h2>Tasks</h2> ++ {taskTable}; + + WebUI.headerSparkPage(content, "Stage Details: %s".format(stageId)) + } + + def quantileRow(data: Seq[String]) = <tr> {data.map(d => <td>d</td>)} </tr> + + def taskRow(taskData: (TaskInfo, TaskMetrics)): Seq[Node] = { + val (info, metrics) = taskData + <tr> + <td>{info.taskId}</td> + <td>{metrics.executorRunTime}</td> + <td>{info.taskLocality}</td> + <td>{info.hostPort}</td> + <td>{dateFmt.format(new Date(info.launchTime))}</td> + </tr> + } + + +} diff --git a/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala new file mode 100644 index 0000000000..21555384dd --- /dev/null +++ b/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala @@ -0,0 +1,26 @@ +package spark.ui.storage + +import akka.util.Duration +import javax.servlet.http.HttpServletRequest +import org.eclipse.jetty.server.Handler +import spark.{Logging, SparkContext} +import spark.ui.WebUI._ +import spark.ui.{UIComponent} + +/** + * Web UI server for the BlockManager inside each SparkContext. + */ +private[spark] +class BlockManagerUI(val sc: SparkContext) + extends UIComponent with Logging { + implicit val timeout = Duration.create( + System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + + val indexPage = new IndexPage(this) + val rddPage = new RDDPage(this) + + def getHandlers = Seq[(String, Handler)]( + ("/storage/rdd", (request: HttpServletRequest) => rddPage.render(request)), + ("/storage", (request: HttpServletRequest) => indexPage.render(request)) + ) +} diff --git a/core/src/main/scala/spark/ui/storage/IndexPage.scala b/core/src/main/scala/spark/ui/storage/IndexPage.scala new file mode 100644 index 0000000000..2f4857d48a --- /dev/null +++ b/core/src/main/scala/spark/ui/storage/IndexPage.scala @@ -0,0 +1,61 @@ +package spark.ui.storage + +import xml.Node +import spark.storage.{RDDInfo, StorageUtils} +import spark.ui.WebUI._ +import spark.Utils +import spark.ui.{View, WebUI} +import javax.servlet.http.HttpServletRequest + +class IndexPage(parent: BlockManagerUI) extends View[Seq[Node]] { + val sc = parent.sc + + def render(request: HttpServletRequest): Seq[Node] = { + val storageStatusList = sc.getExecutorStorageStatus + // Calculate macro-level statistics + val maxMem = storageStatusList.map(_.maxMem).reduce(_+_) + val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_) + val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)) + .reduceOption(_+_).getOrElse(0L) + + val rddHeaders = Seq( + "RDD Name", + "Storage Level", + "Cached Partitions", + "Fraction Partitions Cached", + "Size in Memory", + "Size on Disk") + val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc) + val rddTable = listingTable(rddHeaders, rddRow, rdds) + + val content = + <div class="row"> + <div class="span12"> + <ul class="unstyled"> + <li><strong>Memory:</strong> + {Utils.memoryBytesToString(maxMem - remainingMem)} Used + ({Utils.memoryBytesToString(remainingMem)} Available) </li> + <li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li> + </ul> + </div> + </div> ++ {rddTable}; + + WebUI.headerSparkPage(content, "Spark Storage ") + } + + def rddRow(rdd: RDDInfo): Seq[Node] = { + <tr> + <td> + <a href={"/storage/rdd?id=%s".format(rdd.id)}> + {rdd.name} + </a> + </td> + <td>{rdd.storageLevel.description} + </td> + <td>{rdd.numCachedPartitions}</td> + <td>{rdd.numCachedPartitions / rdd.numPartitions.toDouble}</td> + <td>{Utils.memoryBytesToString(rdd.memSize)}</td> + <td>{Utils.memoryBytesToString(rdd.diskSize)}</td> + </tr> + } +} diff --git a/core/src/main/scala/spark/ui/storage/RDDPage.scala b/core/src/main/scala/spark/ui/storage/RDDPage.scala new file mode 100644 index 0000000000..957c400080 --- /dev/null +++ b/core/src/main/scala/spark/ui/storage/RDDPage.scala @@ -0,0 +1,94 @@ +package spark.ui.storage + +import javax.servlet.http.HttpServletRequest +import xml.Node +import spark.storage.{StorageStatus, StorageUtils} +import spark.ui.WebUI._ +import spark.Utils +import spark.ui.WebUI +import spark.ui.View +import spark.storage.BlockManagerMasterActor.BlockStatus + + +class RDDPage(parent: BlockManagerUI) extends View[Seq[Node]] { + val sc = parent.sc + + def render(request: HttpServletRequest): Seq[Node] = { + val id = request.getParameter("id") + val prefix = "rdd_" + id.toString + val storageStatusList = sc.getExecutorStorageStatus + val filteredStorageStatusList = StorageUtils. + filterStorageStatusByPrefix(storageStatusList, prefix) + val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head + + val workerHeaders = Seq("Host", "Memory Usage", "Disk Usage") + val workers = filteredStorageStatusList.map((prefix, _)) + val workerTable = listingTable(workerHeaders, workerRow, workers) + + val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk") + val blocks = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1) + val blockTable = listingTable(blockHeaders, blockRow, blocks) + + val content = + <div class="row"> + <div class="span12"> + <ul class="unstyled"> + <li> + <strong>Storage Level:</strong> + {rddInfo.storageLevel.description} + </li> + <li> + <strong>Cached Partitions:</strong> + {rddInfo.numCachedPartitions} + </li> + <li> + <strong>Total Partitions:</strong> + {rddInfo.numPartitions} + </li> + <li> + <strong>Memory Size:</strong> + {Utils.memoryBytesToString(rddInfo.memSize)} + </li> + <li> + <strong>Disk Size:</strong> + {Utils.memoryBytesToString(rddInfo.diskSize)} + </li> + </ul> + </div> + </div> + <hr/> + <div class="row"> + <div class="span12"> + <h3> RDD Summary </h3> + <br/> {blockTable} + </div> + </div> + <hr/> ++ {workerTable}; + + WebUI.headerSparkPage(content, "RDD Info: " + id) + } + + def blockRow(blk: (String, BlockStatus)): Seq[Node] = { + val (id, block) = blk + <tr> + <td>{id}</td> + <td> + {block.storageLevel.description} + </td> + <td>{Utils.memoryBytesToString(block.memSize)}</td> + <td>{Utils.memoryBytesToString(block.diskSize)}</td> + </tr> + } + + def workerRow(worker: (String, StorageStatus)): Seq[Node] = { + val (prefix, status) = worker + <tr> + <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td> + <td> + {Utils.memoryBytesToString(status.memUsed(prefix))} + ({Utils.memoryBytesToString(status.memRemaining)} Total Available) + </td> + <td>{Utils.memoryBytesToString(status.diskUsed(prefix))}</td> + </tr> + } +} |