diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-06-21 13:47:27 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-06-22 10:31:37 -0700 |
commit | 3485e73376fcef524e62547ce3d69bb28a4381ad (patch) | |
tree | 21e5825f76fedd0822a016b57a70b0123e6c6ec1 /core | |
parent | dd696f3a3d04f5e3b453173c0d4e86690f6bcb1b (diff) | |
download | spark-3485e73376fcef524e62547ce3d69bb28a4381ad.tar.gz spark-3485e73376fcef524e62547ce3d69bb28a4381ad.tar.bz2 spark-3485e73376fcef524e62547ce3d69bb28a4381ad.zip |
Style cleanup
Diffstat (limited to 'core')
10 files changed, 428 insertions, 415 deletions
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index 6a7bbdfcbf..3eca522c7e 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -13,6 +13,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import spark.deploy._ import spark.{Logging, SparkException, Utils} import spark.util.AkkaUtils +import ui.MasterWebUI private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging { diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala deleted file mode 100644 index 3238661127..0000000000 --- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala +++ /dev/null @@ -1,266 +0,0 @@ -package spark.deploy.master - -import akka.actor.ActorRef -import akka.dispatch.Await -import akka.pattern.ask -import akka.util.Duration -import akka.util.duration._ -import javax.servlet.http.HttpServletRequest -import net.liftweb.json.JsonAST.JValue -import org.eclipse.jetty.server.Handler -import scala.xml.Node -import spark.{Logging, Utils} -import spark.ui.JettyUtils -import JettyUtils._ -import spark.deploy._ -import spark.deploy.MasterState -import spark.ui.JettyUtils - -/** - * Web UI server for the standalone master. - */ -private[spark] -class MasterWebUI(master: ActorRef) extends Logging { - implicit val timeout = Duration.create( - System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") - val host = Utils.localHostName() - val port = Option(System.getProperty("master.ui.port")) - .getOrElse(MasterWebUI.DEFAULT_PORT).toInt - - def start() { - try { - val (server, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers) - logInfo("Started Master web UI at http://%s:%d".format(host, boundPort)) - } catch { - case e: Exception => - logError("Failed to create Master JettyUtils", e) - System.exit(1) - } - } - - val handlers = Array[(String, Handler)]( - ("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)), - ("/app/json", (request: HttpServletRequest) => appDetailJson(request)), - ("/app", (request: HttpServletRequest) => appDetail(request)), - ("*", (request: HttpServletRequest) => index) - ) - - /** Executor details for a particular application */ - def appDetailJson(request: HttpServletRequest): JValue = { - val appId = request.getParameter("appId") - val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState] - val state = Await.result(stateFuture, 3 seconds) - val app = state.activeApps.find(_.id == appId).getOrElse({ - state.completedApps.find(_.id == appId).getOrElse(null) - }) - JsonProtocol.writeApplicationInfo(app) - } - - /** Executor details for a particular application */ - def appDetail(request: HttpServletRequest): Seq[Node] = { - val appId = request.getParameter("appId") - val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState] - val state = Await.result(stateFuture, 3 seconds) - val app = state.activeApps.find(_.id == appId).getOrElse({ - state.completedApps.find(_.id == appId).getOrElse(null) - }) - val content = - <hr /> - <div class="row"> - <div class="span12"> - <ul class="unstyled"> - <li><strong>ID:</strong> {app.id}</li> - <li><strong>Description:</strong> {app.desc.name}</li> - <li><strong>User:</strong> {app.desc.user}</li> - <li><strong>Cores:</strong> - { - if (app.desc.maxCores == Integer.MAX_VALUE) { - "Unlimited %s granted".format(app.coresGranted) - } else { - "%s (%s granted, %s left)".format( - app.desc.maxCores, app.coresGranted, app.coresLeft) - } - } - </li> - <li><strong>Memory per Slave:</strong> {app.desc.memoryPerSlave}</li> - <li><strong>Submit Date:</strong> {app.submitDate}</li> - <li><strong>State:</strong> {app.state}</li> - <li><strong><a href={app.appUiUrl}>Application Detail UI</a></strong></li> - </ul> - </div> - </div> - - <hr/> - - <div class="row"> <!-- Executors --> - <div class="span12"> - <h3> Executor Summary </h3> - <br/> - {executorTable(app.executors.values.toList)} - </div> - </div>; - JettyUtils.sparkPage(content, "Application Info: " + app.desc.name) - } - - def executorTable(executors: Seq[ExecutorInfo]): Seq[Node] = { - <table class="table table-bordered table-striped table-condensed"> - <thead> - <tr> - <th>ExecutorID</th> - <th>Worker</th> - <th>Cores</th> - <th>Memory</th> - <th>State</th> - <th>Logs</th> - </tr> - </thead> - <tbody> - {executors.map(executorRow)} - </tbody> - </table> - } - - def executorRow(executor: ExecutorInfo): Seq[Node] = { - <tr> - <td>{executor.id}</td> - <td> - <a href={executor.worker.webUiAddress}>{executor.worker.id}</a> - </td> - <td>{executor.cores}</td> - <td>{executor.memory}</td> - <td>{executor.state}</td> - <td> - <a href={"%s/log?appId=%s&executorId=%s&logType=stdout" - .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a> - <a href={"%s/log?appId=%s&executorId=%s&logType=stderr" - .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a> - </td> - </tr> - } - - /** Index view listing applications and executors */ - def index: Seq[Node] = { - val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState] - val state = Await.result(stateFuture, 3 seconds) - - val content = - <hr /> - <div class="row"> - <div class="span12"> - <ul class="unstyled"> - <li><strong>URL:</strong>{state.uri}</li> - <li><strong>Workers:</strong>{state.workers.size}</li> - <li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total, - {state.workers.map(_.coresUsed).sum} Used</li> - <li><strong>Memory:</strong> - {Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total, - {Utils.memoryMegabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li> - <li><strong>Applications:</strong> - {state.activeApps.size} Running, - {state.completedApps.size} Completed </li> - </ul> - </div> - </div> - - <div class="row"> - <div class="span12"> - <h3> Workers </h3> - <br/> - {workerTable(state.workers.sortBy(_.id))} - </div> - </div> - - <hr/> - - <div class="row"> - <div class="span12"> - <h3> Running Applications </h3> - <br/> - {appTable(state.activeApps.sortBy(_.startTime).reverse)} - </div> - </div> - - <hr/> - - <div class="row"> - <div class="span12"> - <h3> Completed Applications </h3> - <br/> - {appTable(state.completedApps.sortBy(_.endTime).reverse)} - </div> - </div>; - JettyUtils.sparkPage(content, "Spark Master: " + state.uri) - } - - def workerTable(workers: Seq[spark.deploy.master.WorkerInfo]) = { - <table class="table table-bordered table-striped table-condensed sortable"> - <thead> - <tr> - <th>ID</th> - <th>Address</th> - <th>State</th> - <th>Cores</th> - <th>Memory</th> - </tr> - </thead> - <tbody> - { - workers.map{ worker => - <tr> - <td> - <a href={worker.webUiAddress}>{worker.id}</a> - </td> - <td>{worker.host}:{worker.port}</td> - <td>{worker.state}</td> - <td>{worker.cores} ({worker.coresUsed} Used)</td> - <td>{Utils.memoryMegabytesToString(worker.memory)} - ({Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td> - </tr> - } - } - </tbody> - </table> - } - - def appTable(apps: Seq[spark.deploy.master.ApplicationInfo]) = { - <table class="table table-bordered table-striped table-condensed sortable"> - <thead> - <tr> - <th>ID</th> - <th>Description</th> - <th>Cores</th> - <th>Memory per Node</th> - <th>Submit Time</th> - <th>User</th> - <th>State</th> - <th>Duration</th> - </tr> - </thead> - <tbody> - { - apps.map{ app => - <tr> - <td> - <a href={"app?appId=" + app.id}>{app.id}</a> - </td> - <td>{app.desc.name}</td> - <td> - {app.coresGranted} - </td> - <td>{Utils.memoryMegabytesToString(app.desc.memoryPerSlave)}</td> - <td>{DeployWebUI.formatDate(app.submitDate)}</td> - <td>{app.desc.user}</td> - <td>{app.state.toString}</td> - <td>{DeployWebUI.formatDuration(app.duration)}</td> - </tr> - } - } - </tbody> - </table> - } -} - -object MasterWebUI { - val STATIC_RESOURCE_DIR = "spark/webui/static" - val DEFAULT_PORT = "8080" -}
\ No newline at end of file diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala new file mode 100644 index 0000000000..2a810b71d2 --- /dev/null +++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala @@ -0,0 +1,100 @@ +package spark.deploy.master.ui + +import akka.dispatch.Await +import akka.pattern.ask +import akka.util.duration._ + +import javax.servlet.http.HttpServletRequest + +import net.liftweb.json.JsonAST.JValue + +import scala.xml.Node + +import spark.deploy.{RequestMasterState, JsonProtocol, MasterState} +import spark.deploy.master.ExecutorInfo +import spark.ui.UIUtils + +class ApplicationPage(parent: MasterWebUI) { + val master = parent.master + implicit val timeout = parent.timeout + + /** Executor details for a particular application */ + def renderJson(request: HttpServletRequest): JValue = { + val appId = request.getParameter("appId") + val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState] + val state = Await.result(stateFuture, 3 seconds) + val app = state.activeApps.find(_.id == appId).getOrElse({ + state.completedApps.find(_.id == appId).getOrElse(null) + }) + JsonProtocol.writeApplicationInfo(app) + } + + /** Executor details for a particular application */ + def render(request: HttpServletRequest): Seq[Node] = { + val appId = request.getParameter("appId") + val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState] + val state = Await.result(stateFuture, 3 seconds) + val app = state.activeApps.find(_.id == appId).getOrElse({ + state.completedApps.find(_.id == appId).getOrElse(null) + }) + + val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs") + val executors = app.executors.values.toSeq + val executorTable = UIUtils.listingTable(executorHeaders, executorRow, executors) + + val content = + <hr /> + <div class="row"> + <div class="span12"> + <ul class="unstyled"> + <li><strong>ID:</strong> {app.id}</li> + <li><strong>Description:</strong> {app.desc.name}</li> + <li><strong>User:</strong> {app.desc.user}</li> + <li><strong>Cores:</strong> + { + if (app.desc.maxCores == Integer.MAX_VALUE) { + "Unlimited %s granted".format(app.coresGranted) + } else { + "%s (%s granted, %s left)".format( + app.desc.maxCores, app.coresGranted, app.coresLeft) + } + } + </li> + <li><strong>Memory per Slave:</strong> {app.desc.memoryPerSlave}</li> + <li><strong>Submit Date:</strong> {app.submitDate}</li> + <li><strong>State:</strong> {app.state}</li> + <li><strong><a href={app.appUiUrl}>Application Detail UI</a></strong></li> + </ul> + </div> + </div> + + <hr/> + + <div class="row"> <!-- Executors --> + <div class="span12"> + <h3> Executor Summary </h3> + <br/> + {executorTable} + </div> + </div>; + UIUtils.sparkPage(content, "Application Info: " + app.desc.name) + } + + def executorRow(executor: ExecutorInfo): Seq[Node] = { + <tr> + <td>{executor.id}</td> + <td> + <a href={executor.worker.webUiAddress}>{executor.worker.id}</a> + </td> + <td>{executor.cores}</td> + <td>{executor.memory}</td> + <td>{executor.state}</td> + <td> + <a href={"%s/log?appId=%s&executorId=%s&logType=stdout" + .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a> + <a href={"%s/log?appId=%s&executorId=%s&logType=stderr" + .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a> + </td> + </tr> + } +} diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala new file mode 100644 index 0000000000..f833c59de8 --- /dev/null +++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala @@ -0,0 +1,115 @@ +package spark.deploy.master.ui + +import akka.dispatch.Await +import akka.pattern.ask +import akka.util.duration._ + +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import spark.deploy.{RequestMasterState, DeployWebUI, MasterState} +import spark.Utils +import spark.ui.UIUtils +import spark.deploy.master.{ApplicationInfo, WorkerInfo} + +class IndexPage(parent: MasterWebUI) { + val master = parent.master + implicit val timeout = parent.timeout + + /** Index view listing applications and executors */ + def render(request: HttpServletRequest): Seq[Node] = { + val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState] + val state = Await.result(stateFuture, 3 seconds) + + val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory") + val workers = state.workers.sortBy(_.id) + val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers) + + val appHeaders = Seq("ID", "Description", "Cores", "Memory per Node", "Submit Time", "User", + "State", "Duration") + val activeApps = state.activeApps.sortBy(_.startTime).reverse + val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps) + val completedApps = state.completedApps.sortBy(_.endTime).reverse + val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps) + + val content = + <hr /> + <div class="row"> + <div class="span12"> + <ul class="unstyled"> + <li><strong>URL:</strong>{state.uri}</li> + <li><strong>Workers:</strong>{state.workers.size}</li> + <li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total, + {state.workers.map(_.coresUsed).sum} Used</li> + <li><strong>Memory:</strong> + {Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total, + {Utils.memoryMegabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li> + <li><strong>Applications:</strong> + {state.activeApps.size} Running, + {state.completedApps.size} Completed </li> + </ul> + </div> + </div> + + <div class="row"> + <div class="span12"> + <h3> Workers </h3> + <br/> + {workerTable} + </div> + </div> + + <hr/> + + <div class="row"> + <div class="span12"> + <h3> Running Applications </h3> + <br/> + {activeAppsTable} + </div> + </div> + + <hr/> + + <div class="row"> + <div class="span12"> + <h3> Completed Applications </h3> + <br/> + {completedAppsTable} + </div> + </div>; + UIUtils.sparkPage(content, "Spark Master: " + state.uri) + } + + def workerRow(worker: WorkerInfo): Seq[Node] = { + <tr> + <td> + <a href={worker.webUiAddress}>{worker.id}</a> + </td> + <td>{worker.host}:{worker.port}</td> + <td>{worker.state}</td> + <td>{worker.cores} ({worker.coresUsed} Used)</td> + <td>{Utils.memoryMegabytesToString(worker.memory)} + ({Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td> + </tr> + } + + + def appRow(app: ApplicationInfo): Seq[Node] = { + <tr> + <td> + <a href={"app?appId=" + app.id}>{app.id}</a> + </td> + <td>{app.desc.name}</td> + <td> + {app.coresGranted} + </td> + <td>{Utils.memoryMegabytesToString(app.desc.memoryPerSlave)}</td> + <td>{DeployWebUI.formatDate(app.submitDate)}</td> + <td>{app.desc.user}</td> + <td>{app.state.toString}</td> + <td>{DeployWebUI.formatDuration(app.duration)}</td> + </tr> + } +} diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala new file mode 100644 index 0000000000..5fd17f10c6 --- /dev/null +++ b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala @@ -0,0 +1,50 @@ +package spark.deploy.master.ui + +import akka.actor.ActorRef +import akka.util.Duration + +import javax.servlet.http.HttpServletRequest + +import org.eclipse.jetty.server.Handler + +import spark.{Logging, Utils} +import spark.ui.JettyUtils +import spark.ui.JettyUtils._ + +/** + * Web UI server for the standalone master. + */ +private[spark] +class MasterWebUI(val master: ActorRef) extends Logging { + implicit val timeout = Duration.create( + System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + val host = Utils.localHostName() + val port = Option(System.getProperty("master.ui.port")) + .getOrElse(MasterWebUI.DEFAULT_PORT).toInt + + val applicationPage = new ApplicationPage(this) + val indexPage = new IndexPage(this) + + def start() { + try { + val (server, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers) + logInfo("Started Master web UI at http://%s:%d".format(host, boundPort)) + } catch { + case e: Exception => + logError("Failed to create Master JettyUtils", e) + System.exit(1) + } + } + + val handlers = Array[(String, Handler)]( + ("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)), + ("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)), + ("/app", (request: HttpServletRequest) => applicationPage.render(request)), + ("*", (request: HttpServletRequest) => indexPage.render(request)) + ) +} + +object MasterWebUI { + val STATIC_RESOURCE_DIR = "spark/webui/static" + val DEFAULT_PORT = "8080" +}
\ No newline at end of file diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 3878fe3f7b..690bdfe128 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -14,6 +14,7 @@ import spark.deploy.LaunchExecutor import spark.deploy.RegisterWorkerFailed import spark.deploy.master.Master import java.io.File +import ui.WorkerWebUI private[spark] class Worker( host: String, diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala deleted file mode 100644 index 805e7b52db..0000000000 --- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala +++ /dev/null @@ -1,148 +0,0 @@ -package spark.deploy.worker - -import akka.actor.ActorRef -import akka.dispatch.Await -import akka.pattern.ask -import akka.util.{Duration, Timeout} -import akka.util.duration._ -import java.io.File -import javax.servlet.http.HttpServletRequest -import net.liftweb.json.JsonAST.JValue -import org.eclipse.jetty.server.Handler -import scala.io.Source -import spark.{Utils, Logging} -import spark.deploy.{JsonProtocol, WorkerState, RequestWorkerState} -import spark.ui.{JettyUtils => UtilsWebUI} -import spark.ui.JettyUtils._ -import xml.Node - -/** - * Web UI server for the standalone worker. - */ -private[spark] -class WorkerWebUI(worker: ActorRef, workDir: File) extends Logging { - implicit val timeout = Timeout( - Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")) - val host = Utils.localHostName() - val port = Option(System.getProperty("wroker.ui.port")) - .getOrElse(WorkerWebUI.DEFAULT_PORT).toInt - - val handlers = Array[(String, Handler)]( - ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)), - ("/log", (request: HttpServletRequest) => log(request)), - ("/json", (request: HttpServletRequest) => indexJson), - ("*", (request: HttpServletRequest) => index) - ) - - def start() { - try { - val (server, boundPort) = UtilsWebUI.startJettyServer("0.0.0.0", port, handlers) - logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort)) - } catch { - case e: Exception => - logError("Failed to create Worker JettyUtils", e) - System.exit(1) - } - } - - def indexJson(): JValue = { - val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState] - val workerState = Await.result(stateFuture, 3 seconds) - JsonProtocol.writeWorkerState(workerState) - } - - def index(): Seq[Node] = { - val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState] - val workerState = Await.result(stateFuture, 3 seconds) - val content = - <hr /> - <div class="row"> <!-- Worker Details --> - <div class="span12"> - <ul class="unstyled"> - <li><strong>ID:</strong> {workerState.workerId}</li> - <li><strong> - Master URL:</strong> {workerState.masterUrl} - </li> - <li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li> - <li><strong>Memory:</strong> {Utils.memoryMegabytesToString(workerState.memory)} - ({Utils.memoryMegabytesToString(workerState.memoryUsed)} Used)</li> - </ul> - <p><a href={workerState.masterWebUiUrl}>Back to Master</a></p> - </div> - </div> - <hr/> - - <div class="row"> <!-- Running Executors --> - <div class="span12"> - <h3> Running Executors {workerState.executors.size} </h3> - <br/> - {executorTable(workerState.executors)} - </div> - </div> - <hr/> - - <div class="row"> <!-- Finished Executors --> - <div class="span12"> - <h3> Finished Executors </h3> - <br/> - {executorTable(workerState.finishedExecutors)} - </div> - </div>; - - UtilsWebUI.sparkPage(content, "Spark Worker on %s:%s".format(workerState.host, workerState.port)) - } - - def executorTable(executors: Seq[ExecutorRunner]): Seq[Node] = { - <table class="table table-bordered table-striped table-condensed sortable"> - <thead> - <tr> - <th>ExecutorID</th> - <th>Cores</th> - <th>Memory</th> - <th>Job Details</th> - <th>Logs</th> - </tr> - </thead> - <tbody> - {executors.map(executorRow)} - </tbody> - </table> - } - - def executorRow(executor: ExecutorRunner): Seq[Node] = { - <tr> - <td>{executor.execId}</td> - <td>{executor.cores}</td> - <td>{Utils.memoryMegabytesToString(executor.memory)}</td> - <td> - <ul class="unstyled"> - <li><strong>ID:</strong> {executor.appId}</li> - <li><strong>Name:</strong> {executor.appDesc.name}</li> - <li><strong>User:</strong> {executor.appDesc.user}</li> - </ul> - </td> - <td> - <a href={"log?appId=%s&executorId=%s&logType=stdout" - .format(executor.appId, executor.execId)}>stdout</a> - <a href={"log?appId=%s&executorId=%s&logType=stderr" - .format(executor.appId, executor.execId)}>stderr</a> - </td> - </tr> - } - - def log(request: HttpServletRequest): String = { - val appId = request.getParameter("appId") - val executorId = request.getParameter("executorId") - val logType = request.getParameter("logType") - val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType) - val source = Source.fromFile(path) - val lines = source.mkString - source.close() - lines - } -} - -object WorkerWebUI { - val STATIC_RESOURCE_DIR = "spark/webui/static" - val DEFAULT_PORT="8081" -} diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala new file mode 100644 index 0000000000..a6aba59e9f --- /dev/null +++ b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala @@ -0,0 +1,97 @@ +package spark.deploy.worker.ui + +import akka.dispatch.Await +import akka.pattern.ask +import akka.util.duration._ + +import javax.servlet.http.HttpServletRequest + +import net.liftweb.json.JsonAST.JValue + +import spark.deploy.{RequestWorkerState, JsonProtocol, WorkerState} +import spark.deploy.worker.ExecutorRunner +import spark.Utils +import spark.ui.UIUtils + +import xml.Node + +class IndexPage(parent: WorkerWebUI) { + val worker = parent.worker + val timeout = parent.timeout + + def renderJson(request: HttpServletRequest): JValue = { + val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState] + val workerState = Await.result(stateFuture, 3 seconds) + JsonProtocol.writeWorkerState(workerState) + } + + def render(request: HttpServletRequest): Seq[Node] = { + val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState] + val workerState = Await.result(stateFuture, 3 seconds) + + val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs") + val runningExecutorTable = + UIUtils.listingTable(executorHeaders, executorRow, workerState.executors) + val finishedExecutorTable = + UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors) + + val content = + <hr /> + <div class="row"> <!-- Worker Details --> + <div class="span12"> + <ul class="unstyled"> + <li><strong>ID:</strong> {workerState.workerId}</li> + <li><strong> + Master URL:</strong> {workerState.masterUrl} + </li> + <li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li> + <li><strong>Memory:</strong> {Utils.memoryMegabytesToString(workerState.memory)} + ({Utils.memoryMegabytesToString(workerState.memoryUsed)} Used)</li> + </ul> + <p><a href={workerState.masterWebUiUrl}>Back to Master</a></p> + </div> + </div> + <hr/> + + <div class="row"> <!-- Running Executors --> + <div class="span12"> + <h3> Running Executors {workerState.executors.size} </h3> + <br/> + {runningExecutorTable} + </div> + </div> + <hr/> + + <div class="row"> <!-- Finished Executors --> + <div class="span12"> + <h3> Finished Executors </h3> + <br/> + {finishedExecutorTable} + </div> + </div>; + + UIUtils.sparkPage(content, "Spark Worker on %s:%s".format(workerState.host, workerState.port)) + } + + def executorRow(executor: ExecutorRunner): Seq[Node] = { + <tr> + <td>{executor.execId}</td> + <td>{executor.cores}</td> + <td>{Utils.memoryMegabytesToString(executor.memory)}</td> + <td> + <ul class="unstyled"> + <li><strong>ID:</strong> {executor.appId}</li> + <li><strong>Name:</strong> {executor.appDesc.name}</li> + <li><strong>User:</strong> {executor.appDesc.user}</li> + </ul> + </td> + <td> + <a href={"log?appId=%s&executorId=%s&logType=stdout" + .format(executor.appId, executor.execId)}>stdout</a> + <a href={"log?appId=%s&executorId=%s&logType=stderr" + .format(executor.appId, executor.execId)}>stderr</a> + </td> + </tr> + } + +} diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala new file mode 100644 index 0000000000..abfc847527 --- /dev/null +++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala @@ -0,0 +1,64 @@ +package spark.deploy.worker.ui + +import akka.actor.ActorRef +import akka.util.{Duration, Timeout} + +import java.io.File + +import javax.servlet.http.HttpServletRequest + +import org.eclipse.jetty.server.Handler + +import scala.io.Source + +import spark.{Utils, Logging} +import spark.ui.JettyUtils +import spark.ui.JettyUtils._ + +/** + * Web UI server for the standalone worker. + */ +private[spark] +class WorkerWebUI(val worker: ActorRef, val workDir: File) extends Logging { + implicit val timeout = Timeout( + Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")) + val host = Utils.localHostName() + val port = Option(System.getProperty("wroker.ui.port")) + .getOrElse(WorkerWebUI.DEFAULT_PORT).toInt + + val indexPage = new IndexPage(this) + + val handlers = Array[(String, Handler)]( + ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)), + ("/log", (request: HttpServletRequest) => log(request)), + ("/json", (request: HttpServletRequest) => indexPage.renderJson(request)), + ("*", (request: HttpServletRequest) => indexPage.render(request)) + ) + + def start() { + try { + val (server, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers) + logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort)) + } catch { + case e: Exception => + logError("Failed to create Worker JettyUtils", e) + System.exit(1) + } + } + + def log(request: HttpServletRequest): String = { + val appId = request.getParameter("appId") + val executorId = request.getParameter("executorId") + val logType = request.getParameter("logType") + val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType) + val source = Source.fromFile(path) + val lines = source.mkString + source.close() + lines + } +} + +object WorkerWebUI { + val STATIC_RESOURCE_DIR = "spark/webui/static" + val DEFAULT_PORT="8081" +} diff --git a/core/src/main/scala/spark/ui/JettyUtils.scala b/core/src/main/scala/spark/ui/JettyUtils.scala index 8bb343163a..93e7129fc0 100644 --- a/core/src/main/scala/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/spark/ui/JettyUtils.scala @@ -15,7 +15,6 @@ import spark.Logging import xml.Node - /** Utilities for launching a web server using Jetty's HTTP Server class */ private[spark] object JettyUtils extends Logging { // Base type for a function that returns something based on an HTTP request. Allows for |