aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/deploy/master/MasterWebUI.scala14
-rw-r--r--core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala6
-rw-r--r--core/src/main/scala/spark/ui/JettyUI.scala (renamed from core/src/main/scala/spark/ui/WebUI.scala)44
-rw-r--r--core/src/main/scala/spark/ui/SparkUI.scala33
-rw-r--r--core/src/main/scala/spark/ui/jobs/IndexPage.scala22
-rw-r--r--core/src/main/scala/spark/ui/jobs/JobProgressUI.scala47
-rw-r--r--core/src/main/scala/spark/ui/jobs/StagePage.scala19
-rw-r--r--core/src/main/scala/spark/ui/storage/BlockManagerUI.scala9
-rw-r--r--core/src/main/scala/spark/ui/storage/IndexPage.scala8
-rw-r--r--core/src/main/scala/spark/ui/storage/RDDPage.scala12
10 files changed, 108 insertions, 106 deletions
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 328a7cb297..1ec5f77bd7 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -10,11 +10,11 @@ import net.liftweb.json.JsonAST.JValue
import org.eclipse.jetty.server.Handler
import scala.xml.Node
import spark.{Logging, Utils}
-import spark.ui.WebUI
-import WebUI._
+import spark.ui.JettyUI
+import JettyUI._
import spark.deploy._
import spark.deploy.MasterState
-import spark.ui.WebUI
+import spark.ui.JettyUI
/**
* Web UI server for the standalone master.
@@ -29,11 +29,11 @@ class MasterWebUI(master: ActorRef) extends Logging {
def start() {
try {
- val (server, boundPort) = WebUI.startJettyServer("0.0.0.0", port, handlers)
+ val (server, boundPort) = JettyUI.startJettyServer("0.0.0.0", port, handlers)
logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
} catch {
case e: Exception =>
- logError("Failed to create Master WebUI", e)
+ logError("Failed to create Master JettyUI", e)
System.exit(1)
}
}
@@ -99,7 +99,7 @@ class MasterWebUI(master: ActorRef) extends Logging {
{executorTable(app.executors.values.toList)}
</div>
</div>;
- WebUI.sparkPage(content, "Application Info: " + app.desc.name)
+ JettyUI.sparkPage(content, "Application Info: " + app.desc.name)
}
def executorTable(executors: Seq[ExecutorInfo]): Seq[Node] = {
@@ -189,7 +189,7 @@ class MasterWebUI(master: ActorRef) extends Logging {
{appTable(state.completedApps.sortBy(_.endTime).reverse)}
</div>
</div>;
- WebUI.sparkPage(content, "Spark Master: " + state.uri)
+ JettyUI.sparkPage(content, "Spark Master: " + state.uri)
}
def workerTable(workers: Seq[spark.deploy.master.WorkerInfo]) = {
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index f661d99815..0febb9364c 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -12,8 +12,8 @@ import org.eclipse.jetty.server.Handler
import scala.io.Source
import spark.{Utils, Logging}
import spark.deploy.{JsonProtocol, WorkerState, RequestWorkerState}
-import spark.ui.{WebUI => UtilsWebUI}
-import spark.ui.WebUI._
+import spark.ui.{JettyUI => UtilsWebUI}
+import spark.ui.JettyUI._
import xml.Node
/**
@@ -40,7 +40,7 @@ class WorkerWebUI(worker: ActorRef, workDir: File) extends Logging {
logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
} catch {
case e: Exception =>
- logError("Failed to create Worker WebUI", e)
+ logError("Failed to create Worker JettyUI", e)
System.exit(1)
}
}
diff --git a/core/src/main/scala/spark/ui/WebUI.scala b/core/src/main/scala/spark/ui/JettyUI.scala
index 6dedd28fc1..c3f01073d5 100644
--- a/core/src/main/scala/spark/ui/WebUI.scala
+++ b/core/src/main/scala/spark/ui/JettyUI.scala
@@ -1,26 +1,26 @@
package spark.ui
import annotation.tailrec
+
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
-import net.liftweb.json._
+
+import net.liftweb.json.{JValue, pretty, render}
+
import org.eclipse.jetty.server.{Server, Request, Handler}
import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHandler, AbstractHandler}
-import spark.Logging
-import util.{Try, Success, Failure}
-import xml.Node
-abstract class UIComponent {
- def getHandlers(): Seq[(String, Handler)]
-}
+import scala.util.{Try, Success, Failure}
-abstract class View[T] {
- def render(request: HttpServletRequest): T
-}
+import spark.Logging
+
+import xml.Node
-object WebUI extends Logging {
- // CORE WEB UI COMPONENTS
+private[spark] object JettyUI extends Logging {
+ // Base type for a function that returns something based on an HTTP request. Allows for
+ // implicit conversion from many types of functions to jetty Handlers.
type Responder[T] = HttpServletRequest => T
+ // Conversions from various types of Responder's to jetty Handlers
implicit def jsonResponderToHandler(responder: Responder[JValue]): Handler =
createHandler(responder, "text/json", (in: JValue) => pretty(render(in)))
@@ -30,7 +30,7 @@ object WebUI extends Logging {
implicit def textResponderToHandler(responder: Responder[String]): Handler =
createHandler(responder, "text/plain")
- def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
+ private def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
extractFn: T => String = (in: Any) => in.toString): Handler = {
new AbstractHandler {
def handle(target: String,
@@ -46,6 +46,7 @@ object WebUI extends Logging {
}
}
+ /** Creates a handler for serving files from a static directory. */
def createStaticHandler(resourceBase: String): ResourceHandler = {
val staticHandler = new ResourceHandler
Option(getClass.getClassLoader.getResource(resourceBase)) match {
@@ -57,6 +58,12 @@ object WebUI extends Logging {
staticHandler
}
+ /**
+ * Attempts to start a Jetty server at the supplied ip:port which uses the supplied handlers.
+ *
+ * If the desired port number is contented, continues incrementing ports until a free port is
+ * found. Returns the chosen port and the jetty Server object.
+ */
def startJettyServer(ip: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int) = {
val handlersToRegister = handlers.map { case(path, handler) =>
if (path == "*") {
@@ -84,18 +91,19 @@ object WebUI extends Logging {
}
connect(port)
}
+}
- // HELPER FUNCTIONS AND SHORTCUTS
+/** Utility functions for generating XML pages with spark content. */
+object UIUtils {
- /** Page with Spark logo, title, and Spark UI headers */
+ /** Returns a page containing the supplied content and the spark web ui headers */
def headerSparkPage(content: => Seq[Node], title: String): Seq[Node] = {
val newContent =
<h2><a href="/storage">Storage</a> | <a href="/stages">Jobs</a> </h2><hl/>;
-
sparkPage(newContent ++ content, title)
}
- /** Page with Spark logo and title */
+ /** Returns a page containing the supplied content and the spark css, js, and logo. */
def sparkPage(content: => Seq[Node], title: String): Seq[Node] = {
<html>
<head>
@@ -125,7 +133,7 @@ object WebUI extends Logging {
</html>
}
- /** Shortcut for making a table derived from a sequence of objects. */
+ /** Returns an HTML table constructed by generating a row for each object in a sequence. */
def listingTable[T](headers: Seq[String], makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed sortable">
<thead>{headers.map(h => <th>{h}</th>)}</thead>
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
index a1f6cc60ec..dd7d33e0fa 100644
--- a/core/src/main/scala/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -1,43 +1,46 @@
package spark.ui
-import jobs.JobProgressUI
-import spark.{Logging, SparkContext, Utils}
import javax.servlet.http.HttpServletRequest
+
import org.eclipse.jetty.server.Handler
-import storage.BlockManagerUI
-import WebUI._
+import spark.{Logging, SparkContext, Utils}
+import spark.ui.storage.BlockManagerUI
+import spark.ui.jobs.JobProgressUI
+import spark.ui.UIUtils._
+import spark.ui.JettyUI._
+
+/** Top level user interface for Spark */
private[spark] class SparkUI(sc: SparkContext) extends Logging {
val host = Utils.localHostName()
val port = Option(System.getProperty("spark.ui.port")).getOrElse(SparkUI.DEFAULT_PORT).toInt
-
+ var boundPort: Option[Int] = None
val handlers = Seq[(String, Handler)](
("/static", createStaticHandler(SparkUI.STATIC_RESOURCE_DIR)),
- ("*", (request: HttpServletRequest) => WebUI.headerSparkPage(<h1>Test</h1>, "Test page"))
+ ("*", (request: HttpServletRequest) => headerSparkPage(<h1>Test</h1>, "Test page"))
)
- val components = Seq(new BlockManagerUI(sc), new JobProgressUI(sc))
+ val storage = new BlockManagerUI(sc)
+ val jobs = new JobProgressUI(sc)
+ val allHandlers = handlers ++ storage.getHandlers ++ jobs.getHandlers
def start() {
/** Start an HTTP server to run the Web interface */
try {
- val allHandlers = components.flatMap(_.getHandlers) ++ handlers
- val (server, boundPort) = WebUI.startJettyServer("0.0.0.0", port, allHandlers)
- logInfo("Started Spark Web UI at http://%s:%d".format(host, boundPort))
+ val (server, usedPort) = JettyUI.startJettyServer("0.0.0.0", port, allHandlers)
+ logInfo("Started Spark Web UI at http://%s:%d".format(host, usedPort))
+ boundPort = Some(usedPort)
} catch {
case e: Exception =>
- logError("Failed to create Spark WebUI", e)
+ logError("Failed to create Spark JettyUI", e)
System.exit(1)
}
}
- private[spark] def appUIAddress = "http://" + host + ":" + port
+ private[spark] def appUIAddress = "http://" + host + ":" + boundPort.getOrElse("-1")
}
object SparkUI {
val DEFAULT_PORT = "33000"
val STATIC_RESOURCE_DIR = "spark/webui/static"
}
-
-
-
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index 1740524f49..811baae811 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -1,15 +1,20 @@
package spark.ui.jobs
-import spark.ui.{WebUI, View}
-import xml.{NodeSeq, Node}
-import spark.ui.WebUI._
-import scala.Some
import akka.util.Duration
-import spark.scheduler.Stage
+
import java.util.Date
+
import javax.servlet.http.HttpServletRequest
-class IndexPage(parent: JobProgressUI) extends View[Seq[Node]] {
+import scala.Some
+
+import spark.scheduler.Stage
+import spark.ui.UIUtils._
+
+import xml.{NodeSeq, Node}
+
+/** Page showing list of all ongoing and recently finished stages */
+class IndexPage(parent: JobProgressUI) {
val listener = parent.listener
val dateFmt = parent.dateFmt
@@ -24,7 +29,7 @@ class IndexPage(parent: JobProgressUI) extends View[Seq[Node]] {
val content = <h2>Active Stages</h2> ++ activeStageTable ++
<h2>Completed Stages</h2> ++ completedStageTable
- WebUI.headerSparkPage(content, "Spark Stages")
+ headerSparkPage(content, "Spark Stages")
}
def getElapsedTime(submitted: Option[Long], completed: Long): String = {
@@ -43,7 +48,8 @@ class IndexPage(parent: JobProgressUI) extends View[Seq[Node]] {
<td><a href={"/stages/stage?id=%s".format(s.id)}>{s.id}</a></td>
<td>{s.origin}</td>
<td>{submissionTime}</td>
- <td>{getElapsedTime(s.submissionTime, s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
+ <td>{getElapsedTime(s.submissionTime,
+ s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
<td>{listener.stageToTasksComplete.getOrElse(s.id, 0)} / {s.numPartitions}
{listener.stageToTasksFailed.getOrElse(s.id, 0) match {
case f if f > 0 => "(%s failed)".format(f)
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index 9ee962bf2e..31aa0f9f0d 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -1,45 +1,30 @@
package spark.ui.jobs
-import spark.{Utils, SparkContext}
-import spark.scheduler._
-import spark.scheduler.SparkListenerTaskEnd
-import spark.scheduler.StageCompleted
-import spark.scheduler.SparkListenerStageSubmitted
-import org.eclipse.jetty.server.Handler
-import javax.servlet.http.HttpServletRequest
-import xml.Node
-import collection.mutable._
-import spark.Success
-import akka.util.Duration
import java.text.SimpleDateFormat
-import java.util.Date
+
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.Handler
+
+import scala.Seq
+import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
+
+import spark.ui.JettyUI._
+import spark.SparkContext
+import spark.scheduler._
import spark.scheduler.cluster.TaskInfo
-import collection.mutable
-import org.hsqldb.lib.HashMappedList
import spark.executor.TaskMetrics
-import spark.scheduler.SparkListenerTaskEnd
-import scala.Some
-import spark.scheduler.SparkListenerStageSubmitted
-import scala.Seq
-import spark.scheduler.StageCompleted
-import spark.scheduler.SparkListenerJobStart
-import spark.ui.{WebUI, UIComponent}
-import spark.ui.WebUI._
-import spark.scheduler.SparkListenerTaskEnd
-import scala.Some
-import spark.scheduler.SparkListenerStageSubmitted
-import spark.scheduler.StageCompleted
-import spark.scheduler.SparkListenerJobStart
+import spark.Success
-private[spark]
-class JobProgressUI(sc: SparkContext) extends UIComponent {
+/** Web UI showing progress status of all jobs in the given SparkContext. */
+private[spark] class JobProgressUI(sc: SparkContext) {
val listener = new JobProgressListener
val dateFmt = new SimpleDateFormat("EEE, MMM d yyyy HH:mm:ss")
sc.addSparkListener(listener)
- val indexPage = new IndexPage(this)
- val stagePage = new StagePage(this)
+ private val indexPage = new IndexPage(this)
+ private val stagePage = new StagePage(this)
def getHandlers = Seq[(String, Handler)](
("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)),
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index 64b8c8418c..74ac811cef 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -1,17 +1,20 @@
package spark.ui.jobs
+import java.util.Date
+
import javax.servlet.http.HttpServletRequest
-import xml.Node
-import spark.ui.WebUI._
-import spark.ui.WebUI
-import spark.ui.View
+
+import scala.collection.mutable.ListBuffer
+
+import spark.ui.UIUtils._
import spark.util.Distribution
import spark.scheduler.cluster.TaskInfo
import spark.executor.TaskMetrics
-import java.util.Date
-import scala.collection.mutable.ListBuffer
-class StagePage(parent: JobProgressUI) extends View[Seq[Node]] {
+import xml.Node
+
+/** Page showing statistics and task list for a given stage */
+class StagePage(parent: JobProgressUI) {
val listener = parent.listener
val dateFmt = parent.dateFmt
@@ -52,7 +55,7 @@ class StagePage(parent: JobProgressUI) extends View[Seq[Node]] {
val content =
<h2>Summary Metrics</h2> ++ quantileTable ++ <h2>Tasks</h2> ++ taskTable;
- WebUI.headerSparkPage(content, "Stage Details: %s".format(stageId))
+ headerSparkPage(content, "Stage Details: %s".format(stageId))
}
def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
diff --git a/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala
index 21555384dd..d83c826033 100644
--- a/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala
@@ -4,15 +4,12 @@ import akka.util.Duration
import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.Handler
import spark.{Logging, SparkContext}
-import spark.ui.WebUI._
+import spark.ui.JettyUI._
import spark.ui.{UIComponent}
-/**
- * Web UI server for the BlockManager inside each SparkContext.
- */
+/** Web UI showing storage status of all RDD's in the given SparkContext. */
private[spark]
-class BlockManagerUI(val sc: SparkContext)
- extends UIComponent with Logging {
+class BlockManagerUI(val sc: SparkContext) extends Logging {
implicit val timeout = Duration.create(
System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
diff --git a/core/src/main/scala/spark/ui/storage/IndexPage.scala b/core/src/main/scala/spark/ui/storage/IndexPage.scala
index 2f4857d48a..5ead772bc0 100644
--- a/core/src/main/scala/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/storage/IndexPage.scala
@@ -2,12 +2,12 @@ package spark.ui.storage
import xml.Node
import spark.storage.{RDDInfo, StorageUtils}
-import spark.ui.WebUI._
import spark.Utils
-import spark.ui.{View, WebUI}
+import spark.ui.UIUtils._
import javax.servlet.http.HttpServletRequest
-class IndexPage(parent: BlockManagerUI) extends View[Seq[Node]] {
+/** Page showing list of RDD's currently stored in the cluster */
+class IndexPage(parent: BlockManagerUI) {
val sc = parent.sc
def render(request: HttpServletRequest): Seq[Node] = {
@@ -40,7 +40,7 @@ class IndexPage(parent: BlockManagerUI) extends View[Seq[Node]] {
</div>
</div> ++ {rddTable};
- WebUI.headerSparkPage(content, "Spark Storage ")
+ headerSparkPage(content, "Spark Storage ")
}
def rddRow(rdd: RDDInfo): Seq[Node] = {
diff --git a/core/src/main/scala/spark/ui/storage/RDDPage.scala b/core/src/main/scala/spark/ui/storage/RDDPage.scala
index 957c400080..7628fde4aa 100644
--- a/core/src/main/scala/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/spark/ui/storage/RDDPage.scala
@@ -1,16 +1,16 @@
package spark.ui.storage
import javax.servlet.http.HttpServletRequest
-import xml.Node
+
import spark.storage.{StorageStatus, StorageUtils}
-import spark.ui.WebUI._
+import spark.ui.UIUtils._
import spark.Utils
-import spark.ui.WebUI
-import spark.ui.View
import spark.storage.BlockManagerMasterActor.BlockStatus
+import xml.Node
-class RDDPage(parent: BlockManagerUI) extends View[Seq[Node]] {
+/** Page showing storage details for a given RDD */
+class RDDPage(parent: BlockManagerUI) {
val sc = parent.sc
def render(request: HttpServletRequest): Seq[Node] = {
@@ -65,7 +65,7 @@ class RDDPage(parent: BlockManagerUI) extends View[Seq[Node]] {
</div>
<hr/> ++ {workerTable};
- WebUI.headerSparkPage(content, "RDD Info: " + id)
+ headerSparkPage(content, "RDD Info: " + id)
}
def blockRow(blk: (String, BlockStatus)): Seq[Node] = {