path: root/core
diff options
authorPatrick Wendell <pwendell@gmail.com>2013-06-21 13:47:27 -0700
committerPatrick Wendell <pwendell@gmail.com>2013-06-22 10:31:37 -0700
commit3485e73376fcef524e62547ce3d69bb28a4381ad (patch)
tree21e5825f76fedd0822a016b57a70b0123e6c6ec1 /core
parentdd696f3a3d04f5e3b453173c0d4e86690f6bcb1b (diff)
Style cleanup
Diffstat (limited to 'core')
10 files changed, 428 insertions, 415 deletions
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 6a7bbdfcbf..3eca522c7e 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -13,6 +13,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import spark.deploy._
import spark.{Logging, SparkException, Utils}
import spark.util.AkkaUtils
+import ui.MasterWebUI
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
deleted file mode 100644
index 3238661127..0000000000
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ /dev/null
@@ -1,266 +0,0 @@
-package spark.deploy.master
-import akka.actor.ActorRef
-import akka.dispatch.Await
-import akka.pattern.ask
-import akka.util.Duration
-import akka.util.duration._
-import javax.servlet.http.HttpServletRequest
-import net.liftweb.json.JsonAST.JValue
-import org.eclipse.jetty.server.Handler
-import scala.xml.Node
-import spark.{Logging, Utils}
-import spark.ui.JettyUtils
-import JettyUtils._
-import spark.deploy._
-import spark.deploy.MasterState
-import spark.ui.JettyUtils
- * Web UI server for the standalone master.
- */
-class MasterWebUI(master: ActorRef) extends Logging {
- implicit val timeout = Duration.create(
- System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
- val host = Utils.localHostName()
- val port = Option(System.getProperty("master.ui.port"))
- .getOrElse(MasterWebUI.DEFAULT_PORT).toInt
- def start() {
- try {
- val (server, boundPort) = JettyUtils.startJettyServer("", port, handlers)
- logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
- } catch {
- case e: Exception =>
- logError("Failed to create Master JettyUtils", e)
- System.exit(1)
- }
- }
- val handlers = Array[(String, Handler)](
- ("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
- ("/app/json", (request: HttpServletRequest) => appDetailJson(request)),
- ("/app", (request: HttpServletRequest) => appDetail(request)),
- ("*", (request: HttpServletRequest) => index)
- )
- /** Executor details for a particular application */
- def appDetailJson(request: HttpServletRequest): JValue = {
- val appId = request.getParameter("appId")
- val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState]
- val state = Await.result(stateFuture, 3 seconds)
- val app = state.activeApps.find(_.id == appId).getOrElse({
- state.completedApps.find(_.id == appId).getOrElse(null)
- })
- JsonProtocol.writeApplicationInfo(app)
- }
- /** Executor details for a particular application */
- def appDetail(request: HttpServletRequest): Seq[Node] = {
- val appId = request.getParameter("appId")
- val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState]
- val state = Await.result(stateFuture, 3 seconds)
- val app = state.activeApps.find(_.id == appId).getOrElse({
- state.completedApps.find(_.id == appId).getOrElse(null)
- })
- val content =
- <hr />
- <div class="row">
- <div class="span12">
- <ul class="unstyled">
- <li><strong>ID:</strong> {app.id}</li>
- <li><strong>Description:</strong> {app.desc.name}</li>
- <li><strong>User:</strong> {app.desc.user}</li>
- <li><strong>Cores:</strong>
- {
- if (app.desc.maxCores == Integer.MAX_VALUE) {
- "Unlimited %s granted".format(app.coresGranted)
- } else {
- "%s (%s granted, %s left)".format(
- app.desc.maxCores, app.coresGranted, app.coresLeft)
- }
- }
- </li>
- <li><strong>Memory per Slave:</strong> {app.desc.memoryPerSlave}</li>
- <li><strong>Submit Date:</strong> {app.submitDate}</li>
- <li><strong>State:</strong> {app.state}</li>
- <li><strong><a href={app.appUiUrl}>Application Detail UI</a></strong></li>
- </ul>
- </div>
- </div>
- <hr/>
- <div class="row"> <!-- Executors -->
- <div class="span12">
- <h3> Executor Summary </h3>
- <br/>
- {executorTable(app.executors.values.toList)}
- </div>
- </div>;
- JettyUtils.sparkPage(content, "Application Info: " + app.desc.name)
- }
- def executorTable(executors: Seq[ExecutorInfo]): Seq[Node] = {
- <table class="table table-bordered table-striped table-condensed">
- <thead>
- <tr>
- <th>ExecutorID</th>
- <th>Worker</th>
- <th>Cores</th>
- <th>Memory</th>
- <th>State</th>
- <th>Logs</th>
- </tr>
- </thead>
- <tbody>
- {executors.map(executorRow)}
- </tbody>
- </table>
- }
- def executorRow(executor: ExecutorInfo): Seq[Node] = {
- <tr>
- <td>{executor.id}</td>
- <td>
- <a href={executor.worker.webUiAddress}>{executor.worker.id}</a>
- </td>
- <td>{executor.cores}</td>
- <td>{executor.memory}</td>
- <td>{executor.state}</td>
- <td>
- <a href={"%s/log?appId=%s&executorId=%s&logType=stdout"
- .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
- <a href={"%s/log?appId=%s&executorId=%s&logType=stderr"
- .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
- </td>
- </tr>
- }
- /** Index view listing applications and executors */
- def index: Seq[Node] = {
- val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState]
- val state = Await.result(stateFuture, 3 seconds)
- val content =
- <hr />
- <div class="row">
- <div class="span12">
- <ul class="unstyled">
- <li><strong>URL:</strong>{state.uri}</li>
- <li><strong>Workers:</strong>{state.workers.size}</li>
- <li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
- {state.workers.map(_.coresUsed).sum} Used</li>
- <li><strong>Memory:</strong>
- {Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total,
- {Utils.memoryMegabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
- <li><strong>Applications:</strong>
- {state.activeApps.size} Running,
- {state.completedApps.size} Completed </li>
- </ul>
- </div>
- </div>
- <div class="row">
- <div class="span12">
- <h3> Workers </h3>
- <br/>
- {workerTable(state.workers.sortBy(_.id))}
- </div>
- </div>
- <hr/>
- <div class="row">
- <div class="span12">
- <h3> Running Applications </h3>
- <br/>
- {appTable(state.activeApps.sortBy(_.startTime).reverse)}
- </div>
- </div>
- <hr/>
- <div class="row">
- <div class="span12">
- <h3> Completed Applications </h3>
- <br/>
- {appTable(state.completedApps.sortBy(_.endTime).reverse)}
- </div>
- </div>;
- JettyUtils.sparkPage(content, "Spark Master: " + state.uri)
- }
- def workerTable(workers: Seq[spark.deploy.master.WorkerInfo]) = {
- <table class="table table-bordered table-striped table-condensed sortable">
- <thead>
- <tr>
- <th>ID</th>
- <th>Address</th>
- <th>State</th>
- <th>Cores</th>
- <th>Memory</th>
- </tr>
- </thead>
- <tbody>
- {
- workers.map{ worker =>
- <tr>
- <td>
- <a href={worker.webUiAddress}>{worker.id}</a>
- </td>
- <td>{worker.host}:{worker.port}</td>
- <td>{worker.state}</td>
- <td>{worker.cores} ({worker.coresUsed} Used)</td>
- <td>{Utils.memoryMegabytesToString(worker.memory)}
- ({Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td>
- </tr>
- }
- }
- </tbody>
- </table>
- }
- def appTable(apps: Seq[spark.deploy.master.ApplicationInfo]) = {
- <table class="table table-bordered table-striped table-condensed sortable">
- <thead>
- <tr>
- <th>ID</th>
- <th>Description</th>
- <th>Cores</th>
- <th>Memory per Node</th>
- <th>Submit Time</th>
- <th>User</th>
- <th>State</th>
- <th>Duration</th>
- </tr>
- </thead>
- <tbody>
- {
- apps.map{ app =>
- <tr>
- <td>
- <a href={"app?appId=" + app.id}>{app.id}</a>
- </td>
- <td>{app.desc.name}</td>
- <td>
- {app.coresGranted}
- </td>
- <td>{Utils.memoryMegabytesToString(app.desc.memoryPerSlave)}</td>
- <td>{DeployWebUI.formatDate(app.submitDate)}</td>
- <td>{app.desc.user}</td>
- <td>{app.state.toString}</td>
- <td>{DeployWebUI.formatDuration(app.duration)}</td>
- </tr>
- }
- }
- </tbody>
- </table>
- }
-object MasterWebUI {
- val STATIC_RESOURCE_DIR = "spark/webui/static"
- val DEFAULT_PORT = "8080"
-} \ No newline at end of file
diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
new file mode 100644
index 0000000000..2a810b71d2
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
@@ -0,0 +1,100 @@
+package spark.deploy.master.ui
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.util.duration._
+import javax.servlet.http.HttpServletRequest
+import net.liftweb.json.JsonAST.JValue
+import scala.xml.Node
+import spark.deploy.{RequestMasterState, JsonProtocol, MasterState}
+import spark.deploy.master.ExecutorInfo
+import spark.ui.UIUtils
+class ApplicationPage(parent: MasterWebUI) {
+ val master = parent.master
+ implicit val timeout = parent.timeout
+ /** Executor details for a particular application */
+ def renderJson(request: HttpServletRequest): JValue = {
+ val appId = request.getParameter("appId")
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState]
+ val state = Await.result(stateFuture, 3 seconds)
+ val app = state.activeApps.find(_.id == appId).getOrElse({
+ state.completedApps.find(_.id == appId).getOrElse(null)
+ })
+ JsonProtocol.writeApplicationInfo(app)
+ }
+ /** Executor details for a particular application */
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val appId = request.getParameter("appId")
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState]
+ val state = Await.result(stateFuture, 3 seconds)
+ val app = state.activeApps.find(_.id == appId).getOrElse({
+ state.completedApps.find(_.id == appId).getOrElse(null)
+ })
+ val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs")
+ val executors = app.executors.values.toSeq
+ val executorTable = UIUtils.listingTable(executorHeaders, executorRow, executors)
+ val content =
+ <hr />
+ <div class="row">
+ <div class="span12">
+ <ul class="unstyled">
+ <li><strong>ID:</strong> {app.id}</li>
+ <li><strong>Description:</strong> {app.desc.name}</li>
+ <li><strong>User:</strong> {app.desc.user}</li>
+ <li><strong>Cores:</strong>
+ {
+ if (app.desc.maxCores == Integer.MAX_VALUE) {
+ "Unlimited %s granted".format(app.coresGranted)
+ } else {
+ "%s (%s granted, %s left)".format(
+ app.desc.maxCores, app.coresGranted, app.coresLeft)
+ }
+ }
+ </li>
+ <li><strong>Memory per Slave:</strong> {app.desc.memoryPerSlave}</li>
+ <li><strong>Submit Date:</strong> {app.submitDate}</li>
+ <li><strong>State:</strong> {app.state}</li>
+ <li><strong><a href={app.appUiUrl}>Application Detail UI</a></strong></li>
+ </ul>
+ </div>
+ </div>
+ <hr/>
+ <div class="row"> <!-- Executors -->
+ <div class="span12">
+ <h3> Executor Summary </h3>
+ <br/>
+ {executorTable}
+ </div>
+ </div>;
+ UIUtils.sparkPage(content, "Application Info: " + app.desc.name)
+ }
+ def executorRow(executor: ExecutorInfo): Seq[Node] = {
+ <tr>
+ <td>{executor.id}</td>
+ <td>
+ <a href={executor.worker.webUiAddress}>{executor.worker.id}</a>
+ </td>
+ <td>{executor.cores}</td>
+ <td>{executor.memory}</td>
+ <td>{executor.state}</td>
+ <td>
+ <a href={"%s/log?appId=%s&executorId=%s&logType=stdout"
+ .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
+ <a href={"%s/log?appId=%s&executorId=%s&logType=stderr"
+ .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
+ </td>
+ </tr>
+ }
diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
new file mode 100644
index 0000000000..f833c59de8
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
@@ -0,0 +1,115 @@
+package spark.deploy.master.ui
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.util.duration._
+import javax.servlet.http.HttpServletRequest
+import scala.xml.Node
+import spark.deploy.{RequestMasterState, DeployWebUI, MasterState}
+import spark.Utils
+import spark.ui.UIUtils
+import spark.deploy.master.{ApplicationInfo, WorkerInfo}
+class IndexPage(parent: MasterWebUI) {
+ val master = parent.master
+ implicit val timeout = parent.timeout
+ /** Index view listing applications and executors */
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState]
+ val state = Await.result(stateFuture, 3 seconds)
+ val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
+ val workers = state.workers.sortBy(_.id)
+ val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)
+ val appHeaders = Seq("ID", "Description", "Cores", "Memory per Node", "Submit Time", "User",
+ "State", "Duration")
+ val activeApps = state.activeApps.sortBy(_.startTime).reverse
+ val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
+ val completedApps = state.completedApps.sortBy(_.endTime).reverse
+ val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
+ val content =
+ <hr />
+ <div class="row">
+ <div class="span12">
+ <ul class="unstyled">
+ <li><strong>URL:</strong>{state.uri}</li>
+ <li><strong>Workers:</strong>{state.workers.size}</li>
+ <li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
+ {state.workers.map(_.coresUsed).sum} Used</li>
+ <li><strong>Memory:</strong>
+ {Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total,
+ {Utils.memoryMegabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
+ <li><strong>Applications:</strong>
+ {state.activeApps.size} Running,
+ {state.completedApps.size} Completed </li>
+ </ul>
+ </div>
+ </div>
+ <div class="row">
+ <div class="span12">
+ <h3> Workers </h3>
+ <br/>
+ {workerTable}
+ </div>
+ </div>
+ <hr/>
+ <div class="row">
+ <div class="span12">
+ <h3> Running Applications </h3>
+ <br/>
+ {activeAppsTable}
+ </div>
+ </div>
+ <hr/>
+ <div class="row">
+ <div class="span12">
+ <h3> Completed Applications </h3>
+ <br/>
+ {completedAppsTable}
+ </div>
+ </div>;
+ UIUtils.sparkPage(content, "Spark Master: " + state.uri)
+ }
+ def workerRow(worker: WorkerInfo): Seq[Node] = {
+ <tr>
+ <td>
+ <a href={worker.webUiAddress}>{worker.id}</a>
+ </td>
+ <td>{worker.host}:{worker.port}</td>
+ <td>{worker.state}</td>
+ <td>{worker.cores} ({worker.coresUsed} Used)</td>
+ <td>{Utils.memoryMegabytesToString(worker.memory)}
+ ({Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td>
+ </tr>
+ }
+ def appRow(app: ApplicationInfo): Seq[Node] = {
+ <tr>
+ <td>
+ <a href={"app?appId=" + app.id}>{app.id}</a>
+ </td>
+ <td>{app.desc.name}</td>
+ <td>
+ {app.coresGranted}
+ </td>
+ <td>{Utils.memoryMegabytesToString(app.desc.memoryPerSlave)}</td>
+ <td>{DeployWebUI.formatDate(app.submitDate)}</td>
+ <td>{app.desc.user}</td>
+ <td>{app.state.toString}</td>
+ <td>{DeployWebUI.formatDuration(app.duration)}</td>
+ </tr>
+ }
diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
new file mode 100644
index 0000000000..5fd17f10c6
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
@@ -0,0 +1,50 @@
+package spark.deploy.master.ui
+import akka.actor.ActorRef
+import akka.util.Duration
+import javax.servlet.http.HttpServletRequest
+import org.eclipse.jetty.server.Handler
+import spark.{Logging, Utils}
+import spark.ui.JettyUtils
+import spark.ui.JettyUtils._
+ * Web UI server for the standalone master.
+ */
+class MasterWebUI(val master: ActorRef) extends Logging {
+ implicit val timeout = Duration.create(
+ System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ val host = Utils.localHostName()
+ val port = Option(System.getProperty("master.ui.port"))
+ .getOrElse(MasterWebUI.DEFAULT_PORT).toInt
+ val applicationPage = new ApplicationPage(this)
+ val indexPage = new IndexPage(this)
+ def start() {
+ try {
+ val (server, boundPort) = JettyUtils.startJettyServer("", port, handlers)
+ logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
+ } catch {
+ case e: Exception =>
+ logError("Failed to create Master JettyUtils", e)
+ System.exit(1)
+ }
+ }
+ val handlers = Array[(String, Handler)](
+ ("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
+ ("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
+ ("/app", (request: HttpServletRequest) => applicationPage.render(request)),
+ ("*", (request: HttpServletRequest) => indexPage.render(request))
+ )
+object MasterWebUI {
+ val STATIC_RESOURCE_DIR = "spark/webui/static"
+ val DEFAULT_PORT = "8080"
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 3878fe3f7b..690bdfe128 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -14,6 +14,7 @@ import spark.deploy.LaunchExecutor
import spark.deploy.RegisterWorkerFailed
import spark.deploy.master.Master
import java.io.File
+import ui.WorkerWebUI
private[spark] class Worker(
host: String,
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
deleted file mode 100644
index 805e7b52db..0000000000
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-package spark.deploy.worker
-import akka.actor.ActorRef
-import akka.dispatch.Await
-import akka.pattern.ask
-import akka.util.{Duration, Timeout}
-import akka.util.duration._
-import java.io.File
-import javax.servlet.http.HttpServletRequest
-import net.liftweb.json.JsonAST.JValue
-import org.eclipse.jetty.server.Handler
-import scala.io.Source
-import spark.{Utils, Logging}
-import spark.deploy.{JsonProtocol, WorkerState, RequestWorkerState}
-import spark.ui.{JettyUtils => UtilsWebUI}
-import spark.ui.JettyUtils._
-import xml.Node
- * Web UI server for the standalone worker.
- */
-class WorkerWebUI(worker: ActorRef, workDir: File) extends Logging {
- implicit val timeout = Timeout(
- Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
- val host = Utils.localHostName()
- val port = Option(System.getProperty("wroker.ui.port"))
- .getOrElse(WorkerWebUI.DEFAULT_PORT).toInt
- val handlers = Array[(String, Handler)](
- ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
- ("/log", (request: HttpServletRequest) => log(request)),
- ("/json", (request: HttpServletRequest) => indexJson),
- ("*", (request: HttpServletRequest) => index)
- )
- def start() {
- try {
- val (server, boundPort) = UtilsWebUI.startJettyServer("", port, handlers)
- logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
- } catch {
- case e: Exception =>
- logError("Failed to create Worker JettyUtils", e)
- System.exit(1)
- }
- }
- def indexJson(): JValue = {
- val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState]
- val workerState = Await.result(stateFuture, 3 seconds)
- JsonProtocol.writeWorkerState(workerState)
- }
- def index(): Seq[Node] = {
- val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState]
- val workerState = Await.result(stateFuture, 3 seconds)
- val content =
- <hr />
- <div class="row"> <!-- Worker Details -->
- <div class="span12">
- <ul class="unstyled">
- <li><strong>ID:</strong> {workerState.workerId}</li>
- <li><strong>
- Master URL:</strong> {workerState.masterUrl}
- </li>
- <li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li>
- <li><strong>Memory:</strong> {Utils.memoryMegabytesToString(workerState.memory)}
- ({Utils.memoryMegabytesToString(workerState.memoryUsed)} Used)</li>
- </ul>
- <p><a href={workerState.masterWebUiUrl}>Back to Master</a></p>
- </div>
- </div>
- <hr/>
- <div class="row"> <!-- Running Executors -->
- <div class="span12">
- <h3> Running Executors {workerState.executors.size} </h3>
- <br/>
- {executorTable(workerState.executors)}
- </div>
- </div>
- <hr/>
- <div class="row"> <!-- Finished Executors -->
- <div class="span12">
- <h3> Finished Executors </h3>
- <br/>
- {executorTable(workerState.finishedExecutors)}
- </div>
- </div>;
- UtilsWebUI.sparkPage(content, "Spark Worker on %s:%s".format(workerState.host, workerState.port))
- }
- def executorTable(executors: Seq[ExecutorRunner]): Seq[Node] = {
- <table class="table table-bordered table-striped table-condensed sortable">
- <thead>
- <tr>
- <th>ExecutorID</th>
- <th>Cores</th>
- <th>Memory</th>
- <th>Job Details</th>
- <th>Logs</th>
- </tr>
- </thead>
- <tbody>
- {executors.map(executorRow)}
- </tbody>
- </table>
- }
- def executorRow(executor: ExecutorRunner): Seq[Node] = {
- <tr>
- <td>{executor.execId}</td>
- <td>{executor.cores}</td>
- <td>{Utils.memoryMegabytesToString(executor.memory)}</td>
- <td>
- <ul class="unstyled">
- <li><strong>ID:</strong> {executor.appId}</li>
- <li><strong>Name:</strong> {executor.appDesc.name}</li>
- <li><strong>User:</strong> {executor.appDesc.user}</li>
- </ul>
- </td>
- <td>
- <a href={"log?appId=%s&executorId=%s&logType=stdout"
- .format(executor.appId, executor.execId)}>stdout</a>
- <a href={"log?appId=%s&executorId=%s&logType=stderr"
- .format(executor.appId, executor.execId)}>stderr</a>
- </td>
- </tr>
- }
- def log(request: HttpServletRequest): String = {
- val appId = request.getParameter("appId")
- val executorId = request.getParameter("executorId")
- val logType = request.getParameter("logType")
- val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
- val source = Source.fromFile(path)
- val lines = source.mkString
- source.close()
- lines
- }
-object WorkerWebUI {
- val STATIC_RESOURCE_DIR = "spark/webui/static"
- val DEFAULT_PORT="8081"
diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
new file mode 100644
index 0000000000..a6aba59e9f
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
@@ -0,0 +1,97 @@
+package spark.deploy.worker.ui
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.util.duration._
+import javax.servlet.http.HttpServletRequest
+import net.liftweb.json.JsonAST.JValue
+import spark.deploy.{RequestWorkerState, JsonProtocol, WorkerState}
+import spark.deploy.worker.ExecutorRunner
+import spark.Utils
+import spark.ui.UIUtils
+import xml.Node
+class IndexPage(parent: WorkerWebUI) {
+ val worker = parent.worker
+ val timeout = parent.timeout
+ def renderJson(request: HttpServletRequest): JValue = {
+ val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState]
+ val workerState = Await.result(stateFuture, 3 seconds)
+ JsonProtocol.writeWorkerState(workerState)
+ }
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState]
+ val workerState = Await.result(stateFuture, 3 seconds)
+ val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
+ val runningExecutorTable =
+ UIUtils.listingTable(executorHeaders, executorRow, workerState.executors)
+ val finishedExecutorTable =
+ UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
+ val content =
+ <hr />
+ <div class="row"> <!-- Worker Details -->
+ <div class="span12">
+ <ul class="unstyled">
+ <li><strong>ID:</strong> {workerState.workerId}</li>
+ <li><strong>
+ Master URL:</strong> {workerState.masterUrl}
+ </li>
+ <li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li>
+ <li><strong>Memory:</strong> {Utils.memoryMegabytesToString(workerState.memory)}
+ ({Utils.memoryMegabytesToString(workerState.memoryUsed)} Used)</li>
+ </ul>
+ <p><a href={workerState.masterWebUiUrl}>Back to Master</a></p>
+ </div>
+ </div>
+ <hr/>
+ <div class="row"> <!-- Running Executors -->
+ <div class="span12">
+ <h3> Running Executors {workerState.executors.size} </h3>
+ <br/>
+ {runningExecutorTable}
+ </div>
+ </div>
+ <hr/>
+ <div class="row"> <!-- Finished Executors -->
+ <div class="span12">
+ <h3> Finished Executors </h3>
+ <br/>
+ {finishedExecutorTable}
+ </div>
+ </div>;
+ UIUtils.sparkPage(content, "Spark Worker on %s:%s".format(workerState.host, workerState.port))
+ }
+ def executorRow(executor: ExecutorRunner): Seq[Node] = {
+ <tr>
+ <td>{executor.execId}</td>
+ <td>{executor.cores}</td>
+ <td>{Utils.memoryMegabytesToString(executor.memory)}</td>
+ <td>
+ <ul class="unstyled">
+ <li><strong>ID:</strong> {executor.appId}</li>
+ <li><strong>Name:</strong> {executor.appDesc.name}</li>
+ <li><strong>User:</strong> {executor.appDesc.user}</li>
+ </ul>
+ </td>
+ <td>
+ <a href={"log?appId=%s&executorId=%s&logType=stdout"
+ .format(executor.appId, executor.execId)}>stdout</a>
+ <a href={"log?appId=%s&executorId=%s&logType=stderr"
+ .format(executor.appId, executor.execId)}>stderr</a>
+ </td>
+ </tr>
+ }
diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
new file mode 100644
index 0000000000..abfc847527
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -0,0 +1,64 @@
+package spark.deploy.worker.ui
+import akka.actor.ActorRef
+import akka.util.{Duration, Timeout}
+import java.io.File
+import javax.servlet.http.HttpServletRequest
+import org.eclipse.jetty.server.Handler
+import scala.io.Source
+import spark.{Utils, Logging}
+import spark.ui.JettyUtils
+import spark.ui.JettyUtils._
+ * Web UI server for the standalone worker.
+ */
+class WorkerWebUI(val worker: ActorRef, val workDir: File) extends Logging {
+ implicit val timeout = Timeout(
+ Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
+ val host = Utils.localHostName()
+ val port = Option(System.getProperty("wroker.ui.port"))
+ .getOrElse(WorkerWebUI.DEFAULT_PORT).toInt
+ val indexPage = new IndexPage(this)
+ val handlers = Array[(String, Handler)](
+ ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
+ ("/log", (request: HttpServletRequest) => log(request)),
+ ("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
+ ("*", (request: HttpServletRequest) => indexPage.render(request))
+ )
+ def start() {
+ try {
+ val (server, boundPort) = JettyUtils.startJettyServer("", port, handlers)
+ logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
+ } catch {
+ case e: Exception =>
+ logError("Failed to create Worker JettyUtils", e)
+ System.exit(1)
+ }
+ }
+ def log(request: HttpServletRequest): String = {
+ val appId = request.getParameter("appId")
+ val executorId = request.getParameter("executorId")
+ val logType = request.getParameter("logType")
+ val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
+ val source = Source.fromFile(path)
+ val lines = source.mkString
+ source.close()
+ lines
+ }
+object WorkerWebUI {
+ val STATIC_RESOURCE_DIR = "spark/webui/static"
+ val DEFAULT_PORT="8081"
diff --git a/core/src/main/scala/spark/ui/JettyUtils.scala b/core/src/main/scala/spark/ui/JettyUtils.scala
index 8bb343163a..93e7129fc0 100644
--- a/core/src/main/scala/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/spark/ui/JettyUtils.scala
@@ -15,7 +15,6 @@ import spark.Logging
import xml.Node
/** Utilities for launching a web server using Jetty's HTTP Server class */
private[spark] object JettyUtils extends Logging {
// Base type for a function that returns something based on an HTTP request. Allows for