-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala |  59
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala          | 214
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala                |  46
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala              | 274
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala     |  55
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/WebUI.scala                                  |   1
-rw-r--r--  docs/monitoring.md                                                                   |  21
-rw-r--r--  project/MimaExcludes.scala                                                           |   5
-rwxr-xr-x  sbin/start-history-server.sh                                                         |  17
9 files changed, 448 insertions, 244 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
new file mode 100644
index 0000000000..a0e8bd403a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import org.apache.spark.ui.SparkUI
+
+private[spark] case class ApplicationHistoryInfo(
+ id: String,
+ name: String,
+ startTime: Long,
+ endTime: Long,
+ lastUpdated: Long,
+ sparkUser: String)
+
+private[spark] abstract class ApplicationHistoryProvider {
+
+ /**
+ * Returns a list of applications available for the history server to show.
+ *
+ * @return List of all known applications.
+ */
+ def getListing(): Seq[ApplicationHistoryInfo]
+
+ /**
+ * Returns the Spark UI for a specific application.
+ *
+ * @param appId The application ID.
+ * @return The application's UI, or null if application is not found.
+ */
+ def getAppUI(appId: String): SparkUI
+
+ /**
+ * Called when the server is shutting down.
+ */
+ def stop(): Unit = { }
+
+ /**
+ * Returns configuration data to be shown in the History Server home page.
+ *
+ * @return A map with the configuration data. Data is shown in the order returned by the map.
+ */
+ def getConfig(): Map[String, String] = Map()
+
+}
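
The interface above is the extension point this change introduces: a backend only has to answer the listing, serve a UI on demand, and optionally expose configuration for the home page. As a hypothetical sketch (not part of this commit), a trivial in-memory provider could look like the following, assuming code living in the same package:

    package org.apache.spark.deploy.history

    import org.apache.spark.ui.SparkUI

    // Hypothetical example: a provider backed by a fixed in-memory listing.
    // getAppUI returns null (the "not found" convention above) because there
    // are no event logs from which to rebuild a SparkUI.
    private[spark] class StaticHistoryProvider extends ApplicationHistoryProvider {

      private val apps = Seq(
        ApplicationHistoryInfo("app-001", "Example App",
          startTime = 1L, endTime = 2L, lastUpdated = 2L, sparkUser = "spark"))

      override def getListing(): Seq[ApplicationHistoryInfo] = apps

      override def getAppUI(appId: String): SparkUI = null

      override def getConfig(): Map[String, String] =
        Map("Provider" -> "static in-memory listing")
    }
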
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
new file mode 100644
index 0000000000..a8c9ac0724
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.FileNotFoundException
+
+import scala.collection.mutable
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.scheduler._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Utils
+
+private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
+ with Logging {
+
+ // Interval between each check for event log updates
+ private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
+ conf.getInt("spark.history.updateInterval", 10)) * 1000
+
+ private val logDir = conf.get("spark.history.fs.logDirectory", null)
+ if (logDir == null) {
+ throw new IllegalArgumentException("Logging directory must be specified.")
+ }
+
+ private val fs = Utils.getHadoopFileSystem(logDir)
+
+ // A timestamp of when the disk was last accessed to check for log updates
+ private var lastLogCheckTimeMs = -1L
+
+ // List of applications, in order from newest to oldest.
+ @volatile private var appList: Seq[ApplicationHistoryInfo] = Nil
+
+ /**
+ * A background thread that periodically checks for event log updates on disk.
+ *
+ * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
+ * time at which it performs the next log check to maintain the same period as before.
+ *
+ * TODO: Add a mechanism to update manually.
+ */
+ private val logCheckingThread = new Thread("LogCheckingThread") {
+ override def run() = Utils.logUncaughtExceptions {
+ while (true) {
+ val now = getMonotonicTimeMs()
+ if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
+ Thread.sleep(UPDATE_INTERVAL_MS)
+ } else {
+ // If the user has manually checked for logs recently, wait until
+ // UPDATE_INTERVAL_MS after the last check time
+ Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
+ }
+ checkForLogs()
+ }
+ }
+ }
+
+ initialize()
+
+ private def initialize() {
+ // Validate the log directory.
+ val path = new Path(logDir)
+ if (!fs.exists(path)) {
+ throw new IllegalArgumentException(
+ "Logging directory specified does not exist: %s".format(logDir))
+ }
+ if (!fs.getFileStatus(path).isDir) {
+ throw new IllegalArgumentException(
+ "Logging directory specified is not a directory: %s".format(logDir))
+ }
+
+ checkForLogs()
+ logCheckingThread.setDaemon(true)
+ logCheckingThread.start()
+ }
+
+ override def getListing() = appList
+
+ override def getAppUI(appId: String): SparkUI = {
+ try {
+ val appLogDir = fs.getFileStatus(new Path(logDir, appId))
+ loadAppInfo(appLogDir, true)._2
+ } catch {
+ case e: FileNotFoundException => null
+ }
+ }
+
+ override def getConfig(): Map[String, String] =
+ Map(("Event Log Location" -> logDir))
+
+ /**
+ * Builds the application list based on the current contents of the log directory.
+ * Tries to reuse as much of the data already in memory as possible, by not reading
+ * applications that haven't been updated since last time the logs were checked.
+ */
+ private def checkForLogs() = {
+ lastLogCheckTimeMs = getMonotonicTimeMs()
+ logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
+ try {
+ val logStatus = fs.listStatus(new Path(logDir))
+ val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
+ val logInfos = logDirs.filter {
+ dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))
+ }
+
+ val currentApps = Map[String, ApplicationHistoryInfo](
+ appList.map(app => (app.id -> app)):_*)
+
+ // For any application that either (i) is not listed or (ii) has changed since the last time
+ // the listing was created (defined by the log dir's modification time), load the app's info.
+ // Otherwise just reuse what's already in memory.
+ val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size)
+ for (dir <- logInfos) {
+ val curr = currentApps.getOrElse(dir.getPath().getName(), null)
+ if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
+ try {
+ newApps += loadAppInfo(dir, false)._1
+ } catch {
+ case e: Exception => logError(s"Failed to load app info from directory $dir.")
+ }
+ } else {
+ newApps += curr
+ }
+ }
+
+ appList = newApps.sortBy { info => -info.endTime }
+ } catch {
+ case t: Throwable => logError("Exception in checking for event log updates", t)
+ }
+ }
+
+ /**
+ * Parse the application's logs to find out the information we need to build the
+ * listing page.
+ *
+ * When creating the listing of available apps, there is no need to load the whole UI for the
+ * application. The UI is requested by the HistoryServer (by calling getAppUI()) when the user
+ * clicks on a specific application.
+ *
+ * @param logDir Directory with application's log files.
+ * @param renderUI Whether to create the SparkUI for the application.
+ * @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
+ */
+ private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
+ val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs)
+ val path = logDir.getPath
+ val appId = path.getName
+ val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
+ val appListener = new ApplicationEventListener
+ replayBus.addListener(appListener)
+
+ val ui: SparkUI = if (renderUI) {
+ val conf = this.conf.clone()
+ val appSecManager = new SecurityManager(conf)
+ new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
+ // Do not call ui.bind() to avoid creating a new server for each application
+ } else {
+ null
+ }
+
+ replayBus.replay()
+ val appInfo = ApplicationHistoryInfo(
+ appId,
+ appListener.appName,
+ appListener.startTime,
+ appListener.endTime,
+ getModificationTime(logDir),
+ appListener.sparkUser)
+
+ if (ui != null) {
+ val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+ ui.getSecurityManager.setUIAcls(uiAclsEnabled)
+ ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
+ }
+ (appInfo, ui)
+ }
+
+ /** Return when this directory was last modified. */
+ private def getModificationTime(dir: FileStatus): Long = {
+ try {
+ val logFiles = fs.listStatus(dir.getPath)
+ if (logFiles != null && !logFiles.isEmpty) {
+ logFiles.map(_.getModificationTime).max
+ } else {
+ dir.getModificationTime
+ }
+ } catch {
+ case t: Throwable =>
+ logError("Exception in accessing modification time of %s".format(dir.getPath), t)
+ -1L
+ }
+ }
+
+ /** Returns the system's monotonically increasing time. */
+ private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000)
+
+}
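
Wiring the provider up is just a matter of SparkConf settings; a minimal sketch, assuming code inside the same package (the class is private[history]) and a placeholder HDFS path:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.history.fs.logDirectory", "hdfs://namenode:8020/spark-events")  // placeholder path
      .set("spark.history.fs.updateInterval", "30")  // seconds between disk scans

    // Validates the directory, scans it once, then starts the daemon
    // polling thread defined above.
    val provider = new FsHistoryProvider(conf)
    val listing = provider.getListing()  // sorted newest-to-oldest by end time
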
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 180c853ce3..a958c837c2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -25,20 +25,36 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
+ private val pageSize = 20
+
def render(request: HttpServletRequest): Seq[Node] = {
- val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
- val appTable = UIUtils.listingTable(appHeader, appRow, appRows)
+ val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt
+ val requestedFirst = (requestedPage - 1) * pageSize
+
+ val allApps = parent.getApplicationList()
+ val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
+ val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))
+
+ val actualPage = (actualFirst / pageSize) + 1
+ val last = Math.min(actualFirst + pageSize, allApps.size) - 1
+ val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0)
+
+ val appTable = UIUtils.listingTable(appHeader, appRow, apps)
+ val providerConfig = parent.getProviderConfig()
val content =
<div class="row-fluid">
<div class="span12">
<ul class="unstyled">
- <li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
+ { providerConfig.map(e => <li><strong>{e._1}:</strong> {e._2}</li>) }
</ul>
{
- if (parent.appIdToInfo.size > 0) {
+ if (allApps.size > 0) {
<h4>
- Showing {parent.appIdToInfo.size}/{parent.getNumApplications}
- Completed Application{if (parent.getNumApplications > 1) "s" else ""}
+ Showing {actualFirst + 1}-{last + 1} of {allApps.size}
+ <span style="float: right">
+ {if (actualPage > 1) <a href={"/?page=" + (actualPage - 1)}>&lt;</a>}
+ {if (actualPage < pageCount) <a href={"/?page=" + (actualPage + 1)}>&gt;</a>}
+ </span>
</h4> ++
appTable
} else {
@@ -56,26 +72,20 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
"Completed",
"Duration",
"Spark User",
- "Log Directory",
"Last Updated")
private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
- val appName = if (info.started) info.name else info.logDirPath.getName
- val uiAddress = parent.getAddress + info.ui.basePath
- val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started"
- val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed"
- val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L
- val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---"
- val sparkUser = if (info.started) info.sparkUser else "Unknown user"
- val logDirectory = info.logDirPath.getName
+ val uiAddress = "/history/" + info.id
+ val startTime = UIUtils.formatDate(info.startTime)
+ val endTime = UIUtils.formatDate(info.endTime)
+ val duration = UIUtils.formatDuration(info.endTime - info.startTime)
val lastUpdated = UIUtils.formatDate(info.lastUpdated)
<tr>
- <td><a href={uiAddress}>{appName}</a></td>
+ <td><a href={uiAddress}>{info.name}</a></td>
<td>{startTime}</td>
<td>{endTime}</td>
<td>{duration}</td>
- <td>{sparkUser}</td>
- <td>{logDirectory}</td>
+ <td>{info.sparkUser}</td>
<td>{lastUpdated}</td>
</tr>
}
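
The paging arithmetic in render() is a clamped offset plus a ceiling division; a standalone rendition of the same calculation, with an assertion as a worked example:

    val pageSize = 20

    // Mirrors HistoryPage: clamp an out-of-range offset back to page 1,
    // then compute the inclusive last index and the total page count.
    def pageBounds(requestedPage: Int, total: Int): (Int, Int, Int) = {
      val requestedFirst = (requestedPage - 1) * pageSize
      val first = if (requestedFirst < total) requestedFirst else 0
      val last = math.min(first + pageSize, total) - 1
      val pageCount = total / pageSize + (if (total % pageSize > 0) 1 else 0)
      (first, last, pageCount)
    }

    // 45 apps, page 2: entries 21-40 of 45, 3 pages in total.
    assert(pageBounds(2, 45) == (20, 39, 3))
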
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index a9c11dca56..29a78a56c8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -17,14 +17,15 @@
package org.apache.spark.deploy.history
-import scala.collection.mutable
+import java.util.NoSuchElementException
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import com.google.common.cache._
+import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.scheduler._
-import org.apache.spark.ui.{WebUI, SparkUI}
+import org.apache.spark.ui.{WebUI, SparkUI, UIUtils}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
@@ -38,56 +39,68 @@ import org.apache.spark.util.Utils
* application's event logs are maintained in the application's own sub-directory. This
* is the same structure as maintained in the event log write code path in
* EventLoggingListener.
- *
- * @param baseLogDir The base directory in which event logs are found
*/
class HistoryServer(
- val baseLogDir: String,
+ conf: SparkConf,
+ provider: ApplicationHistoryProvider,
securityManager: SecurityManager,
- conf: SparkConf)
- extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging {
-
- import HistoryServer._
+ port: Int)
+ extends WebUI(securityManager, port, conf) with Logging {
- private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
- private val localHost = Utils.localHostName()
- private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
+ // How many applications to retain
+ private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
- // A timestamp of when the disk was last accessed to check for log updates
- private var lastLogCheckTime = -1L
+ private val appLoader = new CacheLoader[String, SparkUI] {
+ override def load(key: String): SparkUI = {
+ val ui = provider.getAppUI(key)
+ if (ui == null) {
+ throw new NoSuchElementException()
+ }
+ attachSparkUI(ui)
+ ui
+ }
+ }
- // Number of completed applications found in this directory
- private var numCompletedApplications = 0
+ private val appCache = CacheBuilder.newBuilder()
+ .maximumSize(retainedApplications)
+ .removalListener(new RemovalListener[String, SparkUI] {
+ override def onRemoval(rm: RemovalNotification[String, SparkUI]) = {
+ detachSparkUI(rm.getValue())
+ }
+ })
+ .build(appLoader)
+
+ private val loaderServlet = new HttpServlet {
+ protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
+ val parts = Option(req.getPathInfo()).getOrElse("").split("/")
+ if (parts.length < 2) {
+ res.sendError(HttpServletResponse.SC_BAD_REQUEST,
+ s"Unexpected path info in request (URI = ${req.getRequestURI()}")
+ return
+ }
- @volatile private var stopped = false
+ val appId = parts(1)
- /**
- * A background thread that periodically checks for event log updates on disk.
- *
- * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
- * time at which it performs the next log check to maintain the same period as before.
- *
- * TODO: Add a mechanism to update manually.
- */
- private val logCheckingThread = new Thread {
- override def run(): Unit = Utils.logUncaughtExceptions {
- while (!stopped) {
- val now = System.currentTimeMillis
- if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
- checkForLogs()
- Thread.sleep(UPDATE_INTERVAL_MS)
- } else {
- // If the user has manually checked for logs recently, wait until
- // UPDATE_INTERVAL_MS after the last check time
- Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now)
+ // Note we don't use the UI retrieved from the cache; the cache loader above will register
+ // the app's UI, and all we need to do is redirect the user to the same URI that was
+ // requested, and the proper data should be served at that point.
+ try {
+ appCache.get(appId)
+ res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
+ } catch {
+ case e: Exception => e.getCause() match {
+ case nsee: NoSuchElementException =>
+ val msg = <div class="row-fluid">Application {appId} not found.</div>
+ res.setStatus(HttpServletResponse.SC_NOT_FOUND)
+ UIUtils.basicSparkPage(msg, "Not Found").foreach(
+ n => res.getWriter().write(n.toString))
+
+ case cause: Exception => throw cause
}
}
}
}
- // A mapping of application ID to its history information, which includes the rendered UI
- val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()
-
initialize()
/**
@@ -98,108 +111,23 @@ class HistoryServer(
*/
def initialize() {
attachPage(new HistoryPage(this))
- attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static"))
+ attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
+
+ val contextHandler = new ServletContextHandler
+ contextHandler.setContextPath("/history")
+ contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
+ attachHandler(contextHandler)
}
/** Bind to the HTTP server behind this web interface. */
override def bind() {
super.bind()
- logCheckingThread.start()
- }
-
- /**
- * Check for any updates to event logs in the base directory. This is only effective once
- * the server has been bound.
- *
- * If a new completed application is found, the server renders the associated SparkUI
- * from the application's event logs, attaches this UI to itself, and stores metadata
- * information for this application.
- *
- * If the logs for an existing completed application are no longer found, the server
- * removes all associated information and detaches the SparkUI.
- */
- def checkForLogs() = synchronized {
- if (serverInfo.isDefined) {
- lastLogCheckTime = System.currentTimeMillis
- logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime))
- try {
- val logStatus = fileSystem.listStatus(new Path(baseLogDir))
- val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
- val logInfos = logDirs
- .sortBy { dir => getModificationTime(dir) }
- .map { dir => (dir, EventLoggingListener.parseLoggingInfo(dir.getPath, fileSystem)) }
- .filter { case (dir, info) => info.applicationComplete }
-
- // Logging information for applications that should be retained
- val retainedLogInfos = logInfos.takeRight(RETAINED_APPLICATIONS)
- val retainedAppIds = retainedLogInfos.map { case (dir, _) => dir.getPath.getName }
-
- // Remove any applications that should no longer be retained
- appIdToInfo.foreach { case (appId, info) =>
- if (!retainedAppIds.contains(appId)) {
- detachSparkUI(info.ui)
- appIdToInfo.remove(appId)
- }
- }
-
- // Render the application's UI if it is not already there
- retainedLogInfos.foreach { case (dir, info) =>
- val appId = dir.getPath.getName
- if (!appIdToInfo.contains(appId)) {
- renderSparkUI(dir, info)
- }
- }
-
- // Track the total number of completed applications observed this round
- numCompletedApplications = logInfos.size
-
- } catch {
- case e: Exception => logError("Exception in checking for event log updates", e)
- }
- } else {
- logWarning("Attempted to check for event log updates before binding the server.")
- }
- }
-
- /**
- * Render a new SparkUI from the event logs if the associated application is completed.
- *
- * HistoryServer looks for a special file that indicates application completion in the given
- * directory. If this file exists, the associated application is regarded to be completed, in
- * which case the server proceeds to render the SparkUI. Otherwise, the server does nothing.
- */
- private def renderSparkUI(logDir: FileStatus, elogInfo: EventLoggingInfo) {
- val path = logDir.getPath
- val appId = path.getName
- val replayBus = new ReplayListenerBus(elogInfo.logPaths, fileSystem, elogInfo.compressionCodec)
- val appListener = new ApplicationEventListener
- replayBus.addListener(appListener)
- val appConf = conf.clone()
- val appSecManager = new SecurityManager(appConf)
- val ui = new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
-
- // Do not call ui.bind() to avoid creating a new server for each application
- replayBus.replay()
- if (appListener.applicationStarted) {
- appSecManager.setUIAcls(HISTORY_UI_ACLS_ENABLED)
- appSecManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
- attachSparkUI(ui)
- val appName = appListener.appName
- val sparkUser = appListener.sparkUser
- val startTime = appListener.startTime
- val endTime = appListener.endTime
- val lastUpdated = getModificationTime(logDir)
- ui.setAppName(appName + " (completed)")
- appIdToInfo(appId) = ApplicationHistoryInfo(appId, appName, startTime, endTime,
- lastUpdated, sparkUser, path, ui)
- }
}
/** Stop the server and close the file system. */
override def stop() {
super.stop()
- stopped = true
- fileSystem.close()
+ provider.stop()
}
/** Attach a reconstructed UI to this server. Only valid after bind(). */
@@ -215,27 +143,20 @@ class HistoryServer(
ui.getHandlers.foreach(detachHandler)
}
- /** Return the address of this server. */
- def getAddress: String = "http://" + publicHost + ":" + boundPort
+ /**
+ * Returns a list of available applications, in descending order according to their end time.
+ *
+ * @return List of all known applications.
+ */
+ def getApplicationList() = provider.getListing()
- /** Return the number of completed applications found, whether or not the UI is rendered. */
- def getNumApplications: Int = numCompletedApplications
+ /**
+ * Returns the provider configuration to show in the listing page.
+ *
+ * @return A map with the provider's configuration.
+ */
+ def getProviderConfig() = provider.getConfig()
- /** Return when this directory was last modified. */
- private def getModificationTime(dir: FileStatus): Long = {
- try {
- val logFiles = fileSystem.listStatus(dir.getPath)
- if (logFiles != null && !logFiles.isEmpty) {
- logFiles.map(_.getModificationTime).max
- } else {
- dir.getModificationTime
- }
- } catch {
- case e: Exception =>
- logError("Exception in accessing modification time of %s".format(dir.getPath), e)
- -1L
- }
- }
}
/**
@@ -251,30 +172,31 @@ class HistoryServer(
object HistoryServer {
private val conf = new SparkConf
- // Interval between each check for event log updates
- val UPDATE_INTERVAL_MS = conf.getInt("spark.history.updateInterval", 10) * 1000
-
- // How many applications to retain
- val RETAINED_APPLICATIONS = conf.getInt("spark.history.retainedApplications", 250)
-
- // The port to which the web UI is bound
- val WEB_UI_PORT = conf.getInt("spark.history.ui.port", 18080)
-
- // set whether to enable or disable view acls for all applications
- val HISTORY_UI_ACLS_ENABLED = conf.getBoolean("spark.history.ui.acls.enable", false)
-
- val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
-
def main(argStrings: Array[String]) {
initSecurity()
- val args = new HistoryServerArguments(argStrings)
+ val args = new HistoryServerArguments(conf, argStrings)
val securityManager = new SecurityManager(conf)
- val server = new HistoryServer(args.logDir, securityManager, conf)
+
+ val providerName = conf.getOption("spark.history.provider")
+ .getOrElse(classOf[FsHistoryProvider].getName())
+ val provider = Class.forName(providerName)
+ .getConstructor(classOf[SparkConf])
+ .newInstance(conf)
+ .asInstanceOf[ApplicationHistoryProvider]
+
+ val port = conf.getInt("spark.history.ui.port", 18080)
+
+ val server = new HistoryServer(conf, provider, securityManager, port)
server.bind()
+ Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") {
+ override def run() = {
+ server.stop()
+ }
+ })
+
+ // Wait until the end of the world... or until the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
- server.stop()
}
def initSecurity() {
@@ -291,17 +213,3 @@ object HistoryServer {
}
}
-
-
-private[spark] case class ApplicationHistoryInfo(
- id: String,
- name: String,
- startTime: Long,
- endTime: Long,
- lastUpdated: Long,
- sparkUser: String,
- logDirPath: Path,
- ui: SparkUI) {
- def started = startTime != -1
- def completed = endTime != -1
-}
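
The Guava cache is what now bounds the server's memory use: a cache miss runs the CacheLoader (which attaches the rebuilt UI), and eviction past retainedApplications fires the removal listener (which detaches it). A minimal standalone sketch of that load-on-miss / detach-on-evict pattern, with plain strings standing in for SparkUI instances:

    import com.google.common.cache.{CacheBuilder, CacheLoader, RemovalListener, RemovalNotification}

    val loader = new CacheLoader[String, String] {
      override def load(appId: String): String = s"ui-for-$appId"  // attach the UI here
    }

    val cache = CacheBuilder.newBuilder()
      .maximumSize(2)  // stands in for spark.history.retainedApplications
      .removalListener(new RemovalListener[String, String] {
        override def onRemoval(rm: RemovalNotification[String, String]): Unit =
          println(s"evicted ${rm.getKey}")  // detach the UI here
      })
      .build(loader)

    cache.get("app-1"); cache.get("app-2")
    cache.get("app-3")  // over maximumSize: an older entry is evicted
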
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index 943c061743..be9361b754 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -17,17 +17,14 @@
package org.apache.spark.deploy.history
-import java.net.URI
-
-import org.apache.hadoop.fs.Path
-
+import org.apache.spark.SparkConf
import org.apache.spark.util.Utils
/**
* Command-line parser for the master.
*/
-private[spark] class HistoryServerArguments(args: Array[String]) {
- var logDir = ""
+private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
+ private var logDir: String = null
parse(args.toList)
@@ -45,32 +42,36 @@ private[spark] class HistoryServerArguments(args: Array[String]) {
case _ =>
printUsageAndExit(1)
}
- validateLogDir()
- }
-
- private def validateLogDir() {
- if (logDir == "") {
- System.err.println("Logging directory must be specified.")
- printUsageAndExit(1)
- }
- val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
- val path = new Path(logDir)
- if (!fileSystem.exists(path)) {
- System.err.println("Logging directory specified does not exist: %s".format(logDir))
- printUsageAndExit(1)
- }
- if (!fileSystem.getFileStatus(path).isDir) {
- System.err.println("Logging directory specified is not a directory: %s".format(logDir))
- printUsageAndExit(1)
+ if (logDir != null) {
+ conf.set("spark.history.fs.logDirectory", logDir)
}
}
private def printUsageAndExit(exitCode: Int) {
System.err.println(
- "Usage: HistoryServer [options]\n" +
- "\n" +
- "Options:\n" +
- " -d DIR, --dir DIR Location of event log files")
+ """
+ |Usage: HistoryServer
+ |
+ |Configuration options can be set by setting the corresponding JVM system property.
+ |History Server options are always available; additional options depend on the provider.
+ |
+ |History Server options:
+ |
+ | spark.history.ui.port Port where server will listen for connections
+ | (default 18080)
+ | spark.history.ui.acls.enable Whether to enable view acls for all applications
+ | (default false)
+ | spark.history.provider Name of history provider class (defaults to
+ | file system-based provider)
+ | spark.history.retainedApplications Max number of application UIs to keep loaded in memory
+ | (default 50)
+ |FsHistoryProvider options:
+ |
+ | spark.history.fs.logDirectory Directory where app logs are stored (required)
+ | spark.history.fs.updateInterval How often to reload log data from storage (in seconds,
+ | default 10)
+ |""".stripMargin)
System.exit(exitCode)
}
+
}
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index b08f308fda..856273e1d4 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -51,6 +51,7 @@ private[spark] abstract class WebUI(
def getTabs: Seq[WebUITab] = tabs.toSeq
def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
+ def getSecurityManager: SecurityManager = securityManager
/** Attach a tab to this UI, along with all of its attached pages. */
def attachTab(tab: WebUITab) {
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 2b9e9e5bd7..84073fe4d9 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -35,11 +35,13 @@ If Spark is run on Mesos or YARN, it is still possible to reconstruct the UI of
application through Spark's history server, provided that the application's event logs exist.
You can start the history server by executing:
- ./sbin/start-history-server.sh <base-logging-directory>
+ ./sbin/start-history-server.sh
-The base logging directory must be supplied, and should contain sub-directories that each
-represents an application's event logs. This creates a web interface at
-`http://<server-url>:18080` by default. The history server can be configured as follows:
+When using the file-system provider class (see spark.history.provider below), the base logging
+directory must be supplied in the <code>spark.history.fs.logDirectory</code> configuration option,
+and should contain sub-directories, each of which represents an application's event logs. This creates a
+web interface at `http://<server-url>:18080` by default. The history server can be configured as
+follows:
<table class="table">
<tr><th style="width:21%">Environment Variable</th><th>Meaning</th></tr>
@@ -69,7 +71,14 @@ represents an application's event logs. This creates a web interface at
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
- <td>spark.history.updateInterval</td>
+ <td>spark.history.provider</td>
+ <td>org.apache.spark.deploy.history.FsHistoryProvider</td>
+ <td>Name of the class implementing the application history backend. Currently there is only
+ one implementation, provided by Spark, which looks for application logs stored in the
+ file system.</td>
+ </tr>
+ <tr>
+ <td>spark.history.fs.updateInterval</td>
<td>10</td>
<td>
The period, in seconds, at which information displayed by this history server is updated.
@@ -78,7 +87,7 @@ represents an application's event logs. This creates a web interface at
</tr>
<tr>
<td>spark.history.retainedApplications</td>
- <td>250</td>
+ <td>50</td>
<td>
The number of application UIs to retain. If this cap is exceeded, then the oldest
applications will be removed.
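
These properties map one-to-one onto SparkConf lookups in the code above; a short sketch of the server-side reads, with defaults as documented (the spark.history.updateInterval fallback keeps the old name working):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
    val providerName = conf.getOption("spark.history.provider")
      .getOrElse("org.apache.spark.deploy.history.FsHistoryProvider")
    val retained = conf.getInt("spark.history.retainedApplications", 50)
    val updateIntervalMs = conf.getInt("spark.history.fs.updateInterval",
      conf.getInt("spark.history.updateInterval", 10)) * 1000
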
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index af620d6160..1621833e12 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -34,7 +34,10 @@ object MimaExcludes {
val excludes =
SparkBuild.SPARK_VERSION match {
case v if v.startsWith("1.1") =>
- Seq(MimaBuild.excludeSparkPackage("graphx")) ++
+ Seq(
+ MimaBuild.excludeSparkPackage("deploy"),
+ MimaBuild.excludeSparkPackage("graphx")
+ ) ++
Seq(
// Adding new method to JavaRDLike trait - we should probably mark this as a developer API.
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"),
diff --git a/sbin/start-history-server.sh b/sbin/start-history-server.sh
index 4a90c68763..e30493da32 100755
--- a/sbin/start-history-server.sh
+++ b/sbin/start-history-server.sh
@@ -19,19 +19,18 @@
# Starts the history server on the machine this script is executed on.
#
-# Usage: start-history-server.sh <base-log-dir> [<web-ui-port>]
-# Example: ./start-history-server.sh --dir /tmp/spark-events --port 18080
+# Usage: start-history-server.sh
+#
+# Use the SPARK_HISTORY_OPTS environment variable to set history server configuration.
#
sbin=`dirname "$0"`
sbin=`cd "$sbin"; pwd`
-if [ $# -lt 1 ]; then
- echo "Usage: ./start-history-server.sh <base-log-dir>"
- echo "Example: ./start-history-server.sh /tmp/spark-events"
- exit
+if [ $# != 0 ]; then
+ echo "Using command line arguments for setting the log directory is deprecated. Please "
+ echo "set the spark.history.fs.logDirectory configuration option instead."
+ export SPARK_HISTORY_OPTS="$SPARK_HISTORY_OPTS -Dspark.history.fs.logDirectory=$1"
fi
-LOG_DIR=$1
-
-"$sbin"/spark-daemon.sh start org.apache.spark.deploy.history.HistoryServer 1 --dir "$LOG_DIR"
+exec "$sbin"/spark-daemon.sh start org.apache.spark.deploy.history.HistoryServer 1