diff options
author | Matei Zaharia <matei.zaharia@gmail.com> | 2013-07-22 16:57:16 -0700 |
---|---|---|
committer | Matei Zaharia <matei.zaharia@gmail.com> | 2013-07-22 16:57:16 -0700 |
commit | 401aac8b189aa6b72ad020ba894ca57b948c53a1 (patch) | |
tree | bba1bbc2e52045bccbf344d0530961c6e8f01d8b | |
parent | ea1cfabfdd8f37fb655e5de0526fa9fb45568344 (diff) | |
parent | 872c97ad829ba20e866c4e45054e7d2d05b02042 (diff) | |
download | spark-401aac8b189aa6b72ad020ba894ca57b948c53a1.tar.gz spark-401aac8b189aa6b72ad020ba894ca57b948c53a1.tar.bz2 spark-401aac8b189aa6b72ad020ba894ca57b948c53a1.zip |
Merge pull request #719 from karenfeng/ui-808
Creates Executors tab for Jobs UI
-rw-r--r-- | core/src/main/scala/spark/ui/Page.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/SparkUI.scala | 6 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/UIUtils.scala | 5 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/exec/ExecutorsUI.scala | 136 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/storage/IndexPage.scala | 18 |
5 files changed, 148 insertions, 19 deletions
diff --git a/core/src/main/scala/spark/ui/Page.scala b/core/src/main/scala/spark/ui/Page.scala index a31e750d06..03034a4520 100644 --- a/core/src/main/scala/spark/ui/Page.scala +++ b/core/src/main/scala/spark/ui/Page.scala @@ -17,4 +17,4 @@ package spark.ui -private[spark] object Page extends Enumeration { val Storage, Jobs, Environment = Value } +private[spark] object Page extends Enumeration { val Storage, Jobs, Environment, Executors = Value } diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala index 9396f22063..7599f82a94 100644 --- a/core/src/main/scala/spark/ui/SparkUI.scala +++ b/core/src/main/scala/spark/ui/SparkUI.scala @@ -23,6 +23,7 @@ import org.eclipse.jetty.server.{Handler, Server} import spark.{Logging, SparkContext, Utils} import spark.ui.env.EnvironmentUI +import spark.ui.exec.ExecutorsUI import spark.ui.storage.BlockManagerUI import spark.ui.jobs.JobProgressUI import spark.ui.JettyUtils._ @@ -41,7 +42,9 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging { val storage = new BlockManagerUI(sc) val jobs = new JobProgressUI(sc) val env = new EnvironmentUI(sc) - val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++ handlers + val exec = new ExecutorsUI(sc) + val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++ + exec.getHandlers ++ handlers /** Bind the HTTP server which backs this web interface */ def bind() { @@ -64,6 +67,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging { // This server must register all handlers, including JobProgressUI, before binding // JobProgressUI registers a listener with SparkContext, which requires sc to initialize jobs.start() + exec.start() } def stop() { diff --git a/core/src/main/scala/spark/ui/UIUtils.scala b/core/src/main/scala/spark/ui/UIUtils.scala index b1d11954dd..e33c80282a 100644 --- a/core/src/main/scala/spark/ui/UIUtils.scala +++ b/core/src/main/scala/spark/ui/UIUtils.scala @@ -40,6 +40,10 @@ private[spark] object UIUtils { case Environment => <li class="active"><a href="/environment">Environment</a></li> case _ => <li><a href="/environment">Environment</a></li> } + val executors = page match { + case Executors => <li class="active"><a href="/executors">Executors</a></li> + case _ => <li><a href="/executors">Executors</a></li> + } <html> <head> @@ -66,6 +70,7 @@ private[spark] object UIUtils { {storage} {jobs} {environment} + {executors} </ul> <ul id="infolist"> <li>Application: <strong>{sc.appName}</strong></li> diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala new file mode 100644 index 0000000000..20ea54d6a6 --- /dev/null +++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala @@ -0,0 +1,136 @@ +package spark.ui.exec + + +import javax.servlet.http.HttpServletRequest + +import org.eclipse.jetty.server.Handler + +import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.util.Properties + +import spark.{ExceptionFailure, Logging, SparkContext, Success, Utils} +import spark.executor.TaskMetrics +import spark.scheduler.cluster.TaskInfo +import spark.scheduler._ +import spark.SparkContext +import spark.storage.{StorageStatus, StorageUtils} +import spark.ui.JettyUtils._ +import spark.ui.Page.Executors +import spark.ui.UIUtils.headerSparkPage +import spark.ui.UIUtils +import spark.Utils + +import scala.xml.{Node, XML} + +private[spark] class ExecutorsUI(val sc: SparkContext) { + + private var _listener: Option[ExecutorsListener] = None + def listener = _listener.get + + def start() { + _listener = Some(new ExecutorsListener) + sc.addSparkListener(listener) + } + + def getHandlers = Seq[(String, Handler)]( + ("/executors", (request: HttpServletRequest) => render(request)) + ) + + def render(request: HttpServletRequest): Seq[Node] = { + val storageStatusList = sc.getExecutorStorageStatus + + val maxMem = storageStatusList.map(_.maxMem).reduce(_+_) + val memUsed = storageStatusList.map(_.memUsed()).reduce(_+_) + val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)) + .reduceOption(_+_).getOrElse(0L) + + val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used", + "Failed tasks", "Complete tasks", "Total tasks") + def execRow(kv: Seq[String]) = + <tr> + <td>{kv(0)}</td> + <td>{kv(1)}</td> + <td>{kv(2)}</td> + <td sorttable_customkey={kv(3)}> + {Utils.memoryBytesToString(kv(3).toLong)} / {Utils.memoryBytesToString(kv(4).toLong)} + </td> + <td sorttable_customkey={kv(5)}> + {Utils.memoryBytesToString(kv(5).toLong)} + </td> + <td>{kv(6)}</td> + <td>{kv(7)}</td> + <td>{kv(8)}</td> + </tr> + val execInfo = + for (b <- 0 until storageStatusList.size) + yield getExecInfo(b) + val execTable = UIUtils.listingTable(execHead, execRow, execInfo) + + val content = + <div class="row"> + <div class="span12"> + <ul class="unstyled"> + <li><strong>Memory:</strong> + {Utils.memoryBytesToString(memUsed)} Used + ({Utils.memoryBytesToString(maxMem)} Total) </li> + <li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li> + </ul> + </div> + </div> + <div class = "row"> + <div class="span12"> + {execTable} + </div> + </div>; + + headerSparkPage(content, sc, "Executors", Executors) + } + + def getExecInfo(a: Int): Seq[String] = { + val execId = sc.getExecutorStorageStatus(a).blockManagerId.executorId + val hostPort = sc.getExecutorStorageStatus(a).blockManagerId.hostPort + val rddBlocks = sc.getExecutorStorageStatus(a).blocks.size.toString + val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString + val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString + val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString + val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString + val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString + val totalTasks = listener.executorToTaskInfos(a.toString).size.toString + + Seq( + execId, + hostPort, + rddBlocks, + memUsed, + maxMem, + diskUsed, + failedTasks, + completedTasks, + totalTasks + ) + } + + private[spark] class ExecutorsListener extends SparkListener with Logging { + val executorToTasksComplete = HashMap[String, Int]() + val executorToTasksFailed = HashMap[String, Int]() + val executorToTaskInfos = + HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]() + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + val eid = taskEnd.taskInfo.executorId + val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = + taskEnd.reason match { + case e: ExceptionFailure => + executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1 + (Some(e), e.metrics) + case _ => + executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1 + (None, Some(taskEnd.taskMetrics)) + } + val taskList = executorToTaskInfos.getOrElse( + eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) + taskList += ((taskEnd.taskInfo, metrics, failureInfo)) + executorToTaskInfos(eid) = taskList + } + } +}
\ No newline at end of file diff --git a/core/src/main/scala/spark/ui/storage/IndexPage.scala b/core/src/main/scala/spark/ui/storage/IndexPage.scala index 4e0360d19a..f76192eba8 100644 --- a/core/src/main/scala/spark/ui/storage/IndexPage.scala +++ b/core/src/main/scala/spark/ui/storage/IndexPage.scala @@ -33,10 +33,6 @@ private[spark] class IndexPage(parent: BlockManagerUI) { def render(request: HttpServletRequest): Seq[Node] = { val storageStatusList = sc.getExecutorStorageStatus // Calculate macro-level statistics - val maxMem = storageStatusList.map(_.maxMem).reduce(_+_) - val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_) - val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)) - .reduceOption(_+_).getOrElse(0L) val rddHeaders = Seq( "RDD Name", @@ -46,19 +42,7 @@ private[spark] class IndexPage(parent: BlockManagerUI) { "Size in Memory", "Size on Disk") val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc) - val rddTable = listingTable(rddHeaders, rddRow, rdds) - - val content = - <div class="row"> - <div class="span12"> - <ul class="unstyled"> - <li><strong>Memory:</strong> - {Utils.memoryBytesToString(maxMem - remainingMem)} Used - ({Utils.memoryBytesToString(remainingMem)} Available) </li> - <li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li> - </ul> - </div> - </div> ++ {rddTable}; + val content = listingTable(rddHeaders, rddRow, rdds) headerSparkPage(content, parent.sc, "Spark Storage ", Storage) } |