Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala |  59
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala          | 214
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala                |  46
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala              | 274
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala     |  55
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/WebUI.scala                                  |   1
6 files changed, 421 insertions(+), 228 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
new file mode 100644
index 0000000000..a0e8bd403a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import org.apache.spark.ui.SparkUI
+
+private[spark] case class ApplicationHistoryInfo(
+ id: String,
+ name: String,
+ startTime: Long,
+ endTime: Long,
+ lastUpdated: Long,
+ sparkUser: String)
+
+private[spark] abstract class ApplicationHistoryProvider {
+
+ /**
+ * Returns a list of applications available for the history server to show.
+ *
+ * @return List of all known applications.
+ */
+ def getListing(): Seq[ApplicationHistoryInfo]
+
+ /**
+ * Returns the Spark UI for a specific application.
+ *
+ * @param appId The application ID.
+ * @return The application's UI, or null if the application is not found.
+ */
+ def getAppUI(appId: String): SparkUI
+
+ /**
+ * Called when the server is shutting down.
+ */
+ def stop(): Unit = { }
+
+ /**
+ * Returns configuration data to be shown in the History Server home page.
+ *
+ * @return A map with the configuration data. Data is shown in the order returned by the map.
+ */
+ def getConfig(): Map[String, String] = Map()
+
+}
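
Note on the provider contract: HistoryServer.main (later in this diff) instantiates the class named by spark.history.provider reflectively, through a constructor taking a single SparkConf. So a third-party provider only needs to extend the abstract class above. A minimal sketch, with a hypothetical class name and made-up listing data (not part of this patch):

    import org.apache.spark.SparkConf
    import org.apache.spark.ui.SparkUI

    // Hypothetical example: a provider serving a fixed, in-memory listing.
    // It must live in org.apache.spark.deploy.history to see the
    // private[spark] types above.
    class InMemoryHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider {

      private val apps = Seq(
        ApplicationHistoryInfo(
          id = "app-0001",
          name = "Example App",
          startTime = 0L,
          endTime = 1L,
          lastUpdated = 1L,
          sparkUser = "nobody"))

      override def getListing(): Seq[ApplicationHistoryInfo] = apps

      // Per the contract above, return null when the application is unknown;
      // this sketch never reconstructs a UI.
      override def getAppUI(appId: String): SparkUI = null

      override def getConfig(): Map[String, String] =
        Map("Provider" -> "in-memory example")
    }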
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
new file mode 100644
index 0000000000..a8c9ac0724
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.FileNotFoundException
+
+import scala.collection.mutable
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.scheduler._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Utils
+
+private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
+ with Logging {
+
+ // Interval between each check for event log updates
+ private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
+ conf.getInt("spark.history.updateInterval", 10)) * 1000
+
+ private val logDir = conf.get("spark.history.fs.logDirectory", null)
+ if (logDir == null) {
+ throw new IllegalArgumentException("Logging directory must be specified.")
+ }
+
+ private val fs = Utils.getHadoopFileSystem(logDir)
+
+ // A timestamp of when the disk was last accessed to check for log updates
+ private var lastLogCheckTimeMs = -1L
+
+ // List of applications, in order from newest to oldest.
+ @volatile private var appList: Seq[ApplicationHistoryInfo] = Nil
+
+ /**
+ * A background thread that periodically checks for event log updates on disk.
+ *
+ * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
+ * time at which it performs the next log check to maintain the same period as before.
+ *
+ * TODO: Add a mechanism to update manually.
+ */
+ private val logCheckingThread = new Thread("LogCheckingThread") {
+ override def run() = Utils.logUncaughtExceptions {
+ while (true) {
+ val now = getMonotonicTimeMs()
+ if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
+ Thread.sleep(UPDATE_INTERVAL_MS)
+ } else {
+ // If the user has manually checked for logs recently, wait until
+ // UPDATE_INTERVAL_MS after the last check time
+ Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
+ }
+ checkForLogs()
+ }
+ }
+ }
+
+ initialize()
+
+ private def initialize() {
+ // Validate the log directory.
+ val path = new Path(logDir)
+ if (!fs.exists(path)) {
+ throw new IllegalArgumentException(
+ "Logging directory specified does not exist: %s".format(logDir))
+ }
+ if (!fs.getFileStatus(path).isDir) {
+ throw new IllegalArgumentException(
+ "Logging directory specified is not a directory: %s".format(logDir))
+ }
+
+ checkForLogs()
+ logCheckingThread.setDaemon(true)
+ logCheckingThread.start()
+ }
+
+ override def getListing() = appList
+
+ override def getAppUI(appId: String): SparkUI = {
+ try {
+ val appLogDir = fs.getFileStatus(new Path(logDir, appId))
+ loadAppInfo(appLogDir, true)._2
+ } catch {
+ case e: FileNotFoundException => null
+ }
+ }
+
+ override def getConfig(): Map[String, String] =
+ Map(("Event Log Location" -> logDir))
+
+ /**
+ * Builds the application list based on the current contents of the log directory.
+ * Tries to reuse as much of the data already in memory as possible, by not reading
+ * applications that haven't been updated since the last time the logs were checked.
+ */
+ private def checkForLogs() = {
+ lastLogCheckTimeMs = getMonotonicTimeMs()
+ logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
+ try {
+ val logStatus = fs.listStatus(new Path(logDir))
+ val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
+ val logInfos = logDirs.filter {
+ dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))
+ }
+
+ val currentApps = Map[String, ApplicationHistoryInfo](
+ appList.map(app => (app.id -> app)):_*)
+
+ // For any application that either (i) is not listed or (ii) has changed since the last time
+ // the listing was created (defined by the log dir's modification time), load the app's info.
+ // Otherwise just reuse what's already in memory.
+ val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size)
+ for (dir <- logInfos) {
+ val curr = currentApps.getOrElse(dir.getPath().getName(), null)
+ if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
+ try {
+ newApps += loadAppInfo(dir, false)._1
+ } catch {
+ case e: Exception => logError(s"Failed to load app info from directory $dir.", e)
+ }
+ } else {
+ newApps += curr
+ }
+ }
+
+ appList = newApps.sortBy { info => -info.endTime }
+ } catch {
+ case t: Throwable => logError("Exception in checking for event log updates", t)
+ }
+ }
+
+ /**
+ * Parse the application's logs to find out the information we need to build the
+ * listing page.
+ *
+ * When creating the listing of available apps, there is no need to load the whole UI for the
+ * application. The UI is requested by the HistoryServer (by calling getAppUI()) when the user
+ * clicks on a specific application.
+ *
+ * @param logDir Directory with application's log files.
+ * @param renderUI Whether to create the SparkUI for the application.
+ * @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
+ */
+ private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
+ val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs)
+ val path = logDir.getPath
+ val appId = path.getName
+ val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
+ val appListener = new ApplicationEventListener
+ replayBus.addListener(appListener)
+
+ val ui: SparkUI = if (renderUI) {
+ val conf = this.conf.clone()
+ val appSecManager = new SecurityManager(conf)
+ new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
+ // Do not call ui.bind() to avoid creating a new server for each application
+ } else {
+ null
+ }
+
+ replayBus.replay()
+ val appInfo = ApplicationHistoryInfo(
+ appId,
+ appListener.appName,
+ appListener.startTime,
+ appListener.endTime,
+ getModificationTime(logDir),
+ appListener.sparkUser)
+
+ if (ui != null) {
+ val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+ ui.getSecurityManager.setUIAcls(uiAclsEnabled)
+ ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
+ }
+ (appInfo, ui)
+ }
+
+ /** Return when this directory was last modified. */
+ private def getModificationTime(dir: FileStatus): Long = {
+ try {
+ val logFiles = fs.listStatus(dir.getPath)
+ if (logFiles != null && !logFiles.isEmpty) {
+ logFiles.map(_.getModificationTime).max
+ } else {
+ dir.getModificationTime
+ }
+ } catch {
+ case t: Throwable =>
+ logError("Exception in accessing modification time of %s".format(dir.getPath), t)
+ -1L
+ }
+ }
+
+ /** Returns the system's monotonically increasing time, in milliseconds. */
+ private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000)
+
+}
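
For anyone wiring this up by hand: the constructor above validates spark.history.fs.logDirectory and starts the daemon checking thread as side effects of initialize(). A minimal sketch of driving the provider directly (the class is private[history], so this has to sit in the same package; the directory path is a placeholder):

    package org.apache.spark.deploy.history

    import org.apache.spark.SparkConf

    object FsProviderSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .set("spark.history.fs.logDirectory", "file:/tmp/spark-events")
          .set("spark.history.fs.updateInterval", "30") // seconds between scans

        // Throws IllegalArgumentException if the directory is missing or not
        // a directory; otherwise scans once and starts the background thread.
        val provider = new FsHistoryProvider(conf)
        provider.getListing().foreach { app =>
          println(s"${app.id}\t${app.name}\t${app.sparkUser}")
        }
      }
    }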
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 180c853ce3..a958c837c2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -25,20 +25,36 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
+ private val pageSize = 20
+
def render(request: HttpServletRequest): Seq[Node] = {
- val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
- val appTable = UIUtils.listingTable(appHeader, appRow, appRows)
+ val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt
+ val requestedFirst = (requestedPage - 1) * pageSize
+
+ val allApps = parent.getApplicationList()
+ val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
+ val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))
+
+ val actualPage = (actualFirst / pageSize) + 1
+ val last = Math.min(actualFirst + pageSize, allApps.size) - 1
+ val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0)
+
+ val appTable = UIUtils.listingTable(appHeader, appRow, apps)
+ val providerConfig = parent.getProviderConfig()
val content =
<div class="row-fluid">
<div class="span12">
<ul class="unstyled">
- <li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
+ { providerConfig.map(e => <li><strong>{e._1}:</strong> {e._2}</li>) }
</ul>
{
- if (parent.appIdToInfo.size > 0) {
+ if (allApps.size > 0) {
<h4>
- Showing {parent.appIdToInfo.size}/{parent.getNumApplications}
- Completed Application{if (parent.getNumApplications > 1) "s" else ""}
+ Showing {actualFirst + 1}-{last + 1} of {allApps.size}
+ <span style="float: right">
+ {if (actualPage > 1) <a href={"/?page=" + (actualPage - 1)}>&lt;</a>}
+ {if (actualPage < pageCount) <a href={"/?page=" + (actualPage + 1)}>&gt;</a>}
+ </span>
</h4> ++
appTable
} else {
@@ -56,26 +72,20 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
"Completed",
"Duration",
"Spark User",
- "Log Directory",
"Last Updated")
private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
- val appName = if (info.started) info.name else info.logDirPath.getName
- val uiAddress = parent.getAddress + info.ui.basePath
- val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started"
- val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed"
- val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L
- val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---"
- val sparkUser = if (info.started) info.sparkUser else "Unknown user"
- val logDirectory = info.logDirPath.getName
+ val uiAddress = "/history/" + info.id
+ val startTime = UIUtils.formatDate(info.startTime)
+ val endTime = UIUtils.formatDate(info.endTime)
+ val duration = UIUtils.formatDuration(info.endTime - info.startTime)
val lastUpdated = UIUtils.formatDate(info.lastUpdated)
<tr>
- <td><a href={uiAddress}>{appName}</a></td>
+ <td><a href={uiAddress}>{info.name}</a></td>
<td>{startTime}</td>
<td>{endTime}</td>
<td>{duration}</td>
- <td>{sparkUser}</td>
- <td>{logDirectory}</td>
+ <td>{info.sparkUser}</td>
<td>{lastUpdated}</td>
</tr>
}
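
The paging math in render() is terse, so here it is restated as a standalone helper (hypothetical names, 20 apps per page as in the patch) with one worked case:

    // Sketch: the same bounds computation HistoryPage.render performs inline.
    def pageBounds(totalApps: Int, requestedPage: Int, pageSize: Int = 20): (Int, Int, Int) = {
      val requestedFirst = (requestedPage - 1) * pageSize
      // An out-of-range page request silently falls back to the first page.
      val actualFirst = if (requestedFirst < totalApps) requestedFirst else 0
      val last = math.min(actualFirst + pageSize, totalApps) - 1
      val pageCount = totalApps / pageSize + (if (totalApps % pageSize > 0) 1 else 0)
      (actualFirst, last, pageCount)
    }

    // 45 apps, page 3: rows 40..44, rendered as "Showing 41-45 of 45"; 3 pages total.
    assert(pageBounds(45, 3) == (40, 44, 3))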
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index a9c11dca56..29a78a56c8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -17,14 +17,15 @@
package org.apache.spark.deploy.history
-import scala.collection.mutable
+import java.util.NoSuchElementException
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import com.google.common.cache._
+import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.scheduler._
-import org.apache.spark.ui.{WebUI, SparkUI}
+import org.apache.spark.ui.{WebUI, SparkUI, UIUtils}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
@@ -38,56 +39,68 @@ import org.apache.spark.util.Utils
* application's event logs are maintained in the application's own sub-directory. This
* is the same structure as maintained in the event log write code path in
* EventLoggingListener.
- *
- * @param baseLogDir The base directory in which event logs are found
*/
class HistoryServer(
- val baseLogDir: String,
+ conf: SparkConf,
+ provider: ApplicationHistoryProvider,
securityManager: SecurityManager,
- conf: SparkConf)
- extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging {
-
- import HistoryServer._
+ port: Int)
+ extends WebUI(securityManager, port, conf) with Logging {
- private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
- private val localHost = Utils.localHostName()
- private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
+ // How many applications to retain
+ private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
- // A timestamp of when the disk was last accessed to check for log updates
- private var lastLogCheckTime = -1L
+ private val appLoader = new CacheLoader[String, SparkUI] {
+ override def load(key: String): SparkUI = {
+ val ui = provider.getAppUI(key)
+ if (ui == null) {
+ throw new NoSuchElementException()
+ }
+ attachSparkUI(ui)
+ ui
+ }
+ }
- // Number of completed applications found in this directory
- private var numCompletedApplications = 0
+ private val appCache = CacheBuilder.newBuilder()
+ .maximumSize(retainedApplications)
+ .removalListener(new RemovalListener[String, SparkUI] {
+ override def onRemoval(rm: RemovalNotification[String, SparkUI]) = {
+ detachSparkUI(rm.getValue())
+ }
+ })
+ .build(appLoader)
+
+ private val loaderServlet = new HttpServlet {
+ protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
+ val parts = Option(req.getPathInfo()).getOrElse("").split("/")
+ if (parts.length < 2) {
+ res.sendError(HttpServletResponse.SC_BAD_REQUEST,
+ s"Unexpected path info in request (URI = ${req.getRequestURI()}")
+ return
+ }
- @volatile private var stopped = false
+ val appId = parts(1)
- /**
- * A background thread that periodically checks for event log updates on disk.
- *
- * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
- * time at which it performs the next log check to maintain the same period as before.
- *
- * TODO: Add a mechanism to update manually.
- */
- private val logCheckingThread = new Thread {
- override def run(): Unit = Utils.logUncaughtExceptions {
- while (!stopped) {
- val now = System.currentTimeMillis
- if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
- checkForLogs()
- Thread.sleep(UPDATE_INTERVAL_MS)
- } else {
- // If the user has manually checked for logs recently, wait until
- // UPDATE_INTERVAL_MS after the last check time
- Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now)
+ // Note we don't use the UI retrieved from the cache; the cache loader above will register
+ // the app's UI, and all we need to do is redirect the user to the same URI that was
+ // requested, and the proper data should be served at that point.
+ try {
+ appCache.get(appId)
+ res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
+ } catch {
+ case e: Exception => e.getCause() match {
+ case nsee: NoSuchElementException =>
+ val msg = <div class="row-fluid">Application {appId} not found.</div>
+ res.setStatus(HttpServletResponse.SC_NOT_FOUND)
+ UIUtils.basicSparkPage(msg, "Not Found").foreach(
+ n => res.getWriter().write(n.toString))
+
+ case cause: Exception => throw cause
}
}
}
}
- // A mapping of application ID to its history information, which includes the rendered UI
- val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()
-
initialize()
/**
@@ -98,108 +111,23 @@ class HistoryServer(
*/
def initialize() {
attachPage(new HistoryPage(this))
- attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static"))
+ attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
+
+ val contextHandler = new ServletContextHandler
+ contextHandler.setContextPath("/history")
+ contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
+ attachHandler(contextHandler)
}
/** Bind to the HTTP server behind this web interface. */
override def bind() {
super.bind()
- logCheckingThread.start()
- }
-
- /**
- * Check for any updates to event logs in the base directory. This is only effective once
- * the server has been bound.
- *
- * If a new completed application is found, the server renders the associated SparkUI
- * from the application's event logs, attaches this UI to itself, and stores metadata
- * information for this application.
- *
- * If the logs for an existing completed application are no longer found, the server
- * removes all associated information and detaches the SparkUI.
- */
- def checkForLogs() = synchronized {
- if (serverInfo.isDefined) {
- lastLogCheckTime = System.currentTimeMillis
- logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime))
- try {
- val logStatus = fileSystem.listStatus(new Path(baseLogDir))
- val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
- val logInfos = logDirs
- .sortBy { dir => getModificationTime(dir) }
- .map { dir => (dir, EventLoggingListener.parseLoggingInfo(dir.getPath, fileSystem)) }
- .filter { case (dir, info) => info.applicationComplete }
-
- // Logging information for applications that should be retained
- val retainedLogInfos = logInfos.takeRight(RETAINED_APPLICATIONS)
- val retainedAppIds = retainedLogInfos.map { case (dir, _) => dir.getPath.getName }
-
- // Remove any applications that should no longer be retained
- appIdToInfo.foreach { case (appId, info) =>
- if (!retainedAppIds.contains(appId)) {
- detachSparkUI(info.ui)
- appIdToInfo.remove(appId)
- }
- }
-
- // Render the application's UI if it is not already there
- retainedLogInfos.foreach { case (dir, info) =>
- val appId = dir.getPath.getName
- if (!appIdToInfo.contains(appId)) {
- renderSparkUI(dir, info)
- }
- }
-
- // Track the total number of completed applications observed this round
- numCompletedApplications = logInfos.size
-
- } catch {
- case e: Exception => logError("Exception in checking for event log updates", e)
- }
- } else {
- logWarning("Attempted to check for event log updates before binding the server.")
- }
- }
-
- /**
- * Render a new SparkUI from the event logs if the associated application is completed.
- *
- * HistoryServer looks for a special file that indicates application completion in the given
- * directory. If this file exists, the associated application is regarded to be completed, in
- * which case the server proceeds to render the SparkUI. Otherwise, the server does nothing.
- */
- private def renderSparkUI(logDir: FileStatus, elogInfo: EventLoggingInfo) {
- val path = logDir.getPath
- val appId = path.getName
- val replayBus = new ReplayListenerBus(elogInfo.logPaths, fileSystem, elogInfo.compressionCodec)
- val appListener = new ApplicationEventListener
- replayBus.addListener(appListener)
- val appConf = conf.clone()
- val appSecManager = new SecurityManager(appConf)
- val ui = new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
-
- // Do not call ui.bind() to avoid creating a new server for each application
- replayBus.replay()
- if (appListener.applicationStarted) {
- appSecManager.setUIAcls(HISTORY_UI_ACLS_ENABLED)
- appSecManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
- attachSparkUI(ui)
- val appName = appListener.appName
- val sparkUser = appListener.sparkUser
- val startTime = appListener.startTime
- val endTime = appListener.endTime
- val lastUpdated = getModificationTime(logDir)
- ui.setAppName(appName + " (completed)")
- appIdToInfo(appId) = ApplicationHistoryInfo(appId, appName, startTime, endTime,
- lastUpdated, sparkUser, path, ui)
- }
}
/** Stop the server and close the file system. */
override def stop() {
super.stop()
- stopped = true
- fileSystem.close()
+ provider.stop()
}
/** Attach a reconstructed UI to this server. Only valid after bind(). */
@@ -215,27 +143,20 @@ class HistoryServer(
ui.getHandlers.foreach(detachHandler)
}
- /** Return the address of this server. */
- def getAddress: String = "http://" + publicHost + ":" + boundPort
+ /**
+ * Returns a list of available applications, in descending order according to their end time.
+ *
+ * @return List of all known applications.
+ */
+ def getApplicationList() = provider.getListing()
- /** Return the number of completed applications found, whether or not the UI is rendered. */
- def getNumApplications: Int = numCompletedApplications
+ /**
+ * Returns the provider configuration to show in the listing page.
+ *
+ * @return A map with the provider's configuration.
+ */
+ def getProviderConfig() = provider.getConfig()
- /** Return when this directory was last modified. */
- private def getModificationTime(dir: FileStatus): Long = {
- try {
- val logFiles = fileSystem.listStatus(dir.getPath)
- if (logFiles != null && !logFiles.isEmpty) {
- logFiles.map(_.getModificationTime).max
- } else {
- dir.getModificationTime
- }
- } catch {
- case e: Exception =>
- logError("Exception in accessing modification time of %s".format(dir.getPath), e)
- -1L
- }
- }
}
/**
@@ -251,30 +172,31 @@ class HistoryServer(
object HistoryServer {
private val conf = new SparkConf
- // Interval between each check for event log updates
- val UPDATE_INTERVAL_MS = conf.getInt("spark.history.updateInterval", 10) * 1000
-
- // How many applications to retain
- val RETAINED_APPLICATIONS = conf.getInt("spark.history.retainedApplications", 250)
-
- // The port to which the web UI is bound
- val WEB_UI_PORT = conf.getInt("spark.history.ui.port", 18080)
-
- // set whether to enable or disable view acls for all applications
- val HISTORY_UI_ACLS_ENABLED = conf.getBoolean("spark.history.ui.acls.enable", false)
-
- val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
-
def main(argStrings: Array[String]) {
initSecurity()
- val args = new HistoryServerArguments(argStrings)
+ val args = new HistoryServerArguments(conf, argStrings)
val securityManager = new SecurityManager(conf)
- val server = new HistoryServer(args.logDir, securityManager, conf)
+
+ val providerName = conf.getOption("spark.history.provider")
+ .getOrElse(classOf[FsHistoryProvider].getName())
+ val provider = Class.forName(providerName)
+ .getConstructor(classOf[SparkConf])
+ .newInstance(conf)
+ .asInstanceOf[ApplicationHistoryProvider]
+
+ val port = conf.getInt("spark.history.ui.port", 18080)
+
+ val server = new HistoryServer(conf, provider, securityManager, port)
server.bind()
+ Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") {
+ override def run() = {
+ server.stop()
+ }
+ })
+
// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
- server.stop()
}
def initSecurity() {
@@ -291,17 +213,3 @@ object HistoryServer {
}
}
-
-
-private[spark] case class ApplicationHistoryInfo(
- id: String,
- name: String,
- startTime: Long,
- endTime: Long,
- lastUpdated: Long,
- sparkUser: String,
- logDirPath: Path,
- ui: SparkUI) {
- def started = startTime != -1
- def completed = endTime != -1
-}
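
The Guava cache carries most of the new lifecycle logic: a miss in appCache triggers appLoader (which attaches the UI), and size-based eviction triggers the removal listener (which detaches it). A self-contained sketch of that pattern, with strings standing in for SparkUI instances:

    import com.google.common.cache.{CacheBuilder, CacheLoader, RemovalListener, RemovalNotification}

    object UiCacheSketch {
      def main(args: Array[String]): Unit = {
        val loader = new CacheLoader[String, String] {
          // Stands in for appLoader: "attach" and return the UI on a cache miss.
          override def load(appId: String): String = {
            println(s"loading and attaching UI for $appId")
            s"ui-for-$appId"
          }
        }

        val cache = CacheBuilder.newBuilder()
          .maximumSize(2) // stands in for spark.history.retainedApplications
          .removalListener(new RemovalListener[String, String] {
            // Stands in for the detachSparkUI call on eviction.
            override def onRemoval(rm: RemovalNotification[String, String]): Unit =
              println(s"evicting and detaching ${rm.getValue()}")
          })
          .build(loader)

        cache.get("app-1") // miss: loads
        cache.get("app-2") // miss: loads
        cache.get("app-1") // hit
        cache.get("app-3") // miss: over capacity, typically evicts app-2
      }
    }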
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index 943c061743..be9361b754 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -17,17 +17,14 @@
package org.apache.spark.deploy.history
-import java.net.URI
-
-import org.apache.hadoop.fs.Path
-
+import org.apache.spark.SparkConf
import org.apache.spark.util.Utils
/**
* Command-line parser for the history server.
*/
-private[spark] class HistoryServerArguments(args: Array[String]) {
- var logDir = ""
+private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
+ private var logDir: String = null
parse(args.toList)
@@ -45,32 +42,36 @@ private[spark] class HistoryServerArguments(args: Array[String]) {
case _ =>
printUsageAndExit(1)
}
- validateLogDir()
- }
-
- private def validateLogDir() {
- if (logDir == "") {
- System.err.println("Logging directory must be specified.")
- printUsageAndExit(1)
- }
- val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
- val path = new Path(logDir)
- if (!fileSystem.exists(path)) {
- System.err.println("Logging directory specified does not exist: %s".format(logDir))
- printUsageAndExit(1)
- }
- if (!fileSystem.getFileStatus(path).isDir) {
- System.err.println("Logging directory specified is not a directory: %s".format(logDir))
- printUsageAndExit(1)
+ if (logDir != null) {
+ conf.set("spark.history.fs.logDirectory", logDir)
}
}
private def printUsageAndExit(exitCode: Int) {
System.err.println(
- "Usage: HistoryServer [options]\n" +
- "\n" +
- "Options:\n" +
- " -d DIR, --dir DIR Location of event log files")
+ """
+ |Usage: HistoryServer
+ |
+ |Configuration options can be set by setting the corresponding JVM system property.
+ |History Server options are always available; additional options depend on the provider.
+ |
+ |History Server options:
+ |
+ | spark.history.ui.port Port where server will listen for connections
+ | (default 18080)
+ | spark.history.ui.acls.enable Whether to enable view acls for all applications
+ | (default false)
+ | spark.history.provider Name of history provider class (defaults to
+ | file system-based provider)
+ | spark.history.retainedApplications Max number of application UIs to keep loaded in memory
+ | (default 50)
+ |
+ |FsHistoryProvider options:
+ |
+ | spark.history.fs.logDirectory Directory where app logs are stored (required)
+ | spark.history.fs.updateInterval How often to reload log data from storage (in seconds,
+ | default 10)
+ |""".stripMargin)
System.exit(exitCode)
}
+
}
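
Since the new usage text points users at JVM system properties: SparkConf (constructed with default loading) picks up every "spark."-prefixed system property, which is how these options reach the server. A small sketch of that mechanism (values are placeholders):

    import org.apache.spark.SparkConf

    object ConfigSketch {
      def main(args: Array[String]): Unit = {
        // Typically passed as -Dspark.history.fs.logDirectory=... on the JVM
        // command line; set programmatically here for illustration.
        System.setProperty("spark.history.fs.logDirectory", "file:/tmp/spark-events")
        System.setProperty("spark.history.retainedApplications", "25")

        val conf = new SparkConf() // loadDefaults = true reads spark.* properties
        assert(conf.get("spark.history.fs.logDirectory") == "file:/tmp/spark-events")
        assert(conf.getInt("spark.history.retainedApplications", 50) == 25)
      }
    }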
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index b08f308fda..856273e1d4 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -51,6 +51,7 @@ private[spark] abstract class WebUI(
def getTabs: Seq[WebUITab] = tabs.toSeq
def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
+ def getSecurityManager: SecurityManager = securityManager
/** Attach a tab to this UI, along with all of its attached pages. */
def attachTab(tab: WebUITab) {