diff options
Diffstat (limited to 'core/src/main/scala/org')
6 files changed, 421 insertions, 228 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala new file mode 100644 index 0000000000..a0e8bd403a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import org.apache.spark.ui.SparkUI + +private[spark] case class ApplicationHistoryInfo( + id: String, + name: String, + startTime: Long, + endTime: Long, + lastUpdated: Long, + sparkUser: String) + +private[spark] abstract class ApplicationHistoryProvider { + + /** + * Returns a list of applications available for the history server to show. + * + * @return List of all know applications. + */ + def getListing(): Seq[ApplicationHistoryInfo] + + /** + * Returns the Spark UI for a specific application. + * + * @param appId The application ID. + * @return The application's UI, or null if application is not found. + */ + def getAppUI(appId: String): SparkUI + + /** + * Called when the server is shutting down. + */ + def stop(): Unit = { } + + /** + * Returns configuration data to be shown in the History Server home page. + * + * @return A map with the configuration data. Data is show in the order returned by the map. + */ + def getConfig(): Map[String, String] = Map() + +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala new file mode 100644 index 0000000000..a8c9ac0724 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.FileNotFoundException + +import scala.collection.mutable + +import org.apache.hadoop.fs.{FileStatus, Path} + +import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.scheduler._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.Utils + +private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider + with Logging { + + // Interval between each check for event log updates + private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval", + conf.getInt("spark.history.updateInterval", 10)) * 1000 + + private val logDir = conf.get("spark.history.fs.logDirectory", null) + if (logDir == null) { + throw new IllegalArgumentException("Logging directory must be specified.") + } + + private val fs = Utils.getHadoopFileSystem(logDir) + + // A timestamp of when the disk was last accessed to check for log updates + private var lastLogCheckTimeMs = -1L + + // List of applications, in order from newest to oldest. + @volatile private var appList: Seq[ApplicationHistoryInfo] = Nil + + /** + * A background thread that periodically checks for event log updates on disk. + * + * If a log check is invoked manually in the middle of a period, this thread re-adjusts the + * time at which it performs the next log check to maintain the same period as before. + * + * TODO: Add a mechanism to update manually. + */ + private val logCheckingThread = new Thread("LogCheckingThread") { + override def run() = Utils.logUncaughtExceptions { + while (true) { + val now = getMonotonicTimeMs() + if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) { + Thread.sleep(UPDATE_INTERVAL_MS) + } else { + // If the user has manually checked for logs recently, wait until + // UPDATE_INTERVAL_MS after the last check time + Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now) + } + checkForLogs() + } + } + } + + initialize() + + private def initialize() { + // Validate the log directory. + val path = new Path(logDir) + if (!fs.exists(path)) { + throw new IllegalArgumentException( + "Logging directory specified does not exist: %s".format(logDir)) + } + if (!fs.getFileStatus(path).isDir) { + throw new IllegalArgumentException( + "Logging directory specified is not a directory: %s".format(logDir)) + } + + checkForLogs() + logCheckingThread.setDaemon(true) + logCheckingThread.start() + } + + override def getListing() = appList + + override def getAppUI(appId: String): SparkUI = { + try { + val appLogDir = fs.getFileStatus(new Path(logDir, appId)) + loadAppInfo(appLogDir, true)._2 + } catch { + case e: FileNotFoundException => null + } + } + + override def getConfig(): Map[String, String] = + Map(("Event Log Location" -> logDir)) + + /** + * Builds the application list based on the current contents of the log directory. + * Tries to reuse as much of the data already in memory as possible, by not reading + * applications that haven't been updated since last time the logs were checked. + */ + private def checkForLogs() = { + lastLogCheckTimeMs = getMonotonicTimeMs() + logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs)) + try { + val logStatus = fs.listStatus(new Path(logDir)) + val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() + val logInfos = logDirs.filter { + dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE)) + } + + val currentApps = Map[String, ApplicationHistoryInfo]( + appList.map(app => (app.id -> app)):_*) + + // For any application that either (i) is not listed or (ii) has changed since the last time + // the listing was created (defined by the log dir's modification time), load the app's info. + // Otherwise just reuse what's already in memory. + val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size) + for (dir <- logInfos) { + val curr = currentApps.getOrElse(dir.getPath().getName(), null) + if (curr == null || curr.lastUpdated < getModificationTime(dir)) { + try { + newApps += loadAppInfo(dir, false)._1 + } catch { + case e: Exception => logError(s"Failed to load app info from directory $dir.") + } + } else { + newApps += curr + } + } + + appList = newApps.sortBy { info => -info.endTime } + } catch { + case t: Throwable => logError("Exception in checking for event log updates", t) + } + } + + /** + * Parse the application's logs to find out the information we need to build the + * listing page. + * + * When creating the listing of available apps, there is no need to load the whole UI for the + * application. The UI is requested by the HistoryServer (by calling getAppInfo()) when the user + * clicks on a specific application. + * + * @param logDir Directory with application's log files. + * @param renderUI Whether to create the SparkUI for the application. + * @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false. + */ + private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = { + val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs) + val path = logDir.getPath + val appId = path.getName + val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec) + val appListener = new ApplicationEventListener + replayBus.addListener(appListener) + + val ui: SparkUI = if (renderUI) { + val conf = this.conf.clone() + val appSecManager = new SecurityManager(conf) + new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId) + // Do not call ui.bind() to avoid creating a new server for each application + } else { + null + } + + replayBus.replay() + val appInfo = ApplicationHistoryInfo( + appId, + appListener.appName, + appListener.startTime, + appListener.endTime, + getModificationTime(logDir), + appListener.sparkUser) + + if (ui != null) { + val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) + ui.getSecurityManager.setUIAcls(uiAclsEnabled) + ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls) + } + (appInfo, ui) + } + + /** Return when this directory was last modified. */ + private def getModificationTime(dir: FileStatus): Long = { + try { + val logFiles = fs.listStatus(dir.getPath) + if (logFiles != null && !logFiles.isEmpty) { + logFiles.map(_.getModificationTime).max + } else { + dir.getModificationTime + } + } catch { + case t: Throwable => + logError("Exception in accessing modification time of %s".format(dir.getPath), t) + -1L + } + } + + /** Returns the system's mononotically increasing time. */ + private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000) + +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 180c853ce3..a958c837c2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -25,20 +25,36 @@ import org.apache.spark.ui.{WebUIPage, UIUtils} private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { + private val pageSize = 20 + def render(request: HttpServletRequest): Seq[Node] = { - val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated } - val appTable = UIUtils.listingTable(appHeader, appRow, appRows) + val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt + val requestedFirst = (requestedPage - 1) * pageSize + + val allApps = parent.getApplicationList() + val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0 + val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size)) + + val actualPage = (actualFirst / pageSize) + 1 + val last = Math.min(actualFirst + pageSize, allApps.size) - 1 + val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0) + + val appTable = UIUtils.listingTable(appHeader, appRow, apps) + val providerConfig = parent.getProviderConfig() val content = <div class="row-fluid"> <div class="span12"> <ul class="unstyled"> - <li><strong>Event Log Location: </strong> {parent.baseLogDir}</li> + { providerConfig.map(e => <li><strong>{e._1}:</strong> {e._2}</li>) } </ul> { - if (parent.appIdToInfo.size > 0) { + if (allApps.size > 0) { <h4> - Showing {parent.appIdToInfo.size}/{parent.getNumApplications} - Completed Application{if (parent.getNumApplications > 1) "s" else ""} + Showing {actualFirst + 1}-{last + 1} of {allApps.size} + <span style="float: right"> + {if (actualPage > 1) <a href={"/?page=" + (actualPage - 1)}><</a>} + {if (actualPage < pageCount) <a href={"/?page=" + (actualPage + 1)}>></a>} + </span> </h4> ++ appTable } else { @@ -56,26 +72,20 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { "Completed", "Duration", "Spark User", - "Log Directory", "Last Updated") private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { - val appName = if (info.started) info.name else info.logDirPath.getName - val uiAddress = parent.getAddress + info.ui.basePath - val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started" - val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed" - val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L - val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---" - val sparkUser = if (info.started) info.sparkUser else "Unknown user" - val logDirectory = info.logDirPath.getName + val uiAddress = "/history/" + info.id + val startTime = UIUtils.formatDate(info.startTime) + val endTime = UIUtils.formatDate(info.endTime) + val duration = UIUtils.formatDuration(info.endTime - info.startTime) val lastUpdated = UIUtils.formatDate(info.lastUpdated) <tr> - <td><a href={uiAddress}>{appName}</a></td> + <td><a href={uiAddress}>{info.name}</a></td> <td>{startTime}</td> <td>{endTime}</td> <td>{duration}</td> - <td>{sparkUser}</td> - <td>{logDirectory}</td> + <td>{info.sparkUser}</td> <td>{lastUpdated}</td> </tr> } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index a9c11dca56..29a78a56c8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -17,14 +17,15 @@ package org.apache.spark.deploy.history -import scala.collection.mutable +import java.util.NoSuchElementException +import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} -import org.apache.hadoop.fs.{FileStatus, Path} +import com.google.common.cache._ +import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.scheduler._ -import org.apache.spark.ui.{WebUI, SparkUI} +import org.apache.spark.ui.{WebUI, SparkUI, UIUtils} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.Utils @@ -38,56 +39,68 @@ import org.apache.spark.util.Utils * application's event logs are maintained in the application's own sub-directory. This * is the same structure as maintained in the event log write code path in * EventLoggingListener. - * - * @param baseLogDir The base directory in which event logs are found */ class HistoryServer( - val baseLogDir: String, + conf: SparkConf, + provider: ApplicationHistoryProvider, securityManager: SecurityManager, - conf: SparkConf) - extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging { - - import HistoryServer._ + port: Int) + extends WebUI(securityManager, port, conf) with Logging { - private val fileSystem = Utils.getHadoopFileSystem(baseLogDir) - private val localHost = Utils.localHostName() - private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost) + // How many applications to retain + private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50) - // A timestamp of when the disk was last accessed to check for log updates - private var lastLogCheckTime = -1L + private val appLoader = new CacheLoader[String, SparkUI] { + override def load(key: String): SparkUI = { + val ui = provider.getAppUI(key) + if (ui == null) { + throw new NoSuchElementException() + } + attachSparkUI(ui) + ui + } + } - // Number of completed applications found in this directory - private var numCompletedApplications = 0 + private val appCache = CacheBuilder.newBuilder() + .maximumSize(retainedApplications) + .removalListener(new RemovalListener[String, SparkUI] { + override def onRemoval(rm: RemovalNotification[String, SparkUI]) = { + detachSparkUI(rm.getValue()) + } + }) + .build(appLoader) + + private val loaderServlet = new HttpServlet { + protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = { + val parts = Option(req.getPathInfo()).getOrElse("").split("/") + if (parts.length < 2) { + res.sendError(HttpServletResponse.SC_BAD_REQUEST, + s"Unexpected path info in request (URI = ${req.getRequestURI()}") + return + } - @volatile private var stopped = false + val appId = parts(1) - /** - * A background thread that periodically checks for event log updates on disk. - * - * If a log check is invoked manually in the middle of a period, this thread re-adjusts the - * time at which it performs the next log check to maintain the same period as before. - * - * TODO: Add a mechanism to update manually. - */ - private val logCheckingThread = new Thread { - override def run(): Unit = Utils.logUncaughtExceptions { - while (!stopped) { - val now = System.currentTimeMillis - if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) { - checkForLogs() - Thread.sleep(UPDATE_INTERVAL_MS) - } else { - // If the user has manually checked for logs recently, wait until - // UPDATE_INTERVAL_MS after the last check time - Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now) + // Note we don't use the UI retrieved from the cache; the cache loader above will register + // the app's UI, and all we need to do is redirect the user to the same URI that was + // requested, and the proper data should be served at that point. + try { + appCache.get(appId) + res.sendRedirect(res.encodeRedirectURL(req.getRequestURI())) + } catch { + case e: Exception => e.getCause() match { + case nsee: NoSuchElementException => + val msg = <div class="row-fluid">Application {appId} not found.</div> + res.setStatus(HttpServletResponse.SC_NOT_FOUND) + UIUtils.basicSparkPage(msg, "Not Found").foreach( + n => res.getWriter().write(n.toString)) + + case cause: Exception => throw cause } } } } - // A mapping of application ID to its history information, which includes the rendered UI - val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]() - initialize() /** @@ -98,108 +111,23 @@ class HistoryServer( */ def initialize() { attachPage(new HistoryPage(this)) - attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static")) + attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) + + val contextHandler = new ServletContextHandler + contextHandler.setContextPath("/history") + contextHandler.addServlet(new ServletHolder(loaderServlet), "/*") + attachHandler(contextHandler) } /** Bind to the HTTP server behind this web interface. */ override def bind() { super.bind() - logCheckingThread.start() - } - - /** - * Check for any updates to event logs in the base directory. This is only effective once - * the server has been bound. - * - * If a new completed application is found, the server renders the associated SparkUI - * from the application's event logs, attaches this UI to itself, and stores metadata - * information for this application. - * - * If the logs for an existing completed application are no longer found, the server - * removes all associated information and detaches the SparkUI. - */ - def checkForLogs() = synchronized { - if (serverInfo.isDefined) { - lastLogCheckTime = System.currentTimeMillis - logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime)) - try { - val logStatus = fileSystem.listStatus(new Path(baseLogDir)) - val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() - val logInfos = logDirs - .sortBy { dir => getModificationTime(dir) } - .map { dir => (dir, EventLoggingListener.parseLoggingInfo(dir.getPath, fileSystem)) } - .filter { case (dir, info) => info.applicationComplete } - - // Logging information for applications that should be retained - val retainedLogInfos = logInfos.takeRight(RETAINED_APPLICATIONS) - val retainedAppIds = retainedLogInfos.map { case (dir, _) => dir.getPath.getName } - - // Remove any applications that should no longer be retained - appIdToInfo.foreach { case (appId, info) => - if (!retainedAppIds.contains(appId)) { - detachSparkUI(info.ui) - appIdToInfo.remove(appId) - } - } - - // Render the application's UI if it is not already there - retainedLogInfos.foreach { case (dir, info) => - val appId = dir.getPath.getName - if (!appIdToInfo.contains(appId)) { - renderSparkUI(dir, info) - } - } - - // Track the total number of completed applications observed this round - numCompletedApplications = logInfos.size - - } catch { - case e: Exception => logError("Exception in checking for event log updates", e) - } - } else { - logWarning("Attempted to check for event log updates before binding the server.") - } - } - - /** - * Render a new SparkUI from the event logs if the associated application is completed. - * - * HistoryServer looks for a special file that indicates application completion in the given - * directory. If this file exists, the associated application is regarded to be completed, in - * which case the server proceeds to render the SparkUI. Otherwise, the server does nothing. - */ - private def renderSparkUI(logDir: FileStatus, elogInfo: EventLoggingInfo) { - val path = logDir.getPath - val appId = path.getName - val replayBus = new ReplayListenerBus(elogInfo.logPaths, fileSystem, elogInfo.compressionCodec) - val appListener = new ApplicationEventListener - replayBus.addListener(appListener) - val appConf = conf.clone() - val appSecManager = new SecurityManager(appConf) - val ui = new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId) - - // Do not call ui.bind() to avoid creating a new server for each application - replayBus.replay() - if (appListener.applicationStarted) { - appSecManager.setUIAcls(HISTORY_UI_ACLS_ENABLED) - appSecManager.setViewAcls(appListener.sparkUser, appListener.viewAcls) - attachSparkUI(ui) - val appName = appListener.appName - val sparkUser = appListener.sparkUser - val startTime = appListener.startTime - val endTime = appListener.endTime - val lastUpdated = getModificationTime(logDir) - ui.setAppName(appName + " (completed)") - appIdToInfo(appId) = ApplicationHistoryInfo(appId, appName, startTime, endTime, - lastUpdated, sparkUser, path, ui) - } } /** Stop the server and close the file system. */ override def stop() { super.stop() - stopped = true - fileSystem.close() + provider.stop() } /** Attach a reconstructed UI to this server. Only valid after bind(). */ @@ -215,27 +143,20 @@ class HistoryServer( ui.getHandlers.foreach(detachHandler) } - /** Return the address of this server. */ - def getAddress: String = "http://" + publicHost + ":" + boundPort + /** + * Returns a list of available applications, in descending order according to their end time. + * + * @return List of all known applications. + */ + def getApplicationList() = provider.getListing() - /** Return the number of completed applications found, whether or not the UI is rendered. */ - def getNumApplications: Int = numCompletedApplications + /** + * Returns the provider configuration to show in the listing page. + * + * @return A map with the provider's configuration. + */ + def getProviderConfig() = provider.getConfig() - /** Return when this directory was last modified. */ - private def getModificationTime(dir: FileStatus): Long = { - try { - val logFiles = fileSystem.listStatus(dir.getPath) - if (logFiles != null && !logFiles.isEmpty) { - logFiles.map(_.getModificationTime).max - } else { - dir.getModificationTime - } - } catch { - case e: Exception => - logError("Exception in accessing modification time of %s".format(dir.getPath), e) - -1L - } - } } /** @@ -251,30 +172,31 @@ class HistoryServer( object HistoryServer { private val conf = new SparkConf - // Interval between each check for event log updates - val UPDATE_INTERVAL_MS = conf.getInt("spark.history.updateInterval", 10) * 1000 - - // How many applications to retain - val RETAINED_APPLICATIONS = conf.getInt("spark.history.retainedApplications", 250) - - // The port to which the web UI is bound - val WEB_UI_PORT = conf.getInt("spark.history.ui.port", 18080) - - // set whether to enable or disable view acls for all applications - val HISTORY_UI_ACLS_ENABLED = conf.getBoolean("spark.history.ui.acls.enable", false) - - val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR - def main(argStrings: Array[String]) { initSecurity() - val args = new HistoryServerArguments(argStrings) + val args = new HistoryServerArguments(conf, argStrings) val securityManager = new SecurityManager(conf) - val server = new HistoryServer(args.logDir, securityManager, conf) + + val providerName = conf.getOption("spark.history.provider") + .getOrElse(classOf[FsHistoryProvider].getName()) + val provider = Class.forName(providerName) + .getConstructor(classOf[SparkConf]) + .newInstance(conf) + .asInstanceOf[ApplicationHistoryProvider] + + val port = conf.getInt("spark.history.ui.port", 18080) + + val server = new HistoryServer(conf, provider, securityManager, port) server.bind() + Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") { + override def run() = { + server.stop() + } + }) + // Wait until the end of the world... or if the HistoryServer process is manually stopped while(true) { Thread.sleep(Int.MaxValue) } - server.stop() } def initSecurity() { @@ -291,17 +213,3 @@ object HistoryServer { } } - - -private[spark] case class ApplicationHistoryInfo( - id: String, - name: String, - startTime: Long, - endTime: Long, - lastUpdated: Long, - sparkUser: String, - logDirPath: Path, - ui: SparkUI) { - def started = startTime != -1 - def completed = endTime != -1 -} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala index 943c061743..be9361b754 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala @@ -17,17 +17,14 @@ package org.apache.spark.deploy.history -import java.net.URI - -import org.apache.hadoop.fs.Path - +import org.apache.spark.SparkConf import org.apache.spark.util.Utils /** * Command-line parser for the master. */ -private[spark] class HistoryServerArguments(args: Array[String]) { - var logDir = "" +private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) { + private var logDir: String = null parse(args.toList) @@ -45,32 +42,36 @@ private[spark] class HistoryServerArguments(args: Array[String]) { case _ => printUsageAndExit(1) } - validateLogDir() - } - - private def validateLogDir() { - if (logDir == "") { - System.err.println("Logging directory must be specified.") - printUsageAndExit(1) - } - val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) - val path = new Path(logDir) - if (!fileSystem.exists(path)) { - System.err.println("Logging directory specified does not exist: %s".format(logDir)) - printUsageAndExit(1) - } - if (!fileSystem.getFileStatus(path).isDir) { - System.err.println("Logging directory specified is not a directory: %s".format(logDir)) - printUsageAndExit(1) + if (logDir != null) { + conf.set("spark.history.fs.logDirectory", logDir) } } private def printUsageAndExit(exitCode: Int) { System.err.println( - "Usage: HistoryServer [options]\n" + - "\n" + - "Options:\n" + - " -d DIR, --dir DIR Location of event log files") + """ + |Usage: HistoryServer + | + |Configuration options can be set by setting the corresponding JVM system property. + |History Server options are always available; additional options depend on the provider. + | + |History Server options: + | + | spark.history.ui.port Port where server will listen for connections + | (default 18080) + | spark.history.acls.enable Whether to enable view acls for all applications + | (default false) + | spark.history.provider Name of history provider class (defaults to + | file system-based provider) + | spark.history.retainedApplications Max number of application UIs to keep loaded in memory + | (default 50) + |FsHistoryProvider options: + | + | spark.history.fs.logDirectory Directory where app logs are stored (required) + | spark.history.fs.updateInterval How often to reload log data from storage (in seconds, + | default 10) + |""".stripMargin) System.exit(exitCode) } + } diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index b08f308fda..856273e1d4 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -51,6 +51,7 @@ private[spark] abstract class WebUI( def getTabs: Seq[WebUITab] = tabs.toSeq def getHandlers: Seq[ServletContextHandler] = handlers.toSeq + def getSecurityManager: SecurityManager = securityManager /** Attach a tab to this UI, along with all of its attached pages. */ def attachTab(tab: WebUITab) { |