path: root/core
author     Tathagata Das <tathagata.das1565@gmail.com>    2014-04-11 23:33:49 -0700
committer  Patrick Wendell <pwendell@gmail.com>           2014-04-11 23:33:49 -0700
commit     6aa08c39cf30fa5c4ed97f4fff16371b9030a2e6 (patch)
tree       66542ff62c23f52366ea2e25e346467bff14c97b /core
parent     165e06a74c3d75e6b7341c120943add8b035b96a (diff)
download   spark-6aa08c39cf30fa5c4ed97f4fff16371b9030a2e6.tar.gz
           spark-6aa08c39cf30fa5c4ed97f4fff16371b9030a2e6.tar.bz2
           spark-6aa08c39cf30fa5c4ed97f4fff16371b9030a2e6.zip
[SPARK-1386] Web UI for Spark Streaming
When debugging Spark Streaming applications it is necessary to monitor certain metrics that are not shown in the Spark application UI. For example: what is the average processing time of batches? What is the scheduling delay? Is the system able to process data as fast as it is receiving it? How many records am I receiving through my receivers? While the StreamingListener interface introduced in 0.9 provided some of this information, it could only be accessed programmatically. A UI that shows information specific to streaming applications is necessary for easier debugging. This PR introduces such a UI, showing various statistics related to the streaming application. Here is a screenshot of the UI running on my local machine: http://i.imgur.com/1ooDGhm.png

This UI is integrated into the Spark UI running on port 4040.

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Andrew Or <andrewor14@gmail.com>

Closes #290 from tdas/streaming-web-ui and squashes the following commits:

fc73ca5 [Tathagata Das] Merge pull request #9 from andrewor14/ui-refactor
642dd88 [Andrew Or] Merge SparkUISuite.scala into UISuite.scala
eb30517 [Andrew Or] Merge github.com:apache/spark into ui-refactor
f4f4cbe [Tathagata Das] More minor fixes.
34bb364 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
252c566 [Tathagata Das] Merge pull request #8 from andrewor14/ui-refactor
e038b4b [Tathagata Das] Addressed Patrick's comments.
125a054 [Andrew Or] Disable serving static resources with gzip
90feb8d [Andrew Or] Address Patrick's comments
89dae36 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
72fe256 [Tathagata Das] Merge pull request #6 from andrewor14/ui-refactor
2fc09c8 [Tathagata Das] Added binary check exclusions
aa396d4 [Andrew Or] Rename tabs and pages (No more IndexPage.scala)
f8e1053 [Tathagata Das] Added Spark and Streaming UI unit tests.
caa5e05 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
585cd65 [Tathagata Das] Merge pull request #5 from andrewor14/ui-refactor
914b8ff [Tathagata Das] Moved utils functions to UIUtils.
548c98c [Andrew Or] Wide refactoring of WebUI, UITab, and UIPage (see commit message)
6de06b0 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-web-ui
ee6543f [Tathagata Das] Minor changes based on Andrew's comments.
fa760fe [Tathagata Das] Fixed long line.
1c0bcef [Tathagata Das] Refactored streaming UI into two files.
1af239b [Tathagata Das] Changed streaming UI to attach itself as a tab with the Spark UI.
827e81a [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
168fe86 [Tathagata Das] Merge pull request #2 from andrewor14/ui-refactor
3e986f8 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-web-ui
c78c92d [Andrew Or] Remove outdated comment
8f7323b [Andrew Or] End of file new lines, indentation, and imports (minor)
0d61ee8 [Andrew Or] Merge branch 'streaming-web-ui' of github.com:tdas/spark into ui-refactor
9a48fa1 [Andrew Or] Allow adding tabs to SparkUI dynamically + add example
61358e3 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-web-ui
53be2c5 [Tathagata Das] Minor style updates.
ed25dfc [Andrew Or] Generalize SparkUI header to display tabs dynamically
a37ad4f [Andrew Or] Comments, imports and formatting (minor)
cd000b0 [Andrew Or] Merge github.com:apache/spark into ui-refactor
7d57444 [Andrew Or] Refactoring the UI interface to add flexibility
aef4dd5 [Tathagata Das] Added Apache licenses.
db27bad [Tathagata Das] Added last batch processing time to StreamingUI.
4d86e98 [Tathagata Das] Added basic stats to the StreamingUI and refactored the UI to a Page to make it easier to transition to using SparkUI later.
93f1c69 [Tathagata Das] Added network receiver information to the Streaming UI.
56cc7fb [Tathagata Das] First cut implementation of Streaming UI.
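
The StreamingListener interface mentioned above is the programmatic route to these metrics that the new UI now exposes for interactive debugging. A minimal sketch of that programmatic approach is shown below; the callback and BatchInfo field names follow the streaming API of that era and are an assumption of this sketch, not part of this diff.

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    // Sketch of a listener that reports the per-batch metrics the new web UI surfaces.
    class BatchStatsListener extends StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
        val info = batchCompleted.batchInfo
        // schedulingDelay and processingDelay are Option[Long] values in milliseconds.
        val schedulingDelay = info.schedulingDelay.getOrElse(-1L)
        val processingTime = info.processingDelay.getOrElse(-1L)
        println(s"Batch ${info.batchTime}: scheduling delay $schedulingDelay ms, " +
          s"processing time $processingTime ms")
      }
    }

    // Registered on a StreamingContext (ssc, assumed to exist) before ssc.start():
    //   ssc.addStreamingListener(new BatchStatsListener)
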
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala | 50
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala (renamed from core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala) | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala | 61
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala (renamed from core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala) | 23
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala | 54
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala | 147
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala (renamed from core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala) | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala | 180
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageUtils.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/Page.scala | 22
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 108
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/UIUtils.scala | 172
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/WebUI.scala | 141
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala (renamed from core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala) | 47
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala | 50
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala (renamed from core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala) | 84
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala | 86
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala (renamed from core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala) | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala (renamed from core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala) | 45
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 45
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala | 18
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala (renamed from core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala) | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala (renamed from core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala) | 32
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 12
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkUISuite.scala | 35
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/UISuite.scala | 81
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 4
38 files changed, 865 insertions(+), 779 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3bcc8ce2b2..a764c174d5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -213,7 +213,6 @@ class SparkContext(config: SparkConf) extends Logging {
// Initialize the Spark UI, registering all associated listeners
private[spark] val ui = new SparkUI(this)
ui.bind()
- ui.start()
// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala b/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala
deleted file mode 100644
index 33fceae4ff..0000000000
--- a/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy
-
-import org.apache.spark.ui.{SparkUI, WebUI}
-
-private[spark] abstract class SparkUIContainer(name: String) extends WebUI(name) {
-
- /** Attach a SparkUI to this container. Only valid after bind(). */
- def attachUI(ui: SparkUI) {
- assert(serverInfo.isDefined,
- "%s must be bound to a server before attaching SparkUIs".format(name))
- val rootHandler = serverInfo.get.rootHandler
- for (handler <- ui.handlers) {
- rootHandler.addHandler(handler)
- if (!handler.isStarted) {
- handler.start()
- }
- }
- }
-
- /** Detach a SparkUI from this container. Only valid after bind(). */
- def detachUI(ui: SparkUI) {
- assert(serverInfo.isDefined,
- "%s must be bound to a server before detaching SparkUIs".format(name))
- val rootHandler = serverInfo.get.rootHandler
- for (handler <- ui.handlers) {
- if (handler.isStarted) {
- handler.stop()
- }
- rootHandler.removeHandler(handler)
- }
- }
-
-}
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 54dffffec7..180c853ce3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -21,9 +21,9 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
-import org.apache.spark.ui.{UIUtils, WebUI}
+import org.apache.spark.ui.{WebUIPage, UIUtils}
-private[spark] class IndexPage(parent: HistoryServer) {
+private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
def render(request: HttpServletRequest): Seq[Node] = {
val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
@@ -62,13 +62,13 @@ private[spark] class IndexPage(parent: HistoryServer) {
private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
val appName = if (info.started) info.name else info.logDirPath.getName
val uiAddress = parent.getAddress + info.ui.basePath
- val startTime = if (info.started) WebUI.formatDate(info.startTime) else "Not started"
- val endTime = if (info.completed) WebUI.formatDate(info.endTime) else "Not completed"
+ val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started"
+ val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed"
val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L
- val duration = if (difference > 0) WebUI.formatDuration(difference) else "---"
+ val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---"
val sparkUser = if (info.started) info.sparkUser else "Unknown user"
val logDirectory = info.logDirPath.getName
- val lastUpdated = WebUI.formatDate(info.lastUpdated)
+ val lastUpdated = UIUtils.formatDate(info.lastUpdated)
<tr>
<td><a href={uiAddress}>{appName}</a></td>
<td>{startTime}</td>
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 97d2ba9dee..cf64700f90 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -17,17 +17,13 @@
package org.apache.spark.deploy.history
-import javax.servlet.http.HttpServletRequest
-
import scala.collection.mutable
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.spark.{Logging, SecurityManager, SparkConf}
-import org.apache.spark.deploy.SparkUIContainer
import org.apache.spark.scheduler._
-import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.{WebUI, SparkUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
@@ -46,17 +42,15 @@ import org.apache.spark.util.Utils
*/
class HistoryServer(
val baseLogDir: String,
+ securityManager: SecurityManager,
conf: SparkConf)
- extends SparkUIContainer("History Server") with Logging {
+ extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging {
import HistoryServer._
private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
private val localHost = Utils.localHostName()
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
- private val port = WEB_UI_PORT
- private val securityManager = new SecurityManager(conf)
- private val indexPage = new IndexPage(this)
// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTime = -1L
@@ -90,37 +84,23 @@ class HistoryServer(
}
}
- private val handlers = Seq[ServletContextHandler](
- createStaticHandler(STATIC_RESOURCE_DIR, "/static"),
- createServletHandler("/",
- (request: HttpServletRequest) => indexPage.render(request), securityMgr = securityManager)
- )
-
// A mapping of application ID to its history information, which includes the rendered UI
val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()
+ initialize()
+
/**
- * Start the history server.
+ * Initialize the history server.
*
* This starts a background thread that periodically synchronizes information displayed on
* this UI with the event logs in the provided base directory.
*/
- def start() {
+ def initialize() {
+ attachPage(new HistoryPage(this))
+ attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static"))
logCheckingThread.start()
}
- /** Bind to the HTTP server behind this web interface. */
- override def bind() {
- try {
- serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
- logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort))
- } catch {
- case e: Exception =>
- logError("Failed to bind HistoryServer", e)
- System.exit(1)
- }
- }
-
/**
* Check for any updates to event logs in the base directory. This is only effective once
* the server has been bound.
@@ -151,7 +131,7 @@ class HistoryServer(
// Remove any applications that should no longer be retained
appIdToInfo.foreach { case (appId, info) =>
if (!retainedAppIds.contains(appId)) {
- detachUI(info.ui)
+ detachSparkUI(info.ui)
appIdToInfo.remove(appId)
}
}
@@ -186,15 +166,14 @@ class HistoryServer(
val path = logDir.getPath
val appId = path.getName
val replayBus = new ReplayListenerBus(logInfo.logPaths, fileSystem, logInfo.compressionCodec)
- val ui = new SparkUI(replayBus, appId, "/history/" + appId)
val appListener = new ApplicationEventListener
replayBus.addListener(appListener)
+ val ui = new SparkUI(conf, replayBus, appId, "/history/" + appId)
// Do not call ui.bind() to avoid creating a new server for each application
- ui.start()
replayBus.replay()
if (appListener.applicationStarted) {
- attachUI(ui)
+ attachSparkUI(ui)
val appName = appListener.appName
val sparkUser = appListener.sparkUser
val startTime = appListener.startTime
@@ -213,6 +192,18 @@ class HistoryServer(
fileSystem.close()
}
+ /** Attach a reconstructed UI to this server. Only valid after bind(). */
+ private def attachSparkUI(ui: SparkUI) {
+ assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs")
+ ui.getHandlers.foreach(attachHandler)
+ }
+
+ /** Detach a reconstructed UI from this server. Only valid after bind(). */
+ private def detachSparkUI(ui: SparkUI) {
+ assert(serverInfo.isDefined, "HistoryServer must be bound before detaching SparkUIs")
+ ui.getHandlers.foreach(detachHandler)
+ }
+
/** Return the address of this server. */
def getAddress: String = "http://" + publicHost + ":" + boundPort
@@ -262,9 +253,9 @@ object HistoryServer {
def main(argStrings: Array[String]) {
val args = new HistoryServerArguments(argStrings)
- val server = new HistoryServer(args.logDir, conf)
+ val securityManager = new SecurityManager(conf)
+ val server = new HistoryServer(args.logDir, securityManager, conf)
server.bind()
- server.start()
// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2446e86cb6..6c58e741df 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -625,7 +625,7 @@ private[spark] class Master(
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach( a => {
- appIdToUI.remove(a.id).foreach { ui => webUi.detachUI(ui) }
+ appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
applicationMetricsSystem.removeSource(a.appSource)
})
completedApps.trimStart(toRemove)
@@ -667,12 +667,12 @@ private[spark] class Master(
if (!eventLogPaths.isEmpty) {
try {
val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
- val ui = new SparkUI(replayBus, appName + " (completed)", "/history/" + app.id)
- ui.start()
+ val ui = new SparkUI(
+ new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id)
replayBus.replay()
app.desc.appUiUrl = ui.basePath
appIdToUI(app.id) = ui
- webUi.attachUI(ui)
+ webUi.attachSparkUI(ui)
return true
} catch {
case t: Throwable =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index cb092cb5d5..b5cd4d2ea9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -28,15 +28,16 @@ import org.json4s.JValue
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorInfo
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils
-private[spark] class ApplicationPage(parent: MasterWebUI) {
- val master = parent.masterActorRef
- val timeout = parent.timeout
+private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") {
+
+ private val master = parent.masterActorRef
+ private val timeout = parent.timeout
/** Executor details for a particular application */
- def renderJson(request: HttpServletRequest): JValue = {
+ override def renderJson(request: HttpServletRequest): JValue = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)
@@ -96,7 +97,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
}
- def executorRow(executor: ExecutorInfo): Seq[Node] = {
+ private def executorRow(executor: ExecutorInfo): Seq[Node] = {
<tr>
<td>{executor.id}</td>
<td>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 8c1d6c7cce..7ca3b08a28 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -25,17 +25,17 @@ import scala.xml.Node
import akka.pattern.ask
import org.json4s.JValue
-import org.apache.spark.deploy.{JsonProtocol}
+import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
-import org.apache.spark.ui.{WebUI, UIUtils}
+import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils
-private[spark] class IndexPage(parent: MasterWebUI) {
- val master = parent.masterActorRef
- val timeout = parent.timeout
+private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
+ private val master = parent.masterActorRef
+ private val timeout = parent.timeout
- def renderJson(request: HttpServletRequest): JValue = {
+ override def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)
JsonProtocol.writeMasterState(state)
@@ -139,7 +139,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
}
- def workerRow(worker: WorkerInfo): Seq[Node] = {
+ private def workerRow(worker: WorkerInfo): Seq[Node] = {
<tr>
<td>
<a href={worker.webUiAddress}>{worker.id}</a>
@@ -154,8 +154,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
</tr>
}
-
- def appRow(app: ApplicationInfo): Seq[Node] = {
+ private def appRow(app: ApplicationInfo): Seq[Node] = {
<tr>
<td>
<a href={"app?appId=" + app.id}>{app.id}</a>
@@ -169,14 +168,14 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<td sorttable_customkey={app.desc.memoryPerSlave.toString}>
{Utils.megabytesToString(app.desc.memoryPerSlave)}
</td>
- <td>{WebUI.formatDate(app.submitDate)}</td>
+ <td>{UIUtils.formatDate(app.submitDate)}</td>
<td>{app.desc.user}</td>
<td>{app.state.toString}</td>
- <td>{WebUI.formatDuration(app.duration)}</td>
+ <td>{UIUtils.formatDuration(app.duration)}</td>
</tr>
}
- def driverRow(driver: DriverInfo): Seq[Node] = {
+ private def driverRow(driver: DriverInfo): Seq[Node] = {
<tr>
<td>{driver.id} </td>
<td>{driver.submitDate}</td>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 30c8ade408..a18b39fc95 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,14 +17,9 @@
package org.apache.spark.deploy.master.ui
-import javax.servlet.http.HttpServletRequest
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkUIContainer
import org.apache.spark.deploy.master.Master
-import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -33,44 +28,33 @@ import org.apache.spark.util.{AkkaUtils, Utils}
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int)
- extends SparkUIContainer("MasterWebUI") with Logging {
+ extends WebUI(master.securityMgr, requestedPort, master.conf) with Logging {
val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)
- private val host = Utils.localHostName()
- private val port = requestedPort
- private val applicationPage = new ApplicationPage(this)
- private val indexPage = new IndexPage(this)
+ initialize()
- private val handlers: Seq[ServletContextHandler] = {
- master.masterMetricsSystem.getServletHandlers ++
- master.applicationMetricsSystem.getServletHandlers ++
- Seq[ServletContextHandler](
- createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"),
- createServletHandler("/app/json",
- (request: HttpServletRequest) => applicationPage.renderJson(request), master.securityMgr),
- createServletHandler("/app",
- (request: HttpServletRequest) => applicationPage.render(request), master.securityMgr),
- createServletHandler("/json",
- (request: HttpServletRequest) => indexPage.renderJson(request), master.securityMgr),
- createServletHandler("/",
- (request: HttpServletRequest) => indexPage.render(request), master.securityMgr)
- )
+ /** Initialize all components of the server. */
+ def initialize() {
+ attachPage(new ApplicationPage(this))
+ attachPage(new MasterPage(this))
+ attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
+ master.masterMetricsSystem.getServletHandlers.foreach(attachHandler)
+ master.applicationMetricsSystem.getServletHandlers.foreach(attachHandler)
}
- /** Bind to the HTTP server behind this web interface. */
- override def bind() {
- try {
- serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, master.conf))
- logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
- } catch {
- case e: Exception =>
- logError("Failed to create Master web UI", e)
- System.exit(1)
- }
+ /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
+ def attachSparkUI(ui: SparkUI) {
+ assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
+ ui.getHandlers.foreach(attachHandler)
}
+ /** Detach a reconstructed UI from this Master UI. Only valid after bind(). */
+ def detachSparkUI(ui: SparkUI) {
+ assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
+ ui.getHandlers.foreach(detachHandler)
+ }
}
private[spark] object MasterWebUI {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index bf5a8d09dd..52c164ca3c 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -128,8 +128,8 @@ private[spark] class Worker(
host, port, cores, Utils.megabytesToString(memory)))
logInfo("Spark home: " + sparkHome)
createWorkDir()
- webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+ webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.bind()
registerWithMaster()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
new file mode 100644
index 0000000000..fec1207948
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker.ui
+
+import java.io.File
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.util.Utils
+
+private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
+ private val worker = parent.worker
+ private val workDir = parent.workDir
+
+ def renderLog(request: HttpServletRequest): String = {
+ val defaultBytes = 100 * 1024
+
+ val appId = Option(request.getParameter("appId"))
+ val executorId = Option(request.getParameter("executorId"))
+ val driverId = Option(request.getParameter("driverId"))
+ val logType = request.getParameter("logType")
+ val offset = Option(request.getParameter("offset")).map(_.toLong)
+ val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
+
+ val path = (appId, executorId, driverId) match {
+ case (Some(a), Some(e), None) =>
+ s"${workDir.getPath}/$appId/$executorId/$logType"
+ case (None, None, Some(d)) =>
+ s"${workDir.getPath}/$driverId/$logType"
+ case _ =>
+ throw new Exception("Request must specify either application or driver identifiers")
+ }
+
+ val (startByte, endByte) = getByteRange(path, offset, byteLength)
+ val file = new File(path)
+ val logLength = file.length
+
+ val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n"
+ pre + Utils.offsetBytes(path, startByte, endByte)
+ }
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val defaultBytes = 100 * 1024
+ val appId = Option(request.getParameter("appId"))
+ val executorId = Option(request.getParameter("executorId"))
+ val driverId = Option(request.getParameter("driverId"))
+ val logType = request.getParameter("logType")
+ val offset = Option(request.getParameter("offset")).map(_.toLong)
+ val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
+
+ val (path, params) = (appId, executorId, driverId) match {
+ case (Some(a), Some(e), None) =>
+ (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e")
+ case (None, None, Some(d)) =>
+ (s"${workDir.getPath}/$d/$logType", s"driverId=$d")
+ case _ =>
+ throw new Exception("Request must specify either application or driver identifiers")
+ }
+
+ val (startByte, endByte) = getByteRange(path, offset, byteLength)
+ val file = new File(path)
+ val logLength = file.length
+ val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
+ val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>
+ val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
+
+ val backButton =
+ if (startByte > 0) {
+ <a href={"?%s&logType=%s&offset=%s&byteLength=%s"
+ .format(params, logType, math.max(startByte - byteLength, 0), byteLength)}>
+ <button type="button" class="btn btn-default">
+ Previous {Utils.bytesToString(math.min(byteLength, startByte))}
+ </button>
+ </a>
+ }
+ else {
+ <button type="button" class="btn btn-default" disabled="disabled">
+ Previous 0 B
+ </button>
+ }
+
+ val nextButton =
+ if (endByte < logLength) {
+ <a href={"?%s&logType=%s&offset=%s&byteLength=%s".
+ format(params, logType, endByte, byteLength)}>
+ <button type="button" class="btn btn-default">
+ Next {Utils.bytesToString(math.min(byteLength, logLength - endByte))}
+ </button>
+ </a>
+ }
+ else {
+ <button type="button" class="btn btn-default" disabled="disabled">
+ Next 0 B
+ </button>
+ }
+
+ val content =
+ <html>
+ <body>
+ {linkToMaster}
+ <div>
+ <div style="float:left; margin-right:10px">{backButton}</div>
+ <div style="float:left;">{range}</div>
+ <div style="float:right; margin-left:10px">{nextButton}</div>
+ </div>
+ <br />
+ <div style="height:500px; overflow:auto; padding:5px;">
+ <pre>{logText}</pre>
+ </div>
+ </body>
+ </html>
+ UIUtils.basicSparkPage(content, logType + " log page for " + appId)
+ }
+
+ /** Determine the byte range for a log or log page. */
+ private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = {
+ val defaultBytes = 100 * 1024
+ val maxBytes = 1024 * 1024
+ val file = new File(path)
+ val logLength = file.length()
+ val getOffset = offset.getOrElse(logLength - defaultBytes)
+ val startByte =
+ if (getOffset < 0) 0L
+ else if (getOffset > logLength) logLength
+ else getOffset
+ val logPageLength = math.min(byteLength, maxBytes)
+ val endByte = math.min(startByte + logPageLength, logLength)
+ (startByte, endByte)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
index 49c1009cac..d4513118ce 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
@@ -28,15 +28,15 @@ import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils
-private[spark] class IndexPage(parent: WorkerWebUI) {
+private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
val workerActor = parent.worker.self
val worker = parent.worker
val timeout = parent.timeout
- def renderJson(request: HttpServletRequest): JValue = {
+ override def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, timeout)
JsonProtocol.writeWorkerState(workerState)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 5625a44549..0ad2edba22 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -20,174 +20,44 @@ package org.apache.spark.deploy.worker.ui
import java.io.File
import javax.servlet.http.HttpServletRequest
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.worker.Worker
-import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
+import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.AkkaUtils
/**
* Web UI server for the standalone worker.
*/
private[spark]
-class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
- extends WebUI("WorkerWebUI") with Logging {
+class WorkerWebUI(
+ val worker: Worker,
+ val workDir: File,
+ port: Option[Int] = None)
+ extends WebUI(worker.securityMgr, WorkerWebUI.getUIPort(port, worker.conf), worker.conf)
+ with Logging {
val timeout = AkkaUtils.askTimeout(worker.conf)
- private val host = Utils.localHostName()
- private val port = requestedPort.getOrElse(
- worker.conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT))
- private val indexPage = new IndexPage(this)
-
- private val handlers: Seq[ServletContextHandler] = {
- worker.metricsSystem.getServletHandlers ++
- Seq[ServletContextHandler](
- createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static"),
- createServletHandler("/log",
- (request: HttpServletRequest) => log(request), worker.securityMgr),
- createServletHandler("/logPage",
- (request: HttpServletRequest) => logPage(request), worker.securityMgr),
- createServletHandler("/json",
- (request: HttpServletRequest) => indexPage.renderJson(request), worker.securityMgr),
- createServletHandler("/",
- (request: HttpServletRequest) => indexPage.render(request), worker.securityMgr)
- )
- }
-
- /** Bind to the HTTP server behind this web interface. */
- override def bind() {
- try {
- serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, worker.conf))
- logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
- } catch {
- case e: Exception =>
- logError("Failed to create Worker web UI", e)
- System.exit(1)
- }
- }
-
- private def log(request: HttpServletRequest): String = {
- val defaultBytes = 100 * 1024
-
- val appId = Option(request.getParameter("appId"))
- val executorId = Option(request.getParameter("executorId"))
- val driverId = Option(request.getParameter("driverId"))
- val logType = request.getParameter("logType")
- val offset = Option(request.getParameter("offset")).map(_.toLong)
- val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
-
- val path = (appId, executorId, driverId) match {
- case (Some(a), Some(e), None) =>
- s"${workDir.getPath}/$appId/$executorId/$logType"
- case (None, None, Some(d)) =>
- s"${workDir.getPath}/$driverId/$logType"
- case _ =>
- throw new Exception("Request must specify either application or driver identifiers")
- }
-
- val (startByte, endByte) = getByteRange(path, offset, byteLength)
- val file = new File(path)
- val logLength = file.length
-
- val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n"
- pre + Utils.offsetBytes(path, startByte, endByte)
- }
-
- private def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = {
- val defaultBytes = 100 * 1024
- val appId = Option(request.getParameter("appId"))
- val executorId = Option(request.getParameter("executorId"))
- val driverId = Option(request.getParameter("driverId"))
- val logType = request.getParameter("logType")
- val offset = Option(request.getParameter("offset")).map(_.toLong)
- val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
-
- val (path, params) = (appId, executorId, driverId) match {
- case (Some(a), Some(e), None) =>
- (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e")
- case (None, None, Some(d)) =>
- (s"${workDir.getPath}/$d/$logType", s"driverId=$d")
- case _ =>
- throw new Exception("Request must specify either application or driver identifiers")
- }
-
- val (startByte, endByte) = getByteRange(path, offset, byteLength)
- val file = new File(path)
- val logLength = file.length
- val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
- val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>
- val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
-
- val backButton =
- if (startByte > 0) {
- <a href={"?%s&logType=%s&offset=%s&byteLength=%s"
- .format(params, logType, math.max(startByte - byteLength, 0), byteLength)}>
- <button type="button" class="btn btn-default">
- Previous {Utils.bytesToString(math.min(byteLength, startByte))}
- </button>
- </a>
- }
- else {
- <button type="button" class="btn btn-default" disabled="disabled">
- Previous 0 B
- </button>
- }
-
- val nextButton =
- if (endByte < logLength) {
- <a href={"?%s&logType=%s&offset=%s&byteLength=%s".
- format(params, logType, endByte, byteLength)}>
- <button type="button" class="btn btn-default">
- Next {Utils.bytesToString(math.min(byteLength, logLength - endByte))}
- </button>
- </a>
- }
- else {
- <button type="button" class="btn btn-default" disabled="disabled">
- Next 0 B
- </button>
- }
-
- val content =
- <html>
- <body>
- {linkToMaster}
- <div>
- <div style="float:left; margin-right:10px">{backButton}</div>
- <div style="float:left;">{range}</div>
- <div style="float:right; margin-left:10px">{nextButton}</div>
- </div>
- <br />
- <div style="height:500px; overflow:auto; padding:5px;">
- <pre>{logText}</pre>
- </div>
- </body>
- </html>
- UIUtils.basicSparkPage(content, logType + " log page for " + appId)
+ initialize()
+
+ /** Initialize all components of the server. */
+ def initialize() {
+ val logPage = new LogPage(this)
+ attachPage(logPage)
+ attachPage(new WorkerPage(this))
+ attachHandler(createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static"))
+ attachHandler(createServletHandler("/log",
+ (request: HttpServletRequest) => logPage.renderLog(request), worker.securityMgr))
+ worker.metricsSystem.getServletHandlers.foreach(attachHandler)
}
-
- /** Determine the byte range for a log or log page. */
- private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = {
- val defaultBytes = 100 * 1024
- val maxBytes = 1024 * 1024
- val file = new File(path)
- val logLength = file.length()
- val getOffset = offset.getOrElse(logLength - defaultBytes)
- val startByte =
- if (getOffset < 0) 0L
- else if (getOffset > logLength) logLength
- else getOffset
- val logPageLength = math.min(byteLength, maxBytes)
- val endByte = math.min(startByte + logPageLength, logLength)
- (startByte, endByte)
- }
-
}
private[spark] object WorkerWebUI {
- val DEFAULT_PORT=8081
+ val DEFAULT_PORT = 8081
val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
+
+ def getUIPort(requestedPort: Option[Int], conf: SparkConf): Int = {
+ requestedPort.getOrElse(conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT))
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
index affda13df6..c100122715 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
@@ -31,11 +31,11 @@ private[spark] class ApplicationEventListener extends SparkListener {
def applicationStarted = startTime != -1
- def applicationFinished = endTime != -1
+ def applicationCompleted = endTime != -1
def applicationDuration: Long = {
val difference = endTime - startTime
- if (applicationStarted && applicationFinished && difference > 0) difference else -1L
+ if (applicationStarted && applicationCompleted && difference > 0) difference else -1L
}
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 07255aa366..7ed3713268 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -42,24 +42,22 @@ class StorageStatus(
def memRemaining : Long = maxMem - memUsed()
- def rddBlocks = blocks.flatMap {
- case (rdd: RDDBlockId, status) => Some(rdd, status)
- case _ => None
- }
+ def rddBlocks = blocks.collect { case (rdd: RDDBlockId, status) => (rdd, status) }
}
@DeveloperApi
private[spark]
class RDDInfo(
- val id: Int,
- val name: String,
- val numPartitions: Int,
- val storageLevel: StorageLevel) extends Ordered[RDDInfo] {
+ val id: Int,
+ val name: String,
+ val numPartitions: Int,
+ val storageLevel: StorageLevel)
+ extends Ordered[RDDInfo] {
var numCachedPartitions = 0
var memSize = 0L
var diskSize = 0L
- var tachyonSize= 0L
+ var tachyonSize = 0L
override def toString = {
import Utils.bytesToString
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index dd0818e8ab..62a4e3d0f6 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -121,6 +121,7 @@ private[spark] object JettyUtils extends Logging {
/** Create a handler for serving files from a static directory */
def createStaticHandler(resourceBase: String, path: String): ServletContextHandler = {
val contextHandler = new ServletContextHandler
+ contextHandler.setInitParameter("org.eclipse.jetty.servlet.Default.gzip", "false")
val staticHandler = new DefaultServlet
val holder = new ServletHolder(staticHandler)
Option(getClass.getClassLoader.getResource(resourceBase)) match {
diff --git a/core/src/main/scala/org/apache/spark/ui/Page.scala b/core/src/main/scala/org/apache/spark/ui/Page.scala
deleted file mode 100644
index b2a069a375..0000000000
--- a/core/src/main/scala/org/apache/spark/ui/Page.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui
-
-private[spark] object Page extends Enumeration {
- val Stages, Storage, Environment, Executors = Value
-}
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 7fa4fd3149..2fef1a6354 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -17,112 +17,86 @@
package org.apache.spark.ui
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.env.EnvironmentUI
-import org.apache.spark.ui.exec.ExecutorsUI
-import org.apache.spark.ui.jobs.JobProgressUI
-import org.apache.spark.ui.storage.BlockManagerUI
-import org.apache.spark.util.Utils
+import org.apache.spark.ui.env.EnvironmentTab
+import org.apache.spark.ui.exec.ExecutorsTab
+import org.apache.spark.ui.jobs.JobProgressTab
+import org.apache.spark.ui.storage.StorageTab
-/** Top level user interface for Spark */
+/**
+ * Top level user interface for a Spark application.
+ */
private[spark] class SparkUI(
val sc: SparkContext,
val conf: SparkConf,
+ val securityManager: SecurityManager,
val listenerBus: SparkListenerBus,
var appName: String,
val basePath: String = "")
- extends WebUI("SparkUI") with Logging {
+ extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath)
+ with Logging {
- def this(sc: SparkContext) = this(sc, sc.conf, sc.listenerBus, sc.appName)
- def this(listenerBus: SparkListenerBus, appName: String, basePath: String) =
- this(null, new SparkConf, listenerBus, appName, basePath)
+ def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName)
+ def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) =
+ this(null, conf, new SecurityManager(conf), listenerBus, appName, basePath)
// If SparkContext is not provided, assume the associated application is not live
val live = sc != null
- val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
-
- private val localHost = Utils.localHostName()
- private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
- private val port = conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
+ // Maintain executor storage status through Spark events
+ val storageStatusListener = new StorageStatusListener
- private val storage = new BlockManagerUI(this)
- private val jobs = new JobProgressUI(this)
- private val env = new EnvironmentUI(this)
- private val exec = new ExecutorsUI(this)
+ initialize()
- val handlers: Seq[ServletContextHandler] = {
- val metricsServletHandlers = if (live) {
- SparkEnv.get.metricsSystem.getServletHandlers
- } else {
- Array[ServletContextHandler]()
+ /** Initialize all components of the server. */
+ def initialize() {
+ listenerBus.addListener(storageStatusListener)
+ val jobProgressTab = new JobProgressTab(this)
+ attachTab(jobProgressTab)
+ attachTab(new StorageTab(this))
+ attachTab(new EnvironmentTab(this))
+ attachTab(new ExecutorsTab(this))
+ attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
+ attachHandler(createRedirectHandler("/", "/stages", basePath = basePath))
+ attachHandler(
+ createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest))
+ if (live) {
+ sc.env.metricsSystem.getServletHandlers.foreach(attachHandler)
}
- storage.getHandlers ++
- jobs.getHandlers ++
- env.getHandlers ++
- exec.getHandlers ++
- metricsServletHandlers ++
- Seq[ServletContextHandler] (
- createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"),
- createRedirectHandler("/", "/stages", basePath = basePath)
- )
}
- // Maintain executor storage status through Spark events
- val storageStatusListener = new StorageStatusListener
-
+ /** Set the app name for this UI. */
def setAppName(name: String) {
appName = name
}
- /** Initialize all components of the server */
- def start() {
- storage.start()
- jobs.start()
- env.start()
- exec.start()
-
- // Storage status listener must receive events first, as other listeners depend on its state
- listenerBus.addListener(storageStatusListener)
- listenerBus.addListener(storage.listener)
- listenerBus.addListener(jobs.listener)
- listenerBus.addListener(env.listener)
- listenerBus.addListener(exec.listener)
- }
-
- /** Bind to the HTTP server behind this web interface. */
- override def bind() {
- try {
- serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, sc.conf))
- logInfo("Started Spark web UI at http://%s:%d".format(publicHost, boundPort))
- } catch {
- case e: Exception =>
- logError("Failed to create Spark web UI", e)
- System.exit(1)
- }
+ /** Register the given listener with the listener bus. */
+ def registerListener(listener: SparkListener) {
+ listenerBus.addListener(listener)
}
/** Stop the server behind this web interface. Only valid after bind(). */
override def stop() {
super.stop()
- logInfo("Stopped Spark Web UI at %s".format(appUIAddress))
+ logInfo("Stopped Spark web UI at %s".format(appUIAddress))
}
/**
* Return the application UI host:port. This does not include the scheme (http://).
*/
- private[spark] def appUIHostPort = publicHost + ":" + boundPort
+ private[spark] def appUIHostPort = publicHostName + ":" + boundPort
private[spark] def appUIAddress = s"http://$appUIHostPort"
-
}
private[spark] object SparkUI {
val DEFAULT_PORT = 4040
val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
+
+ def getUIPort(conf: SparkConf): Int = {
+ conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index a7cf04b3cb..6a2d652528 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -17,16 +17,115 @@
package org.apache.spark.ui
+import java.text.SimpleDateFormat
+import java.util.{Locale, Date}
+
import scala.xml.Node
+import org.apache.spark.Logging
/** Utility functions for generating XML pages with spark content. */
-private[spark] object UIUtils {
+private[spark] object UIUtils extends Logging {
+
+ // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
+ private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
+ override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+ }
+
+ def formatDate(date: Date): String = dateFormat.get.format(date)
+
+ def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
+
+ def formatDuration(milliseconds: Long): String = {
+ val seconds = milliseconds.toDouble / 1000
+ if (seconds < 60) {
+ return "%.0f s".format(seconds)
+ }
+ val minutes = seconds / 60
+ if (minutes < 10) {
+ return "%.1f min".format(minutes)
+ } else if (minutes < 60) {
+ return "%.0f min".format(minutes)
+ }
+ val hours = minutes / 60
+ "%.1f h".format(hours)
+ }
+
+ /** Generate a verbose human-readable string representing a duration such as "5 second 35 ms" */
+ def formatDurationVerbose(ms: Long): String = {
+ try {
+ val second = 1000L
+ val minute = 60 * second
+ val hour = 60 * minute
+ val day = 24 * hour
+ val week = 7 * day
+ val year = 365 * day
+
+ def toString(num: Long, unit: String): String = {
+ if (num == 0) {
+ ""
+ } else if (num == 1) {
+ s"$num $unit"
+ } else {
+ s"$num ${unit}s"
+ }
+ }
+
+ val millisecondsString = if (ms >= second && ms % second == 0) "" else s"${ms % second} ms"
+ val secondString = toString((ms % minute) / second, "second")
+ val minuteString = toString((ms % hour) / minute, "minute")
+ val hourString = toString((ms % day) / hour, "hour")
+ val dayString = toString((ms % week) / day, "day")
+ val weekString = toString((ms % year) / week, "week")
+ val yearString = toString(ms / year, "year")
- import Page._
+ Seq(
+ second -> millisecondsString,
+ minute -> s"$secondString $millisecondsString",
+ hour -> s"$minuteString $secondString",
+ day -> s"$hourString $minuteString $secondString",
+ week -> s"$dayString $hourString $minuteString",
+ year -> s"$weekString $dayString $hourString"
+ ).foreach { case (durationLimit, durationString) =>
+ if (ms < durationLimit) {
+ // if time is less than the limit (upto year)
+ return durationString
+ }
+ }
+ // if time is more than a year
+ return s"$yearString $weekString $dayString"
+ } catch {
+ case e: Exception =>
+ logError("Error converting time to string", e)
+ // if there is some error, return blank string
+ return ""
+ }
+ }
+
+ /** Generate a human-readable string representing a number (e.g. 100 K) */
+ def formatNumber(records: Double): String = {
+ val trillion = 1e12
+ val billion = 1e9
+ val million = 1e6
+ val thousand = 1e3
+
+ val (value, unit) = {
+ if (records >= 2*trillion) {
+ (records / trillion, " T")
+ } else if (records >= 2*billion) {
+ (records / billion, " B")
+ } else if (records >= 2*million) {
+ (records / million, " M")
+ } else if (records >= 2*thousand) {
+ (records / thousand, " K")
+ } else {
+ (records, "")
+ }
+ }
+ "%.1f%s".formatLocal(Locale.US, value, unit)
+ }
// Yarn has to go through a proxy so the base uri is provided and has to be on all links
- private[spark] val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).
- getOrElse("")
+ val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("")
def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource
@@ -36,26 +135,14 @@ private[spark] object UIUtils {
basePath: String,
appName: String,
title: String,
- page: Page.Value) : Seq[Node] = {
- val jobs = page match {
- case Stages =>
- <li class="active"><a href={prependBaseUri(basePath, "/stages")}>Stages</a></li>
- case _ => <li><a href={prependBaseUri(basePath, "/stages")}>Stages</a></li>
- }
- val storage = page match {
- case Storage =>
- <li class="active"><a href={prependBaseUri(basePath, "/storage")}>Storage</a></li>
- case _ => <li><a href={prependBaseUri(basePath, "/storage")}>Storage</a></li>
- }
- val environment = page match {
- case Environment =>
- <li class="active"><a href={prependBaseUri(basePath, "/environment")}>Environment</a></li>
- case _ => <li><a href={prependBaseUri(basePath, "/environment")}>Environment</a></li>
- }
- val executors = page match {
- case Executors =>
- <li class="active"><a href={prependBaseUri(basePath, "/executors")}>Executors</a></li>
- case _ => <li><a href={prependBaseUri(basePath, "/executors")}>Executors</a></li>
+ tabs: Seq[WebUITab],
+ activeTab: WebUITab,
+ refreshInterval: Option[Int] = None): Seq[Node] = {
+
+ val header = tabs.map { tab =>
+ <li class={if (tab == activeTab) "active" else ""}>
+ <a href={prependBaseUri(basePath, "/" + tab.prefix)}>{tab.name}</a>
+ </li>
}
<html>
@@ -74,16 +161,10 @@ private[spark] object UIUtils {
<a href={prependBaseUri(basePath, "/")} class="brand">
<img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} />
</a>
- <ul class="nav">
- {jobs}
- {storage}
- {environment}
- {executors}
- </ul>
+ <ul class="nav">{header}</ul>
<p class="navbar-text pull-right"><strong>{appName}</strong> application UI</p>
</div>
</div>
-
<div class="container-fluid">
<div class="row-fluid">
<div class="span12">
@@ -129,21 +210,36 @@ private[spark] object UIUtils {
/** Returns an HTML table constructed by generating a row for each object in a sequence. */
def listingTable[T](
headers: Seq[String],
- makeRow: T => Seq[Node],
- rows: Seq[T],
+ generateDataRow: T => Seq[Node],
+ data: Seq[T],
fixedWidth: Boolean = false): Seq[Node] = {
- val colWidth = 100.toDouble / headers.size
- val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
var tableClass = "table table-bordered table-striped table-condensed sortable"
if (fixedWidth) {
tableClass += " table-fixed"
}
-
+ val colWidth = 100.toDouble / headers.size
+ val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
+ val headerRow: Seq[Node] = {
+ // if none of the headers have "\n" in them
+ if (headers.forall(!_.contains("\n"))) {
+ // represent header as simple text
+ headers.map(h => <th width={colWidthAttr}>{h}</th>)
+ } else {
+ // represent header text as list while respecting "\n"
+ headers.map { case h =>
+ <th width={colWidthAttr}>
+ <ul class ="unstyled">
+ { h.split("\n").map { case t => <li> {t} </li> } }
+ </ul>
+ </th>
+ }
+ }
+ }
<table class={tableClass}>
- <thead>{headers.map(h => <th width={colWidthAttr}>{h}</th>)}</thead>
+ <thead>{headerRow}</thead>
<tbody>
- {rows.map(r => makeRow(r))}
+ {data.map(r => generateDataRow(r))}
</tbody>
</table>
}
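
To make the refactored helpers above concrete, here is a small usage sketch. It is not part of the patch: the printed values are inferred from the formatting logic in this hunk, and the UIUtilsSketch object and its package placement are only illustrative (UIUtils is private[spark], so a caller has to live under org.apache.spark).

    package org.apache.spark.ui

    import scala.xml.Node

    object UIUtilsSketch {
      def main(args: Array[String]) {
        // formatNumber switches to the next unit once the value exceeds twice that unit
        println(UIUtils.formatNumber(3.5e6))   // "3.5 M"
        println(UIUtils.formatNumber(1200))    // "1200.0" (below the 2 K threshold, so no unit)

        // listingTable builds a sortable Bootstrap table from a row-generating function,
        // mirroring how EnvironmentPage renders its property tables further down
        val headers = Seq("Name", "Value")
        def propertyRow(kv: (String, String)): Seq[Node] = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
        val table = UIUtils.listingTable(headers, propertyRow, Seq("spark.app.name" -> "example"))
        println(table)
      }
    }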
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 2cc7582eca..b08f308fda 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -17,53 +17,134 @@
package org.apache.spark.ui
-import java.text.SimpleDateFormat
-import java.util.Date
+import javax.servlet.http.HttpServletRequest
-private[spark] abstract class WebUI(name: String) {
+import scala.collection.mutable.ArrayBuffer
+import scala.xml.Node
+
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.json4s.JsonAST.{JNothing, JValue}
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.util.Utils
+
+/**
+ * The top level component of the UI hierarchy that contains the server.
+ *
+ * Each WebUI represents a collection of tabs, each of which in turn represents a collection of
+ * pages. The use of tabs is optional, however; a WebUI may choose to include pages directly.
+ */
+private[spark] abstract class WebUI(
+ securityManager: SecurityManager,
+ port: Int,
+ conf: SparkConf,
+ basePath: String = "")
+ extends Logging {
+
+ protected val tabs = ArrayBuffer[WebUITab]()
+ protected val handlers = ArrayBuffer[ServletContextHandler]()
protected var serverInfo: Option[ServerInfo] = None
+ protected val localHostName = Utils.localHostName()
+ protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
+ private val className = Utils.getFormattedClassName(this)
+
+ def getTabs: Seq[WebUITab] = tabs.toSeq
+ def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
+
+ /** Attach a tab to this UI, along with all of its attached pages. */
+ def attachTab(tab: WebUITab) {
+ tab.pages.foreach(attachPage)
+ tabs += tab
+ }
+
+ /** Attach a page to this UI. */
+ def attachPage(page: WebUIPage) {
+ val pagePath = "/" + page.prefix
+ attachHandler(createServletHandler(pagePath,
+ (request: HttpServletRequest) => page.render(request), securityManager, basePath))
+ attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json",
+ (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath))
+ }
+
+ /** Attach a handler to this UI. */
+ def attachHandler(handler: ServletContextHandler) {
+ handlers += handler
+ serverInfo.foreach { info =>
+ info.rootHandler.addHandler(handler)
+ if (!handler.isStarted) {
+ handler.start()
+ }
+ }
+ }
- /**
- * Bind to the HTTP server behind this web interface.
- * Overridden implementation should set serverInfo.
- */
- def bind() { }
+ /** Detach a handler from this UI. */
+ def detachHandler(handler: ServletContextHandler) {
+ handlers -= handler
+ serverInfo.foreach { info =>
+ info.rootHandler.removeHandler(handler)
+ if (handler.isStarted) {
+ handler.stop()
+ }
+ }
+ }
+
+ /** Initialize all components of the server. */
+ def initialize()
+
+ /** Bind to the HTTP server behind this web interface. */
+ def bind() {
+ assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className))
+ try {
+ serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
+ logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort))
+ } catch {
+ case e: Exception =>
+ logError("Failed to bind %s".format(className), e)
+ System.exit(1)
+ }
+ }
/** Return the actual port to which this server is bound. Only valid after bind(). */
def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
/** Stop the server behind this web interface. Only valid after bind(). */
def stop() {
- assert(serverInfo.isDefined, "Attempted to stop %s before binding to a server!".format(name))
+ assert(serverInfo.isDefined,
+ "Attempted to stop %s before binding to a server!".format(className))
serverInfo.get.server.stop()
}
}
+
/**
- * Utilities used throughout the web UI.
+ * A tab that represents a collection of pages.
+ * The prefix is appended to the parent address to form a full path, and must not contain slashes.
*/
-private[spark] object WebUI {
- // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
- private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
- override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
+ val pages = ArrayBuffer[WebUIPage]()
+ val name = prefix.capitalize
+
+ /** Attach a page to this tab. This prepends the tab's prefix to the page's own prefix. */
+ def attachPage(page: WebUIPage) {
+ page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
+ pages += page
}
- def formatDate(date: Date): String = dateFormat.get.format(date)
+ /** Get a list of header tabs from the parent UI. */
+ def headerTabs: Seq[WebUITab] = parent.getTabs
+}
- def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
- def formatDuration(milliseconds: Long): String = {
- val seconds = milliseconds.toDouble / 1000
- if (seconds < 60) {
- return "%.0f s".format(seconds)
- }
- val minutes = seconds / 60
- if (minutes < 10) {
- return "%.1f min".format(minutes)
- } else if (minutes < 60) {
- return "%.0f min".format(minutes)
- }
- val hours = minutes / 60
- "%.1f h".format(hours)
- }
+/**
+ * A page that represents the leaf node in the UI hierarchy.
+ *
+ * The direct parent of a WebUIPage is not specified as it can be either a WebUI or a WebUITab.
+ * If the parent is a WebUI, the prefix is appended to the parent's address to form a full path.
+ * Otherwise, if the parent is a WebUITab, the prefix is appended to that tab's own prefix
+ * to form a relative path. The prefix must not contain slashes.
+ */
+private[spark] abstract class WebUIPage(var prefix: String) {
+ def render(request: HttpServletRequest): Seq[Node]
+ def renderJson(request: HttpServletRequest): JValue = JNothing
}
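
The reworked WebUI / WebUITab / WebUIPage hierarchy above is exactly what the new UISuite test "attaching a new tab" exercises further down. As a standalone illustration, a hypothetical "foo" tab could be wired up as follows (FooTab and FooPage are invented names, not part of this patch):

    package org.apache.spark.ui

    import javax.servlet.http.HttpServletRequest
    import scala.xml.Node

    // A tab groups pages under a common prefix; attaching it to the UI registers a
    // servlet handler at /foo and a JSON handler at /foo/json for the page below.
    private[spark] class FooTab(parent: SparkUI) extends WebUITab(parent, "foo") {
      attachPage(new FooPage(this))
    }

    private[spark] class FooPage(parent: FooTab) extends WebUIPage("") {
      def render(request: HttpServletRequest): Seq[Node] = <p>Hello from the foo tab</p>
    }

    // Usage from an application with a live UI, as in the test below:
    //   sc.ui.attachTab(new FooTab(sc.ui))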
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
index 33df97187e..b347eb1b83 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
@@ -21,29 +21,12 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
-import org.eclipse.jetty.servlet.ServletContextHandler
+import org.apache.spark.ui.{UIUtils, WebUIPage}
-import org.apache.spark.scheduler._
-import org.apache.spark.ui._
-import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.Page.Environment
-
-private[ui] class EnvironmentUI(parent: SparkUI) {
+private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
+ private val appName = parent.appName
private val basePath = parent.basePath
- private var _listener: Option[EnvironmentListener] = None
-
- private def appName = parent.appName
-
- lazy val listener = _listener.get
-
- def start() {
- _listener = Some(new EnvironmentListener)
- }
-
- def getHandlers = Seq[ServletContextHandler](
- createServletHandler("/environment",
- (request: HttpServletRequest) => render(request), parent.securityManager, basePath)
- )
+ private val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
val runtimeInformationTable = UIUtils.listingTable(
@@ -62,7 +45,7 @@ private[ui] class EnvironmentUI(parent: SparkUI) {
<h4>Classpath Entries</h4> {classpathEntriesTable}
</span>
- UIUtils.headerSparkPage(content, basePath, appName, "Environment", Environment)
+ UIUtils.headerSparkPage(content, basePath, appName, "Environment", parent.headerTabs, parent)
}
private def propertyHeader = Seq("Name", "Value")
@@ -71,23 +54,3 @@ private[ui] class EnvironmentUI(parent: SparkUI) {
private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
}
-
-/**
- * A SparkListener that prepares information to be displayed on the EnvironmentUI
- */
-private[ui] class EnvironmentListener extends SparkListener {
- var jvmInformation = Seq[(String, String)]()
- var sparkProperties = Seq[(String, String)]()
- var systemProperties = Seq[(String, String)]()
- var classpathEntries = Seq[(String, String)]()
-
- override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
- synchronized {
- val environmentDetails = environmentUpdate.environmentDetails
- jvmInformation = environmentDetails("JVM Information")
- sparkProperties = environmentDetails("Spark Properties")
- systemProperties = environmentDetails("System Properties")
- classpathEntries = environmentDetails("Classpath Entries")
- }
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
new file mode 100644
index 0000000000..03b46e1bd5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.env
+
+import org.apache.spark.scheduler._
+import org.apache.spark.ui._
+
+private[ui] class EnvironmentTab(parent: SparkUI) extends WebUITab(parent, "environment") {
+ val appName = parent.appName
+ val basePath = parent.basePath
+ val listener = new EnvironmentListener
+
+ attachPage(new EnvironmentPage(this))
+ parent.registerListener(listener)
+}
+
+/**
+ * A SparkListener that prepares information to be displayed on the EnvironmentTab
+ */
+private[ui] class EnvironmentListener extends SparkListener {
+ var jvmInformation = Seq[(String, String)]()
+ var sparkProperties = Seq[(String, String)]()
+ var systemProperties = Seq[(String, String)]()
+ var classpathEntries = Seq[(String, String)]()
+
+ override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
+ synchronized {
+ val environmentDetails = environmentUpdate.environmentDetails
+ jvmInformation = environmentDetails("JVM Information")
+ sparkProperties = environmentDetails("Spark Properties")
+ systemProperties = environmentDetails("System Properties")
+ classpathEntries = environmentDetails("Classpath Entries")
+ }
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 77a38a1d3a..c1e69f6cda 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -19,35 +19,15 @@ package org.apache.spark.ui.exec
import javax.servlet.http.HttpServletRequest
-import scala.collection.mutable.HashMap
import scala.xml.Node
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.ExceptionFailure
-import org.apache.spark.scheduler._
-import org.apache.spark.storage.StorageStatusListener
-import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.Page.Executors
-import org.apache.spark.ui.{SparkUI, UIUtils}
+import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils
-private[ui] class ExecutorsUI(parent: SparkUI) {
+private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
+ private val appName = parent.appName
private val basePath = parent.basePath
- private var _listener: Option[ExecutorsListener] = None
-
- private def appName = parent.appName
-
- lazy val listener = _listener.get
-
- def start() {
- _listener = Some(new ExecutorsListener(parent.storageStatusListener))
- }
-
- def getHandlers = Seq[ServletContextHandler](
- createServletHandler("/executors",
- (request: HttpServletRequest) => render(request), parent.securityManager, basePath)
- )
+ private val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = listener.storageStatusList
@@ -75,8 +55,8 @@ private[ui] class ExecutorsUI(parent: SparkUI) {
</div>
</div>;
- UIUtils.headerSparkPage(
- content, basePath, appName, "Executors (" + execInfo.size + ")", Executors)
+ UIUtils.headerSparkPage(content, basePath, appName, "Executors (" + execInfo.size + ")",
+ parent.headerTabs, parent)
}
/** Header fields for the executors table */
@@ -159,55 +139,3 @@ private[ui] class ExecutorsUI(parent: SparkUI) {
execFields.zip(execValues).toMap
}
}
-
-/**
- * A SparkListener that prepares information to be displayed on the ExecutorsUI
- */
-private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener)
- extends SparkListener {
-
- val executorToTasksActive = HashMap[String, Int]()
- val executorToTasksComplete = HashMap[String, Int]()
- val executorToTasksFailed = HashMap[String, Int]()
- val executorToDuration = HashMap[String, Long]()
- val executorToShuffleRead = HashMap[String, Long]()
- val executorToShuffleWrite = HashMap[String, Long]()
-
- def storageStatusList = storageStatusListener.storageStatusList
-
- override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
- val eid = formatExecutorId(taskStart.taskInfo.executorId)
- executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
- }
-
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
- val info = taskEnd.taskInfo
- if (info != null) {
- val eid = formatExecutorId(info.executorId)
- executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
- executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
- taskEnd.reason match {
- case e: ExceptionFailure =>
- executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
- case _ =>
- executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
- }
-
- // Update shuffle read/write
- val metrics = taskEnd.taskMetrics
- if (metrics != null) {
- metrics.shuffleReadMetrics.foreach { shuffleRead =>
- executorToShuffleRead(eid) =
- executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
- }
- metrics.shuffleWriteMetrics.foreach { shuffleWrite =>
- executorToShuffleWrite(eid) =
- executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten
- }
- }
- }
- }
-
- // This addresses executor ID inconsistencies in the local mode
- private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId)
-}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
new file mode 100644
index 0000000000..5678bf34ac
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.exec
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.ExceptionFailure
+import org.apache.spark.scheduler._
+import org.apache.spark.storage.StorageStatusListener
+import org.apache.spark.ui.{SparkUI, WebUITab}
+
+private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "executors") {
+ val appName = parent.appName
+ val basePath = parent.basePath
+ val listener = new ExecutorsListener(parent.storageStatusListener)
+
+ attachPage(new ExecutorsPage(this))
+ parent.registerListener(listener)
+}
+
+/**
+ * A SparkListener that prepares information to be displayed on the ExecutorsTab
+ */
+private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener)
+ extends SparkListener {
+
+ val executorToTasksActive = HashMap[String, Int]()
+ val executorToTasksComplete = HashMap[String, Int]()
+ val executorToTasksFailed = HashMap[String, Int]()
+ val executorToDuration = HashMap[String, Long]()
+ val executorToShuffleRead = HashMap[String, Long]()
+ val executorToShuffleWrite = HashMap[String, Long]()
+
+ def storageStatusList = storageStatusListener.storageStatusList
+
+ override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
+ val eid = formatExecutorId(taskStart.taskInfo.executorId)
+ executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
+ }
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
+ val info = taskEnd.taskInfo
+ if (info != null) {
+ val eid = formatExecutorId(info.executorId)
+ executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
+ executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
+ taskEnd.reason match {
+ case e: ExceptionFailure =>
+ executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
+ case _ =>
+ executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
+ }
+
+ // Update shuffle read/write
+ val metrics = taskEnd.taskMetrics
+ if (metrics != null) {
+ metrics.shuffleReadMetrics.foreach { shuffleRead =>
+ executorToShuffleRead(eid) =
+ executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
+ }
+ metrics.shuffleWriteMetrics.foreach { shuffleWrite =>
+ executorToShuffleWrite(eid) =
+ executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten
+ }
+ }
+ }
+ }
+
+ // This addresses executor ID inconsistencies in the local mode
+ private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId)
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 73861ae674..c83e196c9c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -20,11 +20,12 @@ package org.apache.spark.ui.jobs
import scala.collection.mutable
import scala.xml.Node
+import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
/** Page showing executor summary */
-private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) {
- private lazy val listener = parent.listener
+private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
+ private val listener = parent.listener
def toNodeSeq: Seq[Node] = {
listener.synchronized {
@@ -69,7 +70,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) {
<tr>
<td>{k}</td>
<td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
- <td>{parent.formatDuration(v.taskTime)}</td>
+ <td>{UIUtils.formatDuration(v.taskTime)}</td>
<td>{v.failedTasks + v.succeededTasks}</td>
<td>{v.failedTasks}</td>
<td>{v.succeededTasks}</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 5167e20ea3..0db4afa701 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -222,12 +222,10 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
synchronized {
- val schedulingModeName =
- environmentUpdate.environmentDetails("Spark Properties").toMap.get("spark.scheduler.mode")
- schedulingMode = schedulingModeName match {
- case Some(name) => Some(SchedulingMode.withName(name))
- case None => None
- }
+ schedulingMode = environmentUpdate
+ .environmentDetails("Spark Properties").toMap
+ .get("spark.scheduler.mode")
+ .map(SchedulingMode.withName)
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
index 8619a31380..34ff2ac34a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
@@ -22,25 +22,23 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.{Node, NodeSeq}
import org.apache.spark.scheduler.Schedulable
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUIPage, UIUtils}
/** Page showing list of all ongoing and recently finished stages and pools */
-private[ui] class IndexPage(parent: JobProgressUI) {
+private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") {
+ private val appName = parent.appName
private val basePath = parent.basePath
private val live = parent.live
private val sc = parent.sc
- private lazy val listener = parent.listener
+ private val listener = parent.listener
private lazy val isFairScheduler = parent.isFairScheduler
- private def appName = parent.appName
-
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val activeStages = listener.activeStages.values.toSeq
val completedStages = listener.completedStages.reverse.toSeq
val failedStages = listener.failedStages.reverse.toSeq
- val now = System.currentTimeMillis()
+ val now = System.currentTimeMillis
val activeStagesTable =
new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
@@ -59,7 +57,7 @@ private[ui] class IndexPage(parent: JobProgressUI) {
// Total duration is not meaningful unless the UI is live
<li>
<strong>Total Duration: </strong>
- {parent.formatDuration(now - sc.startTime)}
+ {UIUtils.formatDuration(now - sc.startTime)}
</li>
}}
<li>
@@ -94,7 +92,7 @@ private[ui] class IndexPage(parent: JobProgressUI) {
<h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++
failedStagesTable.toNodeSeq
- UIUtils.headerSparkPage(content, basePath, appName, "Spark Stages", Stages)
+ UIUtils.headerSparkPage(content, basePath, appName, "Spark Stages", parent.headerTabs, parent)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
index 30e3f35f21..3308c8c8a3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
@@ -19,39 +19,28 @@ package org.apache.spark.ui.jobs
import javax.servlet.http.HttpServletRequest
-import org.eclipse.jetty.servlet.ServletContextHandler
-
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SchedulingMode
-import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.Utils
+import org.apache.spark.ui.{SparkUI, WebUITab}
/** Web UI showing progress status of all jobs in the given SparkContext. */
-private[ui] class JobProgressUI(parent: SparkUI) {
+private[ui] class JobProgressTab(parent: SparkUI) extends WebUITab(parent, "stages") {
+ val appName = parent.appName
val basePath = parent.basePath
val live = parent.live
val sc = parent.sc
- val killEnabled = parent.conf.getBoolean("spark.ui.killEnabled", true)
-
- lazy val listener = _listener.get
- lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
-
- private val indexPage = new IndexPage(this)
- private val stagePage = new StagePage(this)
- private val poolPage = new PoolPage(this)
- private var _listener: Option[JobProgressListener] = None
+ val conf = if (live) sc.conf else new SparkConf
+ val killEnabled = conf.getBoolean("spark.ui.killEnabled", true)
+ val listener = new JobProgressListener(conf)
- def appName = parent.appName
+ attachPage(new JobProgressPage(this))
+ attachPage(new StagePage(this))
+ attachPage(new PoolPage(this))
+ parent.registerListener(listener)
- def start() {
- val conf = if (live) sc.conf else new SparkConf
- _listener = Some(new JobProgressListener(conf))
- }
-
- def formatDuration(ms: Long) = Utils.msDurationToString(ms)
+ def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
- private def handleKillRequest(request: HttpServletRequest) = {
+ def handleKillRequest(request: HttpServletRequest) = {
if (killEnabled) {
val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
@@ -64,14 +53,4 @@ private[ui] class JobProgressUI(parent: SparkUI) {
Thread.sleep(100)
}
}
-
- def getHandlers = Seq[ServletContextHandler](
- createRedirectHandler("/stages/stage/kill", "/stages", handleKillRequest),
- createServletHandler("/stages/stage",
- (request: HttpServletRequest) => stagePage.render(request), parent.securityManager, basePath),
- createServletHandler("/stages/pool",
- (request: HttpServletRequest) => poolPage.render(request), parent.securityManager, basePath),
- createServletHandler("/stages",
- (request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath)
- )
}
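
For context, handleKillRequest is driven entirely by query parameters; it is made non-private here so that it can be wired up outside this class (presumably by SparkUI, which is outside this excerpt). Assuming the kill endpoint keeps its old mount point, a request such as

    http://<driver>:4040/stages/stage/kill?id=3&terminate=true

would ask for stage 3 to be cancelled and then redirect back to the stages page, provided spark.ui.killEnabled has not been set to false.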
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index 3638e6035b..fd83d37583 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -22,17 +22,15 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.scheduler.{Schedulable, StageInfo}
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUIPage, UIUtils}
/** Page showing specific pool details */
-private[ui] class PoolPage(parent: JobProgressUI) {
+private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
+ private val appName = parent.appName
private val basePath = parent.basePath
private val live = parent.live
private val sc = parent.sc
- private lazy val listener = parent.listener
-
- private def appName = parent.appName
+ private val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
@@ -52,8 +50,8 @@ private[ui] class PoolPage(parent: JobProgressUI) {
<h4>Summary </h4> ++ poolTable.toNodeSeq ++
<h4>{activeStages.size} Active Stages</h4> ++ activeStagesTable.toNodeSeq
- UIUtils.headerSparkPage(
- content, basePath, appName, "Fair Scheduler Pool: " + poolName, Stages)
+ UIUtils.headerSparkPage(content, basePath, appName, "Fair Scheduler Pool: " + poolName,
+ parent.headerTabs, parent)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index c5c8d86687..f4b68f2419 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -24,10 +24,9 @@ import org.apache.spark.scheduler.{Schedulable, StageInfo}
import org.apache.spark.ui.UIUtils
/** Table showing list of pools */
-private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) {
+private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) {
private val basePath = parent.basePath
- private val poolToActiveStages = listener.poolToActiveStages
- private lazy val listener = parent.listener
+ private val listener = parent.listener
def toNodeSeq: Seq[Node] = {
listener.synchronized {
@@ -48,7 +47,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) {
<th>SchedulingMode</th>
</thead>
<tbody>
- {rows.map(r => makeRow(r, poolToActiveStages))}
+ {rows.map(r => makeRow(r, listener.poolToActiveStages))}
</tbody>
</table>
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index b6c3e3cf45..4bce472036 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -22,17 +22,14 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.{WebUI, UIUtils}
+import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.{Utils, Distribution}
/** Page showing statistics and task list for a given stage */
-private[ui] class StagePage(parent: JobProgressUI) {
+private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
+ private val appName = parent.appName
private val basePath = parent.basePath
- private lazy val listener = parent.listener
- private lazy val sc = parent.sc
-
- private def appName = parent.appName
+ private val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
@@ -44,8 +41,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
<h4>Summary Metrics</h4> No tasks have started yet
<h4>Tasks</h4> No tasks have started yet
</div>
- return UIUtils.headerSparkPage(
- content, basePath, appName, "Details for Stage %s".format(stageId), Stages)
+ return UIUtils.headerSparkPage(content, basePath, appName,
+ "Details for Stage %s".format(stageId), parent.headerTabs, parent)
}
val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime)
@@ -60,7 +57,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
val hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
var activeTime = 0L
- val now = System.currentTimeMillis()
+ val now = System.currentTimeMillis
val tasksActive = listener.stageIdToTasksActive(stageId).values
tasksActive.foreach(activeTime += _.timeRunning(now))
@@ -70,7 +67,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
<ul class="unstyled">
<li>
<strong>Total task time across all tasks: </strong>
- {parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
+ {UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
</li>
{if (hasShuffleRead)
<li>
@@ -121,13 +118,13 @@ private[ui] class StagePage(parent: JobProgressUI) {
}
val serializationQuantiles =
"Result serialization time" +: Distribution(serializationTimes).
- get.getQuantiles().map(ms => parent.formatDuration(ms.toLong))
+ get.getQuantiles().map(ms => UIUtils.formatDuration(ms.toLong))
val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.executorRunTime.toDouble
}
val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles()
- .map(ms => parent.formatDuration(ms.toLong))
+ .map(ms => UIUtils.formatDuration(ms.toLong))
val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) =>
if (info.gettingResultTime > 0) {
@@ -138,7 +135,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
}
val gettingResultQuantiles = "Time spent fetching task results" +:
Distribution(gettingResultTimes).get.getQuantiles().map { millis =>
- parent.formatDuration(millis.toLong)
+ UIUtils.formatDuration(millis.toLong)
}
// The scheduler delay includes the network delay to send the task to the worker
// machine and to send back the result (but not the time to fetch the task result,
@@ -155,7 +152,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
}
val schedulerDelayQuantiles = "Scheduler delay" +:
Distribution(schedulerDelays).get.getQuantiles().map { millis =>
- parent.formatDuration(millis.toLong)
+ UIUtils.formatDuration(millis.toLong)
}
def getQuantileCols(data: Seq[Double]) =
@@ -206,8 +203,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
<h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++
<h4>Tasks</h4> ++ taskTable
- UIUtils.headerSparkPage(
- content, basePath, appName, "Details for Stage %d".format(stageId), Stages)
+ UIUtils.headerSparkPage(content, basePath, appName, "Details for Stage %d".format(stageId),
+ parent.headerTabs, parent)
}
}
@@ -219,8 +216,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
taskData match { case TaskUIData(info, metrics, exception) =>
val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
else metrics.map(_.executorRunTime).getOrElse(1L)
- val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
- else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
+ val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
+ else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
@@ -235,8 +232,8 @@ private[ui] class StagePage(parent: JobProgressUI) {
val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
- val writeTimeReadable = maybeWriteTime.map( t => t / (1000 * 1000)).map { ms =>
- if (ms == 0) "" else parent.formatDuration(ms)
+ val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
+ if (ms == 0) "" else UIUtils.formatDuration(ms)
}.getOrElse("")
val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
@@ -254,15 +251,15 @@ private[ui] class StagePage(parent: JobProgressUI) {
<td>{info.status}</td>
<td>{info.taskLocality}</td>
<td>{info.host}</td>
- <td>{WebUI.formatDate(new Date(info.launchTime))}</td>
+ <td>{UIUtils.formatDate(new Date(info.launchTime))}</td>
<td sorttable_customkey={duration.toString}>
{formatDuration}
</td>
<td sorttable_customkey={gcTime.toString}>
- {if (gcTime > 0) parent.formatDuration(gcTime) else ""}
+ {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
</td>
<td sorttable_customkey={serializationTime.toString}>
- {if (serializationTime > 0) parent.formatDuration(serializationTime) else ""}
+ {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
</td>
{if (shuffleRead) {
<td sorttable_customkey={shuffleReadSortable}>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index e419fae5a6..8c5b1f55fd 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -23,17 +23,17 @@ import scala.collection.mutable.HashMap
import scala.xml.Node
import org.apache.spark.scheduler.{StageInfo, TaskInfo}
-import org.apache.spark.ui.{WebUI, UIUtils}
+import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
/** Page showing list of all ongoing and recently finished stages */
private[ui] class StageTable(
- stages: Seq[StageInfo],
- parent: JobProgressUI,
- killEnabled: Boolean = false) {
+ stages: Seq[StageInfo],
+ parent: JobProgressTab,
+ killEnabled: Boolean = false) {
private val basePath = parent.basePath
- private lazy val listener = parent.listener
+ private val listener = parent.listener
private lazy val isFairScheduler = parent.isFairScheduler
def toNodeSeq: Seq[Node] = {
@@ -89,25 +89,23 @@ private[ui] class StageTable(
{s.name}
</a>
- val description = listener.stageIdToDescription.get(s.stageId)
+ listener.stageIdToDescription.get(s.stageId)
.map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
.getOrElse(<div> {killLink}{nameLink}</div>)
-
- return description
}
/** Render an HTML row that represents a stage */
private def stageRow(s: StageInfo): Seq[Node] = {
val poolName = listener.stageIdToPool.get(s.stageId)
val submissionTime = s.submissionTime match {
- case Some(t) => WebUI.formatDate(new Date(t))
+ case Some(t) => UIUtils.formatDate(new Date(t))
case None => "Unknown"
}
val finishTime = s.completionTime.getOrElse(System.currentTimeMillis)
val duration = s.submissionTime.map { t =>
if (finishTime > t) finishTime - t else System.currentTimeMillis - t
}
- val formattedDuration = duration.map(d => parent.formatDuration(d)).getOrElse("Unknown")
+ val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
val startedTasks =
listener.stageIdToTasksActive.getOrElse(s.stageId, HashMap[Long, TaskInfo]()).size
val completedTasks = listener.stageIdToTasksComplete.getOrElse(s.stageId, 0)
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 75ee9976d7..d07f1c9b20 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -22,23 +22,22 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils}
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils
/** Page showing storage details for a given RDD */
-private[ui] class RDDPage(parent: BlockManagerUI) {
+private[ui] class RddPage(parent: StorageTab) extends WebUIPage("rdd") {
+ private val appName = parent.appName
private val basePath = parent.basePath
- private lazy val listener = parent.listener
-
- private def appName = parent.appName
+ private val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
val rddId = request.getParameter("id").toInt
val storageStatusList = listener.storageStatusList
val rddInfo = listener.rddInfoList.find(_.id == rddId).getOrElse {
// Rather than crashing, render an "RDD Not Found" page
- return UIUtils.headerSparkPage(Seq[Node](), basePath, appName, "RDD Not Found", Storage)
+ return UIUtils.headerSparkPage(Seq[Node](), basePath, appName, "RDD Not Found",
+ parent.headerTabs, parent)
}
// Worker table
@@ -96,8 +95,8 @@ private[ui] class RDDPage(parent: BlockManagerUI) {
</div>
</div>;
- UIUtils.headerSparkPage(
- content, basePath, appName, "RDD Storage Info for " + rddInfo.name, Storage)
+ UIUtils.headerSparkPage(content, basePath, appName, "RDD Storage Info for " + rddInfo.name,
+ parent.headerTabs, parent)
}
/** Header fields for the worker table */
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
index 4f6acc30a8..b66edd91f5 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -22,22 +22,19 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.storage.RDDInfo
-import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils
/** Page showing list of RDDs currently stored in the cluster */
-private[ui] class IndexPage(parent: BlockManagerUI) {
+private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
+ private val appName = parent.appName
private val basePath = parent.basePath
- private lazy val listener = parent.listener
-
- private def appName = parent.appName
+ private val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
-
val rdds = listener.rddInfoList
val content = UIUtils.listingTable(rddHeader, rddRow, rdds)
- UIUtils.headerSparkPage(content, basePath, appName, "Storage ", Storage)
+ UIUtils.headerSparkPage(content, basePath, appName, "Storage ", parent.headerTabs, parent)
}
/** Header fields for the RDD table */
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 16996a2da1..56429f6c07 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -17,45 +17,27 @@
package org.apache.spark.ui.storage
-import javax.servlet.http.HttpServletRequest
-
import scala.collection.mutable
-import org.eclipse.jetty.servlet.ServletContextHandler
-
import org.apache.spark.ui._
-import org.apache.spark.ui.JettyUtils._
import org.apache.spark.scheduler._
import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
/** Web UI showing storage status of all RDDs in the given SparkContext. */
-private[ui] class BlockManagerUI(parent: SparkUI) {
+private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage") {
+ val appName = parent.appName
val basePath = parent.basePath
+ val listener = new StorageListener(parent.storageStatusListener)
- private val indexPage = new IndexPage(this)
- private val rddPage = new RDDPage(this)
- private var _listener: Option[BlockManagerListener] = None
-
- lazy val listener = _listener.get
-
- def appName = parent.appName
-
- def start() {
- _listener = Some(new BlockManagerListener(parent.storageStatusListener))
- }
-
- def getHandlers = Seq[ServletContextHandler](
- createServletHandler("/storage/rdd",
- (request: HttpServletRequest) => rddPage.render(request), parent.securityManager, basePath),
- createServletHandler("/storage",
- (request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath)
- )
+ attachPage(new StoragePage(this))
+ attachPage(new RddPage(this))
+ parent.registerListener(listener)
}
/**
* A SparkListener that prepares information to be displayed on the BlockManagerUI
*/
-private[ui] class BlockManagerListener(storageStatusListener: StorageStatusListener)
+private[ui] class StorageListener(storageStatusListener: StorageStatusListener)
extends SparkListener {
private val _rddInfoMap = mutable.Map[Int, RDDInfo]()
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index f2396f7c80..465835ea7f 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -88,30 +88,27 @@ private[spark] object JsonProtocol {
def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = {
val taskInfo = taskStart.taskInfo
- val taskInfoJson = if (taskInfo != null) taskInfoToJson(taskInfo) else JNothing
("Event" -> Utils.getFormattedClassName(taskStart)) ~
("Stage ID" -> taskStart.stageId) ~
- ("Task Info" -> taskInfoJson)
+ ("Task Info" -> taskInfoToJson(taskInfo))
}
def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = {
val taskInfo = taskGettingResult.taskInfo
- val taskInfoJson = if (taskInfo != null) taskInfoToJson(taskInfo) else JNothing
("Event" -> Utils.getFormattedClassName(taskGettingResult)) ~
- ("Task Info" -> taskInfoJson)
+ ("Task Info" -> taskInfoToJson(taskInfo))
}
def taskEndToJson(taskEnd: SparkListenerTaskEnd): JValue = {
val taskEndReason = taskEndReasonToJson(taskEnd.reason)
val taskInfo = taskEnd.taskInfo
- val taskInfoJson = if (taskInfo != null) taskInfoToJson(taskInfo) else JNothing
val taskMetrics = taskEnd.taskMetrics
val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing
("Event" -> Utils.getFormattedClassName(taskEnd)) ~
("Stage ID" -> taskEnd.stageId) ~
("Task Type" -> taskEnd.taskType) ~
("Task End Reason" -> taskEndReason) ~
- ("Task Info" -> taskInfoJson) ~
+ ("Task Info" -> taskInfoToJson(taskInfo)) ~
("Task Metrics" -> taskMetricsJson)
}
@@ -505,6 +502,9 @@ private[spark] object JsonProtocol {
}
def taskMetricsFromJson(json: JValue): TaskMetrics = {
+ if (json == JNothing) {
+ return TaskMetrics.empty
+ }
val metrics = new TaskMetrics
metrics.hostname = (json \ "Host Name").extract[String]
metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long]
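
The new JNothing guard matters when an event carries no task metrics at all (taskEndToJson still writes JNothing when taskMetrics is null, and older logs may contain such events). A minimal sketch of the behaviour, assuming access from within the spark package since JsonProtocol is private[spark]:

    package org.apache.spark.util

    import org.json4s.JsonAST.JNothing

    object TaskMetricsFromJsonSketch {
      def main(args: Array[String]) {
        // Previously, extracting fields from an absent JSON object would fail during replay;
        // with the guard above, an absent value simply deserializes to TaskMetrics.empty.
        val metrics = JsonProtocol.taskMetricsFromJson(JNothing)
        println(metrics)
      }
    }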
diff --git a/core/src/test/scala/org/apache/spark/SparkUISuite.scala b/core/src/test/scala/org/apache/spark/SparkUISuite.scala
deleted file mode 100644
index d0d119c150..0000000000
--- a/core/src/test/scala/org/apache/spark/SparkUISuite.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.net.URI
-
-import org.scalatest.FunSuite
-
-class SparkUISuite extends FunSuite with SharedSparkContext {
-
- test("verify appUIAddress contains the scheme") {
- val uiAddress = sc.ui.appUIAddress
- assert(uiAddress.equals("http://" + sc.ui.appUIHostPort))
- }
-
- test("verify appUIAddress contains the port") {
- val splitUIAddress = sc.ui.appUIAddress.split(':')
- assert(splitUIAddress(2).toInt == sc.ui.boundPort)
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 2f9739f940..b85c483ca2 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -18,16 +18,81 @@
package org.apache.spark.ui
import java.net.ServerSocket
+import javax.servlet.http.HttpServletRequest
+import scala.io.Source
import scala.util.{Failure, Success, Try}
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.ServletContextHandler
import org.scalatest.FunSuite
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.LocalSparkContext._
+import scala.xml.Node
class UISuite extends FunSuite {
+
+ test("basic ui visibility") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ // test if the ui is visible, and all the expected tabs are visible
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val html = Source.fromURL(sc.ui.appUIAddress).mkString
+ assert(!html.contains("random data that should not be present"))
+ assert(html.toLowerCase.contains("stages"))
+ assert(html.toLowerCase.contains("storage"))
+ assert(html.toLowerCase.contains("environment"))
+ assert(html.toLowerCase.contains("executors"))
+ }
+ }
+ }
+
+ test("visibility at localhost:4040") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ // test if visible from http://localhost:4040
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val html = Source.fromURL("http://localhost:4040").mkString
+ assert(html.toLowerCase.contains("stages"))
+ }
+ }
+ }
+
+ test("attaching a new tab") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ val sparkUI = sc.ui
+
+ val newTab = new WebUITab(sparkUI, "foo") {
+ attachPage(new WebUIPage("") {
+ def render(request: HttpServletRequest): Seq[Node] = {
+ <b>"html magic"</b>
+ }
+ })
+ }
+ sparkUI.attachTab(newTab)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val html = Source.fromURL(sc.ui.appUIAddress).mkString
+ assert(!html.contains("random data that should not be present"))
+
+ // check whether new page exists
+ assert(html.toLowerCase.contains("foo"))
+
+ // check whether other pages still exist
+ assert(html.toLowerCase.contains("stages"))
+ assert(html.toLowerCase.contains("storage"))
+ assert(html.toLowerCase.contains("environment"))
+ assert(html.toLowerCase.contains("executors"))
+ }
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val html = Source.fromURL(sc.ui.appUIAddress.stripSuffix("/") + "/foo").mkString
+ // check whether new page exists
+ assert(html.contains("magic"))
+ }
+ }
+ }
+
test("jetty port increases under contention") {
val startPort = 4040
val server = new Server(startPort)
@@ -60,4 +125,18 @@ class UISuite extends FunSuite {
case Failure(e) =>
}
}
+
+ test("verify appUIAddress contains the scheme") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ val uiAddress = sc.ui.appUIAddress
+ assert(uiAddress.equals("http://" + sc.ui.appUIHostPort))
+ }
+ }
+
+ test("verify appUIAddress contains the port") {
+ withSpark(new SparkContext("local", "test")) { sc =>
+ val splitUIAddress = sc.ui.appUIAddress.split(':')
+ assert(splitUIAddress(2).toInt == sc.ui.boundPort)
+ }
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index f75297a02d..16470bb7bf 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -523,8 +523,8 @@ class JsonProtocolSuite extends FunSuite {
700,"Fetch Wait Time":900,"Remote Bytes Read":1000},"Shuffle Write Metrics":
{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Updated Blocks":
[{"Block ID":{"Type":"RDDBlockId","RDD ID":0,"Split Index":0},"Status":
- {"Storage Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
- "Replication":2},"Memory Size":0,"Disk Size":0,"Tachyon Size":0}}]}}
+ {"Storage Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,
+ "Deserialized":false,"Replication":2},"Memory Size":0,"Disk Size":0,"Tachyon Size":0}}]}}
"""
private val jobStartJsonString =