author    Matei Zaharia <matei.zaharia@gmail.com>  2013-07-22 16:57:16 -0700
committer Matei Zaharia <matei.zaharia@gmail.com>  2013-07-22 16:57:16 -0700
commit    401aac8b189aa6b72ad020ba894ca57b948c53a1 (patch)
tree      bba1bbc2e52045bccbf344d0530961c6e8f01d8b /core/src
parent    ea1cfabfdd8f37fb655e5de0526fa9fb45568344 (diff)
parent    872c97ad829ba20e866c4e45054e7d2d05b02042 (diff)
Merge pull request #719 from karenfeng/ui-808
Creates Executors tab for Jobs UI
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/spark/ui/Page.scala               |   2
-rw-r--r--  core/src/main/scala/spark/ui/SparkUI.scala            |   6
-rw-r--r--  core/src/main/scala/spark/ui/UIUtils.scala            |   5
-rw-r--r--  core/src/main/scala/spark/ui/exec/ExecutorsUI.scala   | 136
-rw-r--r--  core/src/main/scala/spark/ui/storage/IndexPage.scala  |  18
5 files changed, 148 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/spark/ui/Page.scala b/core/src/main/scala/spark/ui/Page.scala
index a31e750d06..03034a4520 100644
--- a/core/src/main/scala/spark/ui/Page.scala
+++ b/core/src/main/scala/spark/ui/Page.scala
@@ -17,4 +17,4 @@
package spark.ui
-private[spark] object Page extends Enumeration { val Storage, Jobs, Environment = Value }
+private[spark] object Page extends Enumeration { val Storage, Jobs, Environment, Executors = Value }
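A note on this one-line change: Spark's UI identifies each tab with a value of a Scala Enumeration, and downstream code pattern-matches on that value to decide which tab is highlighted. A minimal, self-contained sketch of the idiom (illustrative only, not the Spark source):

    object Page extends Enumeration { val Storage, Jobs, Environment, Executors = Value }

    object PageDemo extends App {
      // Pattern matching picks the markup state for the active tab,
      // as UIUtils.scala does below.
      def tabClass(current: Page.Value): String = current match {
        case Page.Executors => "active"
        case _              => ""
      }
      println(tabClass(Page.Executors)) // prints "active"
    }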
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
index 9396f22063..7599f82a94 100644
--- a/core/src/main/scala/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -23,6 +23,7 @@ import org.eclipse.jetty.server.{Handler, Server}
import spark.{Logging, SparkContext, Utils}
import spark.ui.env.EnvironmentUI
+import spark.ui.exec.ExecutorsUI
import spark.ui.storage.BlockManagerUI
import spark.ui.jobs.JobProgressUI
import spark.ui.JettyUtils._
@@ -41,7 +42,9 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
val storage = new BlockManagerUI(sc)
val jobs = new JobProgressUI(sc)
val env = new EnvironmentUI(sc)
- val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++ handlers
+ val exec = new ExecutorsUI(sc)
+ val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++
+ exec.getHandlers ++ handlers
/** Bind the HTTP server which backs this web interface */
def bind() {
@@ -64,6 +67,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
// This server must register all handlers, including JobProgressUI, before binding
// JobProgressUI registers a listener with SparkContext, which requires sc to initialize
jobs.start()
+ exec.start()
}
def stop() {
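The SparkUI change follows the existing registration pattern: each sub-UI exposes a sequence of (path, handler) pairs, and SparkUI concatenates them into allHandlers before binding the Jetty server. A hedged sketch of that composition, with Handler simplified to a plain function and the stub class names invented for illustration:

    object HandlerDemo extends App {
      // Handler simplified to a plain function; the real code uses Jetty handlers.
      type Handler = String => String

      class ExecutorsUIStub {
        def getHandlers: Seq[(String, Handler)] =
          Seq("/executors" -> ((req: String) => s"executors page for $req"))
      }
      class JobsUIStub {
        def getHandlers: Seq[(String, Handler)] =
          Seq("/stages" -> ((req: String) => s"stages page for $req"))
      }

      // As in the patch: concatenate every sub-UI's routes before binding,
      // keeping specific routes ahead of any catch-all handlers.
      val allHandlers = new JobsUIStub().getHandlers ++ new ExecutorsUIStub().getHandlers
      allHandlers.foreach { case (path, handler) => println(s"$path -> ${handler("GET")}") }
    }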
diff --git a/core/src/main/scala/spark/ui/UIUtils.scala b/core/src/main/scala/spark/ui/UIUtils.scala
index b1d11954dd..e33c80282a 100644
--- a/core/src/main/scala/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/spark/ui/UIUtils.scala
@@ -40,6 +40,10 @@ private[spark] object UIUtils {
case Environment => <li class="active"><a href="/environment">Environment</a></li>
case _ => <li><a href="/environment">Environment</a></li>
}
+ val executors = page match {
+ case Executors => <li class="active"><a href="/executors">Executors</a></li>
+ case _ => <li><a href="/executors">Executors</a></li>
+ }
<html>
<head>
@@ -66,6 +70,7 @@ private[spark] object UIUtils {
{storage}
{jobs}
{environment}
+ {executors}
</ul>
<ul id="infolist">
<li>Application: <strong>{sc.appName}</strong></li>
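The UIUtils change builds the new nav entry with Scala XML literals, one branch per active/inactive state. A small sketch of the idiom, assuming scala.xml is available (it was part of the standard library in the Scala versions Spark targeted at the time):

    import scala.xml.Node

    object TabDemo extends App {
      // One XML literal per state; the active page gets class="active".
      def executorsTab(activePage: String): Node =
        if (activePage == "Executors")
          <li class="active"><a href="/executors">Executors</a></li>
        else
          <li><a href="/executors">Executors</a></li>

      println(executorsTab("Executors"))
    }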
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
new file mode 100644
index 0000000000..20ea54d6a6
--- /dev/null
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -0,0 +1,136 @@
+package spark.ui.exec
+
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.Handler
+
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.util.Properties
+import scala.xml.{Node, XML}
+
+import spark.{ExceptionFailure, Logging, SparkContext, Success, Utils}
+import spark.executor.TaskMetrics
+import spark.scheduler.cluster.TaskInfo
+import spark.scheduler._
+import spark.storage.{StorageStatus, StorageUtils}
+import spark.ui.JettyUtils._
+import spark.ui.Page.Executors
+import spark.ui.UIUtils
+import spark.ui.UIUtils.headerSparkPage
+
+private[spark] class ExecutorsUI(val sc: SparkContext) {
+
+ private var _listener: Option[ExecutorsListener] = None
+ def listener = _listener.get
+
+ def start() {
+ _listener = Some(new ExecutorsListener)
+ sc.addSparkListener(listener)
+ }
+
+ def getHandlers = Seq[(String, Handler)](
+ ("/executors", (request: HttpServletRequest) => render(request))
+ )
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val storageStatusList = sc.getExecutorStorageStatus
+
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+ val memUsed = storageStatusList.map(_.memUsed()).reduce(_+_)
+ val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+ .reduceOption(_+_).getOrElse(0L)
+
+ val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
+ "Failed tasks", "Complete tasks", "Total tasks")
+ def execRow(kv: Seq[String]) =
+ <tr>
+ <td>{kv(0)}</td>
+ <td>{kv(1)}</td>
+ <td>{kv(2)}</td>
+ <td sorttable_customkey={kv(3)}>
+ {Utils.memoryBytesToString(kv(3).toLong)} / {Utils.memoryBytesToString(kv(4).toLong)}
+ </td>
+ <td sorttable_customkey={kv(5)}>
+ {Utils.memoryBytesToString(kv(5).toLong)}
+ </td>
+ <td>{kv(6)}</td>
+ <td>{kv(7)}</td>
+ <td>{kv(8)}</td>
+ </tr>
+ val execInfo =
+ for (b <- 0 until storageStatusList.size)
+ yield getExecInfo(b)
+ val execTable = UIUtils.listingTable(execHead, execRow, execInfo)
+
+ val content =
+ <div class="row">
+ <div class="span12">
+ <ul class="unstyled">
+ <li><strong>Memory:</strong>
+ {Utils.memoryBytesToString(memUsed)} Used
+ ({Utils.memoryBytesToString(maxMem)} Total) </li>
+ <li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li>
+ </ul>
+ </div>
+ </div>
+      <div class="row">
+ <div class="span12">
+ {execTable}
+ </div>
+ </div>;
+
+ headerSparkPage(content, sc, "Executors", Executors)
+ }
+
+  def getExecInfo(a: Int): Seq[String] = {
+    val status = sc.getExecutorStorageStatus(a)
+    val execId = status.blockManagerId.executorId
+    val hostPort = status.blockManagerId.hostPort
+    val rddBlocks = status.blocks.size.toString
+    val memUsed = status.memUsed().toString
+    val maxMem = status.maxMem.toString
+    val diskUsed = status.diskUsed().toString
+    // Look task counts up by executor ID rather than list index, and tolerate
+    // executors that have not run any tasks yet.
+    val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0).toString
+    val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0).toString
+    val totalTasks = listener.executorToTaskInfos.get(execId).map(_.size).getOrElse(0).toString
+
+ Seq(
+ execId,
+ hostPort,
+ rddBlocks,
+ memUsed,
+ maxMem,
+ diskUsed,
+ failedTasks,
+ completedTasks,
+ totalTasks
+ )
+ }
+
+ private[spark] class ExecutorsListener extends SparkListener with Logging {
+ val executorToTasksComplete = HashMap[String, Int]()
+ val executorToTasksFailed = HashMap[String, Int]()
+ val executorToTaskInfos =
+ HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ val eid = taskEnd.taskInfo.executorId
+ val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
+ taskEnd.reason match {
+ case e: ExceptionFailure =>
+ executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
+ (Some(e), e.metrics)
+ case _ =>
+ executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
+ (None, Some(taskEnd.taskMetrics))
+ }
+ val taskList = executorToTaskInfos.getOrElse(
+ eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList += ((taskEnd.taskInfo, metrics, failureInfo))
+ executorToTaskInfos(eid) = taskList
+ }
+ }
+}
\ No newline at end of file
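The heart of the new file is ExecutorsListener: on every task-end event it bumps a per-executor counter and appends to a per-executor task log. A standalone sketch of that bookkeeping pattern (TaskEnd here is a stand-in for Spark's SparkListenerTaskEnd; getOrElseUpdate is an equivalent, slightly tighter form of the getOrElse-then-reassign used above):

    import scala.collection.mutable.{ArrayBuffer, HashMap}

    object ListenerSketch extends App {
      // Stand-in for Spark's SparkListenerTaskEnd event.
      case class TaskEnd(executorId: String, failed: Boolean)

      val tasksComplete = HashMap[String, Int]()
      val tasksFailed   = HashMap[String, Int]()
      val taskLog       = HashMap[String, ArrayBuffer[TaskEnd]]()

      def onTaskEnd(event: TaskEnd): Unit = {
        val counter = if (event.failed) tasksFailed else tasksComplete
        counter(event.executorId) = counter.getOrElse(event.executorId, 0) + 1
        taskLog.getOrElseUpdate(event.executorId, ArrayBuffer[TaskEnd]()) += event
      }

      Seq(TaskEnd("0", failed = false), TaskEnd("0", failed = true)).foreach(onTaskEnd)
      println(tasksComplete) // Map(0 -> 1)
      println(tasksFailed)   // Map(0 -> 1)
    }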
diff --git a/core/src/main/scala/spark/ui/storage/IndexPage.scala b/core/src/main/scala/spark/ui/storage/IndexPage.scala
index 4e0360d19a..f76192eba8 100644
--- a/core/src/main/scala/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/storage/IndexPage.scala
@@ -33,10 +33,6 @@ private[spark] class IndexPage(parent: BlockManagerUI) {
def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = sc.getExecutorStorageStatus
// Calculate macro-level statistics
- val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
- val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
- val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
- .reduceOption(_+_).getOrElse(0L)
val rddHeaders = Seq(
"RDD Name",
@@ -46,19 +42,7 @@ private[spark] class IndexPage(parent: BlockManagerUI) {
"Size in Memory",
"Size on Disk")
val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
- val rddTable = listingTable(rddHeaders, rddRow, rdds)
-
- val content =
- <div class="row">
- <div class="span12">
- <ul class="unstyled">
- <li><strong>Memory:</strong>
- {Utils.memoryBytesToString(maxMem - remainingMem)} Used
- ({Utils.memoryBytesToString(remainingMem)} Available) </li>
- <li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li>
- </ul>
- </div>
- </div> ++ {rddTable};
+ val content = listingTable(rddHeaders, rddRow, rdds)
headerSparkPage(content, parent.sc, "Spark Storage ", Storage)
}
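One detail worth noting across both pages: the aggregation code uses reduce for per-executor memory totals but reduceOption(...).getOrElse(0L) for disk usage, because the flattened block list can legitimately be empty while the storage-status list is not, and reduce on an empty collection throws. A tiny sketch of the difference (values made up for illustration):

    object AggregationDemo extends App {
      val maxMemPerExecutor = Seq(512L, 512L)   // one entry per registered executor
      val blockDiskSizes    = Seq.empty[Long]   // may be empty before anything is cached

      val maxMem        = maxMemPerExecutor.reduce(_ + _)                   // 1024
      val diskSpaceUsed = blockDiskSizes.reduceOption(_ + _).getOrElse(0L)  // 0, no exception
      println(s"maxMem=$maxMem diskSpaceUsed=$diskSpaceUsed")
    }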