diff options
-rw-r--r-- | core/src/main/scala/spark/deploy/master/MasterWebUI.scala | 14 | ||||
-rw-r--r-- | core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala | 6 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/JettyUI.scala (renamed from core/src/main/scala/spark/ui/WebUI.scala) | 44 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/SparkUI.scala | 33 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/IndexPage.scala | 22 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/JobProgressUI.scala | 47 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/StagePage.scala | 19 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/storage/BlockManagerUI.scala | 9 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/storage/IndexPage.scala | 8 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/storage/RDDPage.scala | 12 |
10 files changed, 108 insertions, 106 deletions
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala index 328a7cb297..1ec5f77bd7 100644 --- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala +++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala @@ -10,11 +10,11 @@ import net.liftweb.json.JsonAST.JValue import org.eclipse.jetty.server.Handler import scala.xml.Node import spark.{Logging, Utils} -import spark.ui.WebUI -import WebUI._ +import spark.ui.JettyUI +import JettyUI._ import spark.deploy._ import spark.deploy.MasterState -import spark.ui.WebUI +import spark.ui.JettyUI /** * Web UI server for the standalone master. @@ -29,11 +29,11 @@ class MasterWebUI(master: ActorRef) extends Logging { def start() { try { - val (server, boundPort) = WebUI.startJettyServer("0.0.0.0", port, handlers) + val (server, boundPort) = JettyUI.startJettyServer("0.0.0.0", port, handlers) logInfo("Started Master web UI at http://%s:%d".format(host, boundPort)) } catch { case e: Exception => - logError("Failed to create Master WebUI", e) + logError("Failed to create Master JettyUI", e) System.exit(1) } } @@ -99,7 +99,7 @@ class MasterWebUI(master: ActorRef) extends Logging { {executorTable(app.executors.values.toList)} </div> </div>; - WebUI.sparkPage(content, "Application Info: " + app.desc.name) + JettyUI.sparkPage(content, "Application Info: " + app.desc.name) } def executorTable(executors: Seq[ExecutorInfo]): Seq[Node] = { @@ -189,7 +189,7 @@ class MasterWebUI(master: ActorRef) extends Logging { {appTable(state.completedApps.sortBy(_.endTime).reverse)} </div> </div>; - WebUI.sparkPage(content, "Spark Master: " + state.uri) + JettyUI.sparkPage(content, "Spark Master: " + state.uri) } def workerTable(workers: Seq[spark.deploy.master.WorkerInfo]) = { diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala index f661d99815..0febb9364c 100644 --- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala +++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala @@ -12,8 +12,8 @@ import org.eclipse.jetty.server.Handler import scala.io.Source import spark.{Utils, Logging} import spark.deploy.{JsonProtocol, WorkerState, RequestWorkerState} -import spark.ui.{WebUI => UtilsWebUI} -import spark.ui.WebUI._ +import spark.ui.{JettyUI => UtilsWebUI} +import spark.ui.JettyUI._ import xml.Node /** @@ -40,7 +40,7 @@ class WorkerWebUI(worker: ActorRef, workDir: File) extends Logging { logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort)) } catch { case e: Exception => - logError("Failed to create Worker WebUI", e) + logError("Failed to create Worker JettyUI", e) System.exit(1) } } diff --git a/core/src/main/scala/spark/ui/WebUI.scala b/core/src/main/scala/spark/ui/JettyUI.scala index 6dedd28fc1..c3f01073d5 100644 --- a/core/src/main/scala/spark/ui/WebUI.scala +++ b/core/src/main/scala/spark/ui/JettyUI.scala @@ -1,26 +1,26 @@ package spark.ui import annotation.tailrec + import javax.servlet.http.{HttpServletResponse, HttpServletRequest} -import net.liftweb.json._ + +import net.liftweb.json.{JValue, pretty, render} + import org.eclipse.jetty.server.{Server, Request, Handler} import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHandler, AbstractHandler} -import spark.Logging -import util.{Try, Success, Failure} -import xml.Node -abstract class UIComponent { - def getHandlers(): Seq[(String, Handler)] -} +import scala.util.{Try, Success, Failure} -abstract class View[T] { - def render(request: HttpServletRequest): T -} +import spark.Logging + +import xml.Node -object WebUI extends Logging { - // CORE WEB UI COMPONENTS +private[spark] object JettyUI extends Logging { + // Base type for a function that returns something based on an HTTP request. Allows for + // implicit conversion from many types of functions to jetty Handlers. type Responder[T] = HttpServletRequest => T + // Conversions from various types of Responder's to jetty Handlers implicit def jsonResponderToHandler(responder: Responder[JValue]): Handler = createHandler(responder, "text/json", (in: JValue) => pretty(render(in))) @@ -30,7 +30,7 @@ object WebUI extends Logging { implicit def textResponderToHandler(responder: Responder[String]): Handler = createHandler(responder, "text/plain") - def createHandler[T <% AnyRef](responder: Responder[T], contentType: String, + private def createHandler[T <% AnyRef](responder: Responder[T], contentType: String, extractFn: T => String = (in: Any) => in.toString): Handler = { new AbstractHandler { def handle(target: String, @@ -46,6 +46,7 @@ object WebUI extends Logging { } } + /** Creates a handler for serving files from a static directory. */ def createStaticHandler(resourceBase: String): ResourceHandler = { val staticHandler = new ResourceHandler Option(getClass.getClassLoader.getResource(resourceBase)) match { @@ -57,6 +58,12 @@ object WebUI extends Logging { staticHandler } + /** + * Attempts to start a Jetty server at the supplied ip:port which uses the supplied handlers. + * + * If the desired port number is contented, continues incrementing ports until a free port is + * found. Returns the chosen port and the jetty Server object. + */ def startJettyServer(ip: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int) = { val handlersToRegister = handlers.map { case(path, handler) => if (path == "*") { @@ -84,18 +91,19 @@ object WebUI extends Logging { } connect(port) } +} - // HELPER FUNCTIONS AND SHORTCUTS +/** Utility functions for generating XML pages with spark content. */ +object UIUtils { - /** Page with Spark logo, title, and Spark UI headers */ + /** Returns a page containing the supplied content and the spark web ui headers */ def headerSparkPage(content: => Seq[Node], title: String): Seq[Node] = { val newContent = <h2><a href="/storage">Storage</a> | <a href="/stages">Jobs</a> </h2><hl/>; - sparkPage(newContent ++ content, title) } - /** Page with Spark logo and title */ + /** Returns a page containing the supplied content and the spark css, js, and logo. */ def sparkPage(content: => Seq[Node], title: String): Seq[Node] = { <html> <head> @@ -125,7 +133,7 @@ object WebUI extends Logging { </html> } - /** Shortcut for making a table derived from a sequence of objects. */ + /** Returns an HTML table constructed by generating a row for each object in a sequence. */ def listingTable[T](headers: Seq[String], makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = { <table class="table table-bordered table-striped table-condensed sortable"> <thead>{headers.map(h => <th>{h}</th>)}</thead> diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala index a1f6cc60ec..dd7d33e0fa 100644 --- a/core/src/main/scala/spark/ui/SparkUI.scala +++ b/core/src/main/scala/spark/ui/SparkUI.scala @@ -1,43 +1,46 @@ package spark.ui -import jobs.JobProgressUI -import spark.{Logging, SparkContext, Utils} import javax.servlet.http.HttpServletRequest + import org.eclipse.jetty.server.Handler -import storage.BlockManagerUI -import WebUI._ +import spark.{Logging, SparkContext, Utils} +import spark.ui.storage.BlockManagerUI +import spark.ui.jobs.JobProgressUI +import spark.ui.UIUtils._ +import spark.ui.JettyUI._ + +/** Top level user interface for Spark */ private[spark] class SparkUI(sc: SparkContext) extends Logging { val host = Utils.localHostName() val port = Option(System.getProperty("spark.ui.port")).getOrElse(SparkUI.DEFAULT_PORT).toInt - + var boundPort: Option[Int] = None val handlers = Seq[(String, Handler)]( ("/static", createStaticHandler(SparkUI.STATIC_RESOURCE_DIR)), - ("*", (request: HttpServletRequest) => WebUI.headerSparkPage(<h1>Test</h1>, "Test page")) + ("*", (request: HttpServletRequest) => headerSparkPage(<h1>Test</h1>, "Test page")) ) - val components = Seq(new BlockManagerUI(sc), new JobProgressUI(sc)) + val storage = new BlockManagerUI(sc) + val jobs = new JobProgressUI(sc) + val allHandlers = handlers ++ storage.getHandlers ++ jobs.getHandlers def start() { /** Start an HTTP server to run the Web interface */ try { - val allHandlers = components.flatMap(_.getHandlers) ++ handlers - val (server, boundPort) = WebUI.startJettyServer("0.0.0.0", port, allHandlers) - logInfo("Started Spark Web UI at http://%s:%d".format(host, boundPort)) + val (server, usedPort) = JettyUI.startJettyServer("0.0.0.0", port, allHandlers) + logInfo("Started Spark Web UI at http://%s:%d".format(host, usedPort)) + boundPort = Some(usedPort) } catch { case e: Exception => - logError("Failed to create Spark WebUI", e) + logError("Failed to create Spark JettyUI", e) System.exit(1) } } - private[spark] def appUIAddress = "http://" + host + ":" + port + private[spark] def appUIAddress = "http://" + host + ":" + boundPort.getOrElse("-1") } object SparkUI { val DEFAULT_PORT = "33000" val STATIC_RESOURCE_DIR = "spark/webui/static" } - - - diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala index 1740524f49..811baae811 100644 --- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala @@ -1,15 +1,20 @@ package spark.ui.jobs -import spark.ui.{WebUI, View} -import xml.{NodeSeq, Node} -import spark.ui.WebUI._ -import scala.Some import akka.util.Duration -import spark.scheduler.Stage + import java.util.Date + import javax.servlet.http.HttpServletRequest -class IndexPage(parent: JobProgressUI) extends View[Seq[Node]] { +import scala.Some + +import spark.scheduler.Stage +import spark.ui.UIUtils._ + +import xml.{NodeSeq, Node} + +/** Page showing list of all ongoing and recently finished stages */ +class IndexPage(parent: JobProgressUI) { val listener = parent.listener val dateFmt = parent.dateFmt @@ -24,7 +29,7 @@ class IndexPage(parent: JobProgressUI) extends View[Seq[Node]] { val content = <h2>Active Stages</h2> ++ activeStageTable ++ <h2>Completed Stages</h2> ++ completedStageTable - WebUI.headerSparkPage(content, "Spark Stages") + headerSparkPage(content, "Spark Stages") } def getElapsedTime(submitted: Option[Long], completed: Long): String = { @@ -43,7 +48,8 @@ class IndexPage(parent: JobProgressUI) extends View[Seq[Node]] { <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.id}</a></td> <td>{s.origin}</td> <td>{submissionTime}</td> - <td>{getElapsedTime(s.submissionTime, s.completionTime.getOrElse(System.currentTimeMillis()))}</td> + <td>{getElapsedTime(s.submissionTime, + s.completionTime.getOrElse(System.currentTimeMillis()))}</td> <td>{listener.stageToTasksComplete.getOrElse(s.id, 0)} / {s.numPartitions} {listener.stageToTasksFailed.getOrElse(s.id, 0) match { case f if f > 0 => "(%s failed)".format(f) diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala index 9ee962bf2e..31aa0f9f0d 100644 --- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala @@ -1,45 +1,30 @@ package spark.ui.jobs -import spark.{Utils, SparkContext} -import spark.scheduler._ -import spark.scheduler.SparkListenerTaskEnd -import spark.scheduler.StageCompleted -import spark.scheduler.SparkListenerStageSubmitted -import org.eclipse.jetty.server.Handler -import javax.servlet.http.HttpServletRequest -import xml.Node -import collection.mutable._ -import spark.Success -import akka.util.Duration import java.text.SimpleDateFormat -import java.util.Date + +import javax.servlet.http.HttpServletRequest + +import org.eclipse.jetty.server.Handler + +import scala.Seq +import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer} + +import spark.ui.JettyUI._ +import spark.SparkContext +import spark.scheduler._ import spark.scheduler.cluster.TaskInfo -import collection.mutable -import org.hsqldb.lib.HashMappedList import spark.executor.TaskMetrics -import spark.scheduler.SparkListenerTaskEnd -import scala.Some -import spark.scheduler.SparkListenerStageSubmitted -import scala.Seq -import spark.scheduler.StageCompleted -import spark.scheduler.SparkListenerJobStart -import spark.ui.{WebUI, UIComponent} -import spark.ui.WebUI._ -import spark.scheduler.SparkListenerTaskEnd -import scala.Some -import spark.scheduler.SparkListenerStageSubmitted -import spark.scheduler.StageCompleted -import spark.scheduler.SparkListenerJobStart +import spark.Success -private[spark] -class JobProgressUI(sc: SparkContext) extends UIComponent { +/** Web UI showing progress status of all jobs in the given SparkContext. */ +private[spark] class JobProgressUI(sc: SparkContext) { val listener = new JobProgressListener val dateFmt = new SimpleDateFormat("EEE, MMM d yyyy HH:mm:ss") sc.addSparkListener(listener) - val indexPage = new IndexPage(this) - val stagePage = new StagePage(this) + private val indexPage = new IndexPage(this) + private val stagePage = new StagePage(this) def getHandlers = Seq[(String, Handler)]( ("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)), diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala index 64b8c8418c..74ac811cef 100644 --- a/core/src/main/scala/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala @@ -1,17 +1,20 @@ package spark.ui.jobs +import java.util.Date + import javax.servlet.http.HttpServletRequest -import xml.Node -import spark.ui.WebUI._ -import spark.ui.WebUI -import spark.ui.View + +import scala.collection.mutable.ListBuffer + +import spark.ui.UIUtils._ import spark.util.Distribution import spark.scheduler.cluster.TaskInfo import spark.executor.TaskMetrics -import java.util.Date -import scala.collection.mutable.ListBuffer -class StagePage(parent: JobProgressUI) extends View[Seq[Node]] { +import xml.Node + +/** Page showing statistics and task list for a given stage */ +class StagePage(parent: JobProgressUI) { val listener = parent.listener val dateFmt = parent.dateFmt @@ -52,7 +55,7 @@ class StagePage(parent: JobProgressUI) extends View[Seq[Node]] { val content = <h2>Summary Metrics</h2> ++ quantileTable ++ <h2>Tasks</h2> ++ taskTable; - WebUI.headerSparkPage(content, "Stage Details: %s".format(stageId)) + headerSparkPage(content, "Stage Details: %s".format(stageId)) } def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr> diff --git a/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala index 21555384dd..d83c826033 100644 --- a/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala +++ b/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala @@ -4,15 +4,12 @@ import akka.util.Duration import javax.servlet.http.HttpServletRequest import org.eclipse.jetty.server.Handler import spark.{Logging, SparkContext} -import spark.ui.WebUI._ +import spark.ui.JettyUI._ import spark.ui.{UIComponent} -/** - * Web UI server for the BlockManager inside each SparkContext. - */ +/** Web UI showing storage status of all RDD's in the given SparkContext. */ private[spark] -class BlockManagerUI(val sc: SparkContext) - extends UIComponent with Logging { +class BlockManagerUI(val sc: SparkContext) extends Logging { implicit val timeout = Duration.create( System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") diff --git a/core/src/main/scala/spark/ui/storage/IndexPage.scala b/core/src/main/scala/spark/ui/storage/IndexPage.scala index 2f4857d48a..5ead772bc0 100644 --- a/core/src/main/scala/spark/ui/storage/IndexPage.scala +++ b/core/src/main/scala/spark/ui/storage/IndexPage.scala @@ -2,12 +2,12 @@ package spark.ui.storage import xml.Node import spark.storage.{RDDInfo, StorageUtils} -import spark.ui.WebUI._ import spark.Utils -import spark.ui.{View, WebUI} +import spark.ui.UIUtils._ import javax.servlet.http.HttpServletRequest -class IndexPage(parent: BlockManagerUI) extends View[Seq[Node]] { +/** Page showing list of RDD's currently stored in the cluster */ +class IndexPage(parent: BlockManagerUI) { val sc = parent.sc def render(request: HttpServletRequest): Seq[Node] = { @@ -40,7 +40,7 @@ class IndexPage(parent: BlockManagerUI) extends View[Seq[Node]] { </div> </div> ++ {rddTable}; - WebUI.headerSparkPage(content, "Spark Storage ") + headerSparkPage(content, "Spark Storage ") } def rddRow(rdd: RDDInfo): Seq[Node] = { diff --git a/core/src/main/scala/spark/ui/storage/RDDPage.scala b/core/src/main/scala/spark/ui/storage/RDDPage.scala index 957c400080..7628fde4aa 100644 --- a/core/src/main/scala/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/spark/ui/storage/RDDPage.scala @@ -1,16 +1,16 @@ package spark.ui.storage import javax.servlet.http.HttpServletRequest -import xml.Node + import spark.storage.{StorageStatus, StorageUtils} -import spark.ui.WebUI._ +import spark.ui.UIUtils._ import spark.Utils -import spark.ui.WebUI -import spark.ui.View import spark.storage.BlockManagerMasterActor.BlockStatus +import xml.Node -class RDDPage(parent: BlockManagerUI) extends View[Seq[Node]] { +/** Page showing storage details for a given RDD */ +class RDDPage(parent: BlockManagerUI) { val sc = parent.sc def render(request: HttpServletRequest): Seq[Node] = { @@ -65,7 +65,7 @@ class RDDPage(parent: BlockManagerUI) extends View[Seq[Node]] { </div> <hr/> ++ {workerTable}; - WebUI.headerSparkPage(content, "RDD Info: " + id) + headerSparkPage(content, "RDD Info: " + id) } def blockRow(blk: (String, BlockStatus)): Seq[Node] = { |