aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/deploy/master/MasterWebUI.scala16
-rw-r--r--core/src/main/scala/spark/deploy/worker/Worker.scala12
-rw-r--r--core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala135
-rw-r--r--core/src/main/scala/spark/util/WebUI.scala31
4 files changed, 162 insertions, 32 deletions
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 34f50fd5e4..a2e9dfd762 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -20,10 +20,10 @@ import javax.servlet.http.HttpServletRequest
*/
private[spark]
class MasterWebUI(master: ActorRef) extends Logging {
-
- implicit val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ implicit val timeout = Duration.create(
+ System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
val host = Utils.localHostName()
- val port = Option(System.getProperty("spark.ui.port"))
+ val port = Option(System.getProperty("master.ui.port"))
.getOrElse(MasterWebUI.DEFAULT_PORT).toInt
def start() {
@@ -82,13 +82,13 @@ class MasterWebUI(master: ActorRef) extends Logging {
<div class="span12">
<h3> Executor Summary </h3>
<br/>
- {executorsTable(app.executors.values.toList)}
+ {executorTable(app.executors.values.toList)}
</div>
</div>;
UtilsWebUI.makePage(content, "Application Info: " + app.desc.name)
}
- def executorsTable(executors: Seq[ExecutorInfo]): Seq[Node] = {
+ def executorTable(executors: Seq[ExecutorInfo]): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed">
<thead>
<tr>
@@ -119,7 +119,7 @@ class MasterWebUI(master: ActorRef) extends Logging {
<a href={"%s/log?appId=%s&executorId=%s&logType=stdout"
.format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
<a href={"%s/log?appId=%s&executorId=%s&logType=stderr"
- .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
+ .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
</td>
</tr>
}
@@ -135,7 +135,7 @@ class MasterWebUI(master: ActorRef) extends Logging {
<ul class="unstyled">
<li><strong>URL:</strong>{state.uri}</li>
<li><strong>Workers:</strong>{state.workers.size}</li>
- <li><strong>Cores:</strong> {state.workers.map(_.cores).sum}Total,
+ <li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
{state.workers.map(_.coresUsed).sum} Used</li>
<li><strong>Memory:</strong>
{Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total,
@@ -247,5 +247,5 @@ class MasterWebUI(master: ActorRef) extends Logging {
object MasterWebUI {
val STATIC_RESOURCE_DIR = "spark/deploy/static"
- val DEFAULT_PORT = "34000"
+ val DEFAULT_PORT = "8080"
} \ No newline at end of file
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index f8fdab927e..3878fe3f7b 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -88,16 +88,8 @@ private[spark] class Worker(
}
def startWebUi() {
- val webUi = new WorkerWebUI(context.system, self, workDir)
- /*
- try {
- AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler)
- } catch {
- case e: Exception =>
- logError("Failed to create web UI", e)
- System.exit(1)
- }
- */
+ val webUi = new WorkerWebUI(self, workDir)
+ webUi.start()
}
override def receive = {
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index 3235c50d1b..b8b4b89738 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -8,22 +8,137 @@ import akka.util.duration._
import cc.spray.Directives
import cc.spray.typeconversion.TwirlSupport._
import cc.spray.http.MediaTypes
-import cc.spray.typeconversion.SprayJsonSupport._
import spark.deploy.{WorkerState, RequestWorkerState}
import spark.deploy.JsonProtocol._
import java.io.File
+import spark.util.{WebUI => UtilsWebUI}
+import spark.{Utils, Logging}
+import org.eclipse.jetty.server.Handler
+import spark.util.WebUI._
+import spark.deploy.WorkerState
+import scala.io.Source
+import javax.servlet.http.HttpServletRequest
+import xml.Node
/**
* Web UI server for the standalone worker.
*/
private[spark]
-class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef, workDir: File) extends Directives {
- val RESOURCE_DIR = "spark/deploy/worker/webui"
- val STATIC_RESOURCE_DIR = "spark/deploy/static"
-
- implicit val timeout = Timeout(Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
-
+class WorkerWebUI(worker: ActorRef, workDir: File) extends Logging {
+ implicit val timeout = Timeout(
+ Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
+ val host = Utils.localHostName()
+ val port = Option(System.getProperty("wroker.ui.port"))
+ .getOrElse(WorkerWebUI.DEFAULT_PORT).toInt
+
+ val handlers = Array[(String, Handler)](
+ ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
+ ("/log", (request: HttpServletRequest) => log(request)),
+ ("*", (request: HttpServletRequest) => index)
+ )
+
+ def start() {
+ try {
+ val (server, boundPort) = UtilsWebUI.startJettyServer("0.0.0.0", port, handlers)
+ logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
+ } catch {
+ case e: Exception =>
+ logError("Failed to create Worker WebUI", e)
+ System.exit(1)
+ }
+ }
+
+ def index(): Seq[Node] = {
+ val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState]
+ val workerState = Await.result(stateFuture, 3 seconds)
+ val content =
+ <div class="row"> <!-- Worker Details -->
+ <div class="span12">
+ <ul class="unstyled">
+ <li><strong>ID:</strong> {workerState.workerId}</li>
+ <li><strong>
+ Master URL:</strong> {workerState.masterUrl}
+ </li>
+ <li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li>
+ <li><strong>Memory:</strong> {Utils.memoryMegabytesToString(workerState.memory)}
+ ({Utils.memoryMegabytesToString(workerState.memoryUsed)} Used)</li>
+ </ul>
+ <p><a href={workerState.masterWebUiUrl}>Back to Master</a></p>
+ </div>
+ </div>
+ <hr/>
+
+ <div class="row"> <!-- Running Executors -->
+ <div class="span12">
+ <h3> Running Executors {workerState.executors.size} </h3>
+ <br/>
+ {executorTable(workerState.executors)}
+ </div>
+ </div>
+ <hr/>
+
+ <div class="row"> <!-- Finished Executors -->
+ <div class="span12">
+ <h3> Finished Executors </h3>
+ <br/>
+ {executorTable(workerState.finishedExecutors)}
+ </div>
+ </div>;
+
+ UtilsWebUI.makePage(content, "Spark Worker on %s:%s".format(workerState.host, workerState.port))
+ }
+
+ def executorTable(executors: Seq[ExecutorRunner]): Seq[Node] = {
+ <table class="table table-bordered table-striped table-condensed sortable">
+ <thead>
+ <tr>
+ <th>ExecutorID</th>
+ <th>Cores</th>
+ <th>Memory</th>
+ <th>Job Details</th>
+ <th>Logs</th>
+ </tr>
+ </thead>
+ <tbody>
+ {executors.map(executorRow)}
+ </tbody>
+ </table>
+ }
+
+ def executorRow(executor: ExecutorRunner): Seq[Node] = {
+ <tr>
+ <td>{executor.execId}</td>
+ <td>{executor.cores}</td>
+ <td>{Utils.memoryMegabytesToString(executor.memory)}</td>
+ <td>
+ <ul class="unstyled">
+ <li><strong>ID:</strong> {executor.appId}</li>
+ <li><strong>Name:</strong> {executor.appDesc.name}</li>
+ <li><strong>User:</strong> {executor.appDesc.user}</li>
+ </ul>
+ </td>
+ <td>
+ <a href={"log?appId=%s&executorId=%s&logType=stdout"
+ .format(executor.appId, executor.execId)}>stdout</a>
+ <a href={"log?appId=%s&executorId=%s&logType=stderr"
+ .format(executor.appId, executor.execId)}>stderr</a>
+ </td>
+ </tr>
+ }
+
+ def log(request: HttpServletRequest): String = {
+ val appId = request.getParameter("appId")
+ val executorId = request.getParameter("executorId")
+ val logType = request.getParameter("logType")
+ val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
+ val source = Source.fromFile(path)
+ val lines = source.mkString
+ source.close()
+ lines
+ }
+
+ /*
val handler = {
get {
(path("") & parameters('format ?)) {
@@ -54,4 +169,10 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef, workDir: File)
getFromResourceDirectory(RESOURCE_DIR)
}
}
+ */
+}
+
+object WorkerWebUI {
+ val STATIC_RESOURCE_DIR = "spark/deploy/static"
+ val DEFAULT_PORT="8081"
}
diff --git a/core/src/main/scala/spark/util/WebUI.scala b/core/src/main/scala/spark/util/WebUI.scala
index eaacd95691..34b776f1d8 100644
--- a/core/src/main/scala/spark/util/WebUI.scala
+++ b/core/src/main/scala/spark/util/WebUI.scala
@@ -16,13 +16,14 @@ import annotation.tailrec
object WebUI extends Logging {
type Responder[T] = HttpServletRequest => T
- implicit def jsonResponderToHandler(responder: Responder[JSONObject]): Handler = {
+ implicit def jsonResponderToHandler(responder: Responder[JSONObject]): Handler =
createHandler(responder, "text/json")
- }
- implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler = {
+ implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler =
createHandler(responder, "text/html")
- }
+
+ implicit def textResponderToHandler(responder: Responder[String]): Handler =
+ createHandler(responder, "text/plain")
def createHandler[T <% AnyRef](responder: Responder[T], contentType: String): Handler = {
new AbstractHandler {
@@ -40,12 +41,28 @@ object WebUI extends Logging {
}
}
+ /** Create and return a staticHandler if resourceBase can be located */
def createStaticHandler(resourceBase: String): ResourceHandler = {
val staticHandler = new ResourceHandler
- val resource = getClass.getClassLoader.getResource(resourceBase)
- staticHandler.setResourceBase(resource.toString)
- staticHandler
+ Option(getClass.getClassLoader.getResource(resourceBase)) match {
+ case Some(res) =>
+ staticHandler.setResourceBase (res.toString)
+ staticHandler
+ }
+ }
+
+ /*
+ /** Create and return a staticHandler if resourceBase can be located */
+ def createStaticHandler(resourceBase: String): Option[ResourceHandler] = {
+ val staticHandler = new ResourceHandler
+ Option(getClass.getClassLoader.getResource(resourceBase)) match {
+ case Some(res) =>
+ staticHandler.setResourceBase (res.toString)
+ Some(staticHandler)
+ case None => None
+ }
}
+ */
def startJettyServer(ip: String, port: Int, handlers: Array[(String, Handler)]): (Server, Int) = {
val handlersToRegister = handlers.map { case(path, handler) =>