author | Steve Loughran <stevel@hortonworks.com> | 2016-02-11 21:37:53 -0600
---|---|---
committer | Imran Rashid <irashid@cloudera.com> | 2016-02-11 21:37:53 -0600
commit | a2c7dcf61f33fa1897c950d2d905651103c170ea (patch) |
tree | 90268ba2e3c02be159411ed15d31408cd99e505a /core/src/main/scala/org |
parent | d3e2e202994e063856c192e9fdd0541777b88e0e (diff) |
[SPARK-7889][WEBUI] HistoryServer updates UI for incomplete apps
When the HistoryServer is showing an incomplete app, it needs to check whether a newer version of that app is available. It does this by checking whether a version of the app with a larger *filesize* has been loaded. If so, it detaches the current UI, attaches the new one, and redirects back to the same URL to show the new UI.
https://issues.apache.org/jira/browse/SPARK-7889
Author: Steve Loughran <stevel@hortonworks.com>
Author: Imran Rashid <irashid@cloudera.com>
Closes #11118 from squito/SPARK-7889-alternate.
Diffstat (limited to 'core/src/main/scala/org')
6 files changed, 890 insertions, 53 deletions
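The heart of the patch is the new `ApplicationCache`, which wraps a Guava `LoadingCache`: the loader attaches a `SparkUI` on a cache miss, and the removal listener detaches it again on eviction or refresh. The following minimal sketch shows that load/evict/refresh cycle in isolation; the `Ui` case class, the `latestVersion` flag, and the `println` calls are illustrative stand-ins, not code from the patch.

```scala
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache,
  RemovalListener, RemovalNotification}

object CachePatternDemo {
  // Stand-in for a loaded SparkUI and its attach/detach lifecycle.
  final case class Ui(appId: String, version: Int)

  // Pretend state: bumped when the "event log" grows.
  @volatile private var latestVersion = 1

  private val loader = new CacheLoader[String, Ui] {
    // Runs on a miss, and again after refresh(); the real cache
    // calls operations.attachSparkUI(...) here.
    override def load(appId: String): Ui = {
      val ui = Ui(appId, latestVersion)
      println(s"attach $ui")
      ui
    }
  }

  private val listener = new RemovalListener[String, Ui] {
    // Runs on size-based eviction and on replacement after refresh();
    // the real cache calls operations.detachSparkUI(...) here.
    override def onRemoval(rm: RemovalNotification[String, Ui]): Unit =
      println(s"detach ${rm.getValue}")
  }

  private val cache: LoadingCache[String, Ui] = CacheBuilder.newBuilder()
    .maximumSize(50) // cf. spark.history.retainedApplications
    .removalListener(listener)
    .build(loader)

  def main(args: Array[String]): Unit = {
    cache.get("app-1")          // miss: load + attach
    latestVersion = 2           // the log "grew"
    cache.refresh("app-1")      // reload: attach v2, detach v1
    println(cache.get("app-1")) // Ui(app-1,2)
  }
}
```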
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala new file mode 100644 index 0000000000..e2fda29044 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala @@ -0,0 +1,665 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.util.NoSuchElementException +import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse} +import javax.servlet.http.{HttpServletRequest, HttpServletResponse} + +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal + +import com.codahale.metrics.{Counter, MetricRegistry, Timer} +import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification} +import org.eclipse.jetty.servlet.FilterHolder + +import org.apache.spark.Logging +import org.apache.spark.metrics.source.Source +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.Clock + +/** + * Cache for applications. + * + * Completed applications are cached for as long as there is capacity for them. + * Incompleted applications have their update time checked on every + * retrieval; if the cached entry is out of date, it is refreshed. + * + * @note there must be only one instance of [[ApplicationCache]] in a + * JVM at a time. This is because a static field in [[ApplicationCacheCheckFilterRelay]] + * keeps a reference to the cache so that HTTP requests on the attempt-specific web UIs + * can probe the current cache to see if the attempts have changed. + * + * Creating multiple instances will break this routing. + * @param operations implementation of record access operations + * @param retainedApplications number of retained applications + * @param clock time source + */ +private[history] class ApplicationCache( + val operations: ApplicationCacheOperations, + val retainedApplications: Int, + val clock: Clock) extends Logging { + + /** + * Services the load request from the cache. + */ + private val appLoader = new CacheLoader[CacheKey, CacheEntry] { + + /** the cache key doesn't match a cached entry, or the entry is out-of-date, so load it. */ + override def load(key: CacheKey): CacheEntry = { + loadApplicationEntry(key.appId, key.attemptId) + } + + } + + /** + * Handler for callbacks from the cache of entry removal. + */ + private val removalListener = new RemovalListener[CacheKey, CacheEntry] { + + /** + * Removal event notifies the provider to detach the UI. 
+ * @param rm removal notification + */ + override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): Unit = { + metrics.evictionCount.inc() + val key = rm.getKey + logDebug(s"Evicting entry ${key}") + operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui) + } + } + + /** + * The cache of applications. + * + * Tagged as `protected` so as to allow subclasses in tests to accesss it directly + */ + protected val appCache: LoadingCache[CacheKey, CacheEntry] = { + CacheBuilder.newBuilder() + .maximumSize(retainedApplications) + .removalListener(removalListener) + .build(appLoader) + } + + /** + * The metrics which are updated as the cache is used. + */ + val metrics = new CacheMetrics("history.cache") + + init() + + /** + * Perform any startup operations. + * + * This includes declaring this instance as the cache to use in the + * [[ApplicationCacheCheckFilterRelay]]. + */ + private def init(): Unit = { + ApplicationCacheCheckFilterRelay.setApplicationCache(this) + } + + /** + * Stop the cache. + * This will reset the relay in [[ApplicationCacheCheckFilterRelay]]. + */ + def stop(): Unit = { + ApplicationCacheCheckFilterRelay.resetApplicationCache() + } + + /** + * Get an entry. + * + * Cache fetch/refresh will have taken place by the time this method returns. + * @param appAndAttempt application to look up in the format needed by the history server web UI, + * `appId/attemptId` or `appId`. + * @return the entry + */ + def get(appAndAttempt: String): SparkUI = { + val parts = splitAppAndAttemptKey(appAndAttempt) + get(parts._1, parts._2) + } + + /** + * Get the Spark UI, converting a lookup failure from an exception to `None`. + * @param appAndAttempt application to look up in the format needed by the history server web UI, + * `appId/attemptId` or `appId`. + * @return the entry + */ + def getSparkUI(appAndAttempt: String): Option[SparkUI] = { + try { + val ui = get(appAndAttempt) + Some(ui) + } catch { + case NonFatal(e) => e.getCause() match { + case nsee: NoSuchElementException => + None + case cause: Exception => throw cause + } + } + } + + /** + * Get the associated spark UI. + * + * Cache fetch/refresh will have taken place by the time this method returns. + * @param appId application ID + * @param attemptId optional attempt ID + * @return the entry + */ + def get(appId: String, attemptId: Option[String]): SparkUI = { + lookupAndUpdate(appId, attemptId)._1.ui + } + + /** + * Look up the entry; update it if needed. 
+ * @param appId application ID + * @param attemptId optional attempt ID + * @return the underlying cache entry -which can have its timestamp changed, and a flag to + * indicate that the entry has changed + */ + private def lookupAndUpdate(appId: String, attemptId: Option[String]): (CacheEntry, Boolean) = { + metrics.lookupCount.inc() + val cacheKey = CacheKey(appId, attemptId) + var entry = appCache.getIfPresent(cacheKey) + var updated = false + if (entry == null) { + // no entry, so fetch without any post-fetch probes for out-of-dateness + // this will trigger a callback to loadApplicationEntry() + entry = appCache.get(cacheKey) + } else if (!entry.completed) { + val now = clock.getTimeMillis() + log.debug(s"Probing at time $now for updated application $cacheKey -> $entry") + metrics.updateProbeCount.inc() + updated = time(metrics.updateProbeTimer) { + entry.updateProbe() + } + if (updated) { + logDebug(s"refreshing $cacheKey") + metrics.updateTriggeredCount.inc() + appCache.refresh(cacheKey) + // and repeat the lookup + entry = appCache.get(cacheKey) + } else { + // update the probe timestamp to the current time + entry.probeTime = now + } + } + (entry, updated) + } + + /** + * This method is visible for testing. + * + * It looks up the cached entry *and returns a clone of it*. + * This ensures that the cached entries never leak + * @param appId application ID + * @param attemptId optional attempt ID + * @return a new entry with shared SparkUI, but copies of the other fields. + */ + def lookupCacheEntry(appId: String, attemptId: Option[String]): CacheEntry = { + val entry = lookupAndUpdate(appId, attemptId)._1 + new CacheEntry(entry.ui, entry.completed, entry.updateProbe, entry.probeTime) + } + + /** + * Probe for an application being updated. + * @param appId application ID + * @param attemptId attempt ID + * @return true if an update has been triggered + */ + def checkForUpdates(appId: String, attemptId: Option[String]): Boolean = { + val (entry, updated) = lookupAndUpdate(appId, attemptId) + updated + } + + /** + * Size probe, primarily for testing. + * @return size + */ + def size(): Long = appCache.size() + + /** + * Emptiness predicate, primarily for testing. + * @return true if the cache is empty + */ + def isEmpty: Boolean = appCache.size() == 0 + + /** + * Time a closure, returning its output. + * @param t timer + * @param f function + * @tparam T type of return value of time + * @return the result of the function. + */ + private def time[T](t: Timer)(f: => T): T = { + val timeCtx = t.time() + try { + f + } finally { + timeCtx.close() + } + } + + /** + * Load the Spark UI via [[ApplicationCacheOperations.getAppUI()]], + * then attach it to the web UI via [[ApplicationCacheOperations.attachSparkUI()]]. + * + * If the application is incomplete, it has the [[ApplicationCacheCheckFilter]] + * added as a filter to the HTTP requests, so that queries on the UI will trigger + * update checks. + * + * The generated entry contains the UI and the current timestamp. + * The timer [[metrics.loadTimer]] tracks the time taken to load the UI. 
+ * + * @param appId application ID + * @param attemptId optional attempt ID + * @return the cache entry + * @throws NoSuchElementException if there is no matching element + */ + @throws[NoSuchElementException] + def loadApplicationEntry(appId: String, attemptId: Option[String]): CacheEntry = { + + logDebug(s"Loading application Entry $appId/$attemptId") + metrics.loadCount.inc() + time(metrics.loadTimer) { + operations.getAppUI(appId, attemptId) match { + case Some(LoadedAppUI(ui, updateState)) => + val completed = ui.getApplicationInfoList.exists(_.attempts.last.completed) + if (completed) { + // completed spark UIs are attached directly + operations.attachSparkUI(appId, attemptId, ui, completed) + } else { + // incomplete UIs have the cache-check filter put in front of them. + ApplicationCacheCheckFilterRelay.registerFilter(ui, appId, attemptId) + operations.attachSparkUI(appId, attemptId, ui, completed) + } + // build the cache entry + val now = clock.getTimeMillis() + val entry = new CacheEntry(ui, completed, updateState, now) + logDebug(s"Loaded application $appId/$attemptId -> $entry") + entry + case None => + metrics.lookupFailureCount.inc() + // guava's cache logs via java.util log, so is of limited use. Hence: our own message + logInfo(s"Failed to load application attempt $appId/$attemptId") + throw new NoSuchElementException(s"no application with application Id '$appId'" + + attemptId.map { id => s" attemptId '$id'" }.getOrElse(" and no attempt Id")) + } + } + } + + /** + * Split up an `applicationId/attemptId` or `applicationId` key into the separate pieces. + * + * @param appAndAttempt combined key + * @return a tuple of the application ID and, if present, the attemptID + */ + def splitAppAndAttemptKey(appAndAttempt: String): (String, Option[String]) = { + val parts = appAndAttempt.split("/") + require(parts.length == 1 || parts.length == 2, s"Invalid app key $appAndAttempt") + val appId = parts(0) + val attemptId = if (parts.length > 1) Some(parts(1)) else None + (appId, attemptId) + } + + /** + * Merge an appId and optional attempt Id into a key of the form `applicationId/attemptId`. + * + * If there is an `attemptId`; `applicationId` if not. + * @param appId application ID + * @param attemptId optional attempt ID + * @return a unified string + */ + def mergeAppAndAttemptToKey(appId: String, attemptId: Option[String]): String = { + appId + attemptId.map { id => s"/$id" }.getOrElse("") + } + + /** + * String operator dumps the cache entries and metrics. + * @return a string value, primarily for testing and diagnostics + */ + override def toString: String = { + val sb = new StringBuilder(s"ApplicationCache(" + + s" retainedApplications= $retainedApplications)") + sb.append(s"; time= ${clock.getTimeMillis()}") + sb.append(s"; entry count= ${appCache.size()}\n") + sb.append("----\n") + appCache.asMap().asScala.foreach { + case(key, entry) => sb.append(s" $key -> $entry\n") + } + sb.append("----\n") + sb.append(metrics) + sb.append("----\n") + sb.toString() + } +} + +/** + * An entry in the cache. + * + * @param ui Spark UI + * @param completed Flag to indicated that the application has completed (and so + * does not need refreshing). + * @param updateProbe function to call to see if the application has been updated and + * therefore that the cached value needs to be refreshed. + * @param probeTime Times in milliseconds when the probe was last executed. 
+ */ +private[history] final class CacheEntry( + val ui: SparkUI, + val completed: Boolean, + val updateProbe: () => Boolean, + var probeTime: Long) { + + /** string value is for test assertions */ + override def toString: String = { + s"UI $ui, completed=$completed, probeTime=$probeTime" + } +} + +/** + * Cache key: compares on `appId` and then, if non-empty, `attemptId`. + * The [[hashCode()]] function uses the same fields. + * @param appId application ID + * @param attemptId attempt ID + */ +private[history] final case class CacheKey(appId: String, attemptId: Option[String]) { + + override def toString: String = { + appId + attemptId.map { id => s"/$id" }.getOrElse("") + } +} + +/** + * Metrics of the cache + * @param prefix prefix to register all entries under + */ +private[history] class CacheMetrics(prefix: String) extends Source { + + /* metrics: counters and timers */ + val lookupCount = new Counter() + val lookupFailureCount = new Counter() + val evictionCount = new Counter() + val loadCount = new Counter() + val loadTimer = new Timer() + val updateProbeCount = new Counter() + val updateProbeTimer = new Timer() + val updateTriggeredCount = new Counter() + + /** all the counters: for registration and string conversion. */ + private val counters = Seq( + ("lookup.count", lookupCount), + ("lookup.failure.count", lookupFailureCount), + ("eviction.count", evictionCount), + ("load.count", loadCount), + ("update.probe.count", updateProbeCount), + ("update.triggered.count", updateTriggeredCount)) + + /** all metrics, including timers */ + private val allMetrics = counters ++ Seq( + ("load.timer", loadTimer), + ("update.probe.timer", updateProbeTimer)) + + /** + * Name of metric source + */ + override val sourceName = "ApplicationCache" + + override val metricRegistry: MetricRegistry = new MetricRegistry + + /** + * Startup actions. + * This includes registering metrics with [[metricRegistry]] + */ + private def init(): Unit = { + allMetrics.foreach { case (name, metric) => + metricRegistry.register(MetricRegistry.name(prefix, name), metric) + } + } + + override def toString: String = { + val sb = new StringBuilder() + counters.foreach { case (name, counter) => + sb.append(name).append(" = ").append(counter.getCount).append('\n') + } + sb.toString() + } +} + +/** + * API for cache events. That is: loading an App UI; and for + * attaching/detaching the UI to and from the Web UI. + */ +private[history] trait ApplicationCacheOperations { + + /** + * Get the application UI and the probe neededed to see if it has been updated. + * @param appId application ID + * @param attemptId attempt ID + * @return If found, the Spark UI and any history information to be used in the cache + */ + def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] + + /** + * Attach a reconstructed UI. + * @param appId application ID + * @param attemptId attempt ID + * @param ui UI + * @param completed flag to indicate that the UI has completed + */ + def attachSparkUI( + appId: String, + attemptId: Option[String], + ui: SparkUI, + completed: Boolean): Unit + + /** + * Detach a Spark UI. + * + * @param ui Spark UI + */ + def detachSparkUI(appId: String, attemptId: Option[String], ui: SparkUI): Unit + +} + +/** + * This is a servlet filter which intercepts HTTP requests on application UIs and + * triggers checks for updated data. 
+ * + * If the application cache indicates that the application has been updated, + * the filter returns a 302 redirect to the caller, asking them to re-request the web + * page. + * + * Because the application cache will detach and then re-attach the UI, when the caller + * repeats that request, it will now pick up the newly-updated web application. + * + * This does require the caller to handle 302 requests. Because of the ambiguity + * in how POST and PUT operations are responded to (that is, should a 307 be + * processed directly), the filter <i>does not</i> filter those requests. + * As the current web UIs are read-only, this is not an issue. If it were ever to + * support more HTTP verbs, then some support may be required. Perhaps, rather + * than sending a redirect, simply updating the value so that the <i>next</i> + * request will pick it up. + * + * Implementation note: there's some abuse of a shared global entry here because + * the configuration data passed to the servlet is just a string:string map. + */ +private[history] class ApplicationCacheCheckFilter() extends Filter with Logging { + + import ApplicationCacheCheckFilterRelay._ + var appId: String = _ + var attemptId: Option[String] = _ + + /** + * Bind the app and attempt ID, throwing an exception if no application ID was provided. + * @param filterConfig configuration + */ + override def init(filterConfig: FilterConfig): Unit = { + + appId = Option(filterConfig.getInitParameter(APP_ID)) + .getOrElse(throw new ServletException(s"Missing Parameter $APP_ID")) + attemptId = Option(filterConfig.getInitParameter(ATTEMPT_ID)) + logDebug(s"initializing filter $this") + } + + /** + * Filter the request. + * Either the caller is given a 302 redirect to the current URL, or the + * request is passed on to the SparkUI servlets. + * + * @param request HttpServletRequest + * @param response HttpServletResponse + * @param chain the rest of the request chain + */ + override def doFilter( + request: ServletRequest, + response: ServletResponse, + chain: FilterChain): Unit = { + + // nobody has ever implemented any other kind of servlet, yet + // this check is universal, just in case someone does exactly + // that on your classpath + if (!(request.isInstanceOf[HttpServletRequest])) { + throw new ServletException("This filter only works for HTTP/HTTPS") + } + val httpRequest = request.asInstanceOf[HttpServletRequest] + val httpResponse = response.asInstanceOf[HttpServletResponse] + val requestURI = httpRequest.getRequestURI + val operation = httpRequest.getMethod + + // if the request is for an attempt, check to see if it is in need of delete/refresh + // and have the cache update the UI if so + if (operation=="HEAD" || operation=="GET" + && checkForUpdates(requestURI, appId, attemptId)) { + // send a redirect back to the same location. This will be routed + // to the *new* UI + logInfo(s"Application Attempt $appId/$attemptId updated; refreshing") + val queryStr = Option(httpRequest.getQueryString).map("?" + _).getOrElse("") + val redirectUrl = httpResponse.encodeRedirectURL(requestURI + queryStr) + httpResponse.sendRedirect(redirectUrl) + } else { + chain.doFilter(request, response) + } + } + + override def destroy(): Unit = { + } + + override def toString: String = s"ApplicationCacheCheckFilter for $appId/$attemptId" +} + +/** + * Global state for the [[ApplicationCacheCheckFilter]] instances, so that they can relay cache + * probes to the cache. 
+ * + * This is an ugly workaround for the limitation of servlets and filters in the Java servlet + * API; they are still configured on the model of a list of classnames and configuration + * strings in a `web.xml` field, rather than a chain of instances wired up by hand or + * via an injection framework. There is no way to directly configure a servlet filter instance + * with a reference to the application cache which is must use: some global state is needed. + * + * Here, [[ApplicationCacheCheckFilter]] is that global state; it relays all requests + * to the singleton [[ApplicationCache]] + * + * The field `applicationCache` must be set for the filters to work - + * this is done during the construction of [[ApplicationCache]], which requires that there + * is only one cache serving requests through the WebUI. + * + * *Important* In test runs, if there is more than one [[ApplicationCache]], the relay logic + * will break: filters may not find instances. Tests must not do that. + * + */ +private[history] object ApplicationCacheCheckFilterRelay extends Logging { + // name of the app ID entry in the filter configuration. Mandatory. + val APP_ID = "appId" + + // name of the attempt ID entry in the filter configuration. Optional. + val ATTEMPT_ID = "attemptId" + + // namer of the filter to register + val FILTER_NAME = "org.apache.spark.deploy.history.ApplicationCacheCheckFilter" + + /** the application cache to relay requests to */ + @volatile + private var applicationCache: Option[ApplicationCache] = None + + /** + * Set the application cache. Logs a warning if it is overwriting an existing value + * @param cache new cache + */ + def setApplicationCache(cache: ApplicationCache): Unit = { + applicationCache.foreach( c => logWarning(s"Overwriting application cache $c")) + applicationCache = Some(cache) + } + + /** + * Reset the application cache + */ + def resetApplicationCache(): Unit = { + applicationCache = None + } + + /** + * Check to see if there has been an update + * @param requestURI URI the request came in on + * @param appId application ID + * @param attemptId attempt ID + * @return true if an update was loaded for the app/attempt + */ + def checkForUpdates(requestURI: String, appId: String, attemptId: Option[String]): Boolean = { + + logDebug(s"Checking $appId/$attemptId from $requestURI") + applicationCache match { + case Some(cache) => + try { + cache.checkForUpdates(appId, attemptId) + } catch { + case ex: Exception => + // something went wrong. 
Keep going with the existing UI + logWarning(s"When checking for $appId/$attemptId from $requestURI", ex) + false + } + + case None => + logWarning("No application cache instance defined") + false + } + } + + + /** + * Register a filter for the web UI which checks for updates to the given app/attempt + * @param ui Spark UI to attach filters to + * @param appId application ID + * @param attemptId attempt ID + */ + def registerFilter( + ui: SparkUI, + appId: String, + attemptId: Option[String] ): Unit = { + require(ui != null) + val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.REQUEST) + val holder = new FilterHolder() + holder.setClassName(FILTER_NAME) + holder.setInitParameter(APP_ID, appId) + attemptId.foreach( id => holder.setInitParameter(ATTEMPT_ID, id)) + require(ui.getHandlers != null, "null handlers") + ui.getHandlers.foreach { handler => + handler.addFilter(holder, "/*", enumDispatcher) + } + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 5f5e0fe1c3..44661edfff 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -33,7 +33,42 @@ private[spark] case class ApplicationAttemptInfo( private[spark] case class ApplicationHistoryInfo( id: String, name: String, - attempts: List[ApplicationAttemptInfo]) + attempts: List[ApplicationAttemptInfo]) { + + /** + * Has this application completed? + * @return true if the most recent attempt has completed + */ + def completed: Boolean = { + attempts.nonEmpty && attempts.head.completed + } +} + +/** + * A probe which can be invoked to see if a loaded Web UI has been updated. + * The probe is expected to be relative purely to that of the UI returned + * in the same [[LoadedAppUI]] instance. That is, whenever a new UI is loaded, + * the probe returned with it is the one that must be used to check for it + * being out of date; previous probes must be discarded. + */ +private[history] abstract class HistoryUpdateProbe { + /** + * Return true if the history provider has a later version of the application + * attempt than the one against this probe was constructed. + * @return + */ + def isUpdated(): Boolean +} + +/** + * All the information returned from a call to `getAppUI()`: the new UI + * and any required update state. + * @param ui Spark UI + * @param updateProbe probe to call to check on the update state of this application attempt + */ +private[history] case class LoadedAppUI( + ui: SparkUI, + updateProbe: () => Boolean) private[history] abstract class ApplicationHistoryProvider { @@ -49,9 +84,10 @@ private[history] abstract class ApplicationHistoryProvider { * * @param appId The application ID. * @param attemptId The application attempt ID (or None if there is no attempt ID). - * @return The application's UI, or None if application is not found. + * @return a [[LoadedAppUI]] instance containing the application's UI and any state information + * for update probes, or `None` if the application/attempt is not found. */ - def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] + def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] /** * Called when the server is shutting down. 
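The `LoadedAppUI` introduced above pairs a UI with its update probe. The `FsHistoryProvider` diff that follows supplies that probe as a closure which captures the log's file size at load time and later compares it with whatever the background scan has recorded since. A toy version of the wiring (`ProbeDemo`, `LoadedUi`, and `scannedSize` are illustrative names, not from the patch):

```scala
import scala.collection.concurrent.TrieMap

// Illustrative analogue of LoadedAppUI: a "UI" plus its update probe.
final case class LoadedUi(appId: String, updateProbe: () => Boolean)

object ProbeDemo {
  // Latest known log size per app, as the real checkForLogs() scan maintains.
  private val scannedSize = TrieMap[String, Long]()

  // Same shape as FsHistoryProvider.updateProbe: the trailing empty parameter
  // list lets a partial application become the () => Boolean closure.
  private def updateProbe(appId: String, prevFileSize: Long)(): Boolean =
    scannedSize.get(appId).exists(prevFileSize < _)

  def loadUi(appId: String): LoadedUi = {
    val sizeAtLoad = scannedSize.getOrElse(appId, 0L)
    LoadedUi(appId, updateProbe(appId, sizeAtLoad))
  }

  def main(args: Array[String]): Unit = {
    scannedSize("app-1") = 100L
    val ui = loadUi("app-1")
    println(ui.updateProbe())   // false: no newer data scanned
    scannedSize("app-1") = 250L // a later scan saw the log grow
    println(ui.updateProbe())   // true: the cache should refresh this entry
  }
}
```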
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 9648959dba..f885798760 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.history -import java.io.{BufferedInputStream, FileNotFoundException, InputStream, IOException, OutputStream} +import java.io.{FileNotFoundException, IOException, OutputStream} import java.util.UUID import java.util.concurrent.{Executors, ExecutorService, TimeUnit} import java.util.zip.{ZipEntry, ZipOutputStream} @@ -33,7 +33,6 @@ import org.apache.hadoop.security.AccessControlException import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.io.CompressionCodec import org.apache.spark.scheduler._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} @@ -42,6 +41,31 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} * A class that provides application history from event logs stored in the file system. * This provider checks for new finished applications in the background periodically and * renders the history application UI by parsing the associated event logs. + * + * == How new and updated attempts are detected == + * + * - New attempts are detected in [[checkForLogs]]: the log dir is scanned, and any + * entries in the log dir whose modification time is greater than the last scan time + * are considered new or updated. These are replayed to create a new [[FsApplicationAttemptInfo]] + * entry and update or create a matching [[FsApplicationHistoryInfo]] element in the list + * of applications. + * - Updated attempts are also found in [[checkForLogs]] -- if the attempt's log file has grown, the + * [[FsApplicationAttemptInfo]] is replaced by another one with a larger log size. + * - When [[updateProbe()]] is invoked to check if a loaded [[SparkUI]] + * instance is out of date, the log size of the cached instance is checked against the app last + * loaded by [[checkForLogs]]. + * + * The use of log size, rather than simply relying on modification times, is needed to + * address the following issues + * - some filesystems do not appear to update the `modtime` value whenever data is flushed to + * an open file output stream. Changes to the history may not be picked up. + * - the granularity of the `modtime` field may be 2+ seconds. Rapid changes to the FS can be + * missed. + * + * Tracking filesize works given the following invariant: the logs get bigger + * as new events are added. If a format was used in which this did not hold, the mechanism would + * break. Simple streaming of JSON-formatted events, as is implemented today, implicitly + * maintains this invariant. */ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) extends ApplicationHistoryProvider with Logging { @@ -77,9 +101,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() .setNameFormat("spark-history-task-%d").setDaemon(true).build()) - // The modification time of the newest log detected during the last scan. 
This is used - // to ignore logs that are older during subsequent scans, to avoid processing data that - // is already known. + // The modification time of the newest log detected during the last scan. Currently only + // used for logging msgs (logs are re-scanned based on file size, rather than modtime) private var lastScanTime = -1L // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted @@ -87,6 +110,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo] = new mutable.LinkedHashMap() + val fileToAppInfo = new mutable.HashMap[Path, FsApplicationAttemptInfo]() + // List of application logs to be deleted by event log cleaner. private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo] @@ -176,18 +201,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // Disable the background thread during tests. if (!conf.contains("spark.testing")) { // A task that periodically checks for event log updates on disk. + logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds") pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS) if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) { // A task that periodically cleans event logs on disk. pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS) } + } else { + logDebug("Background update thread disabled for testing") } } override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values - override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = { + override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { try { applications.get(appId).flatMap { appInfo => appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt => @@ -210,7 +238,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse("")) ui.getSecurityManager.setViewAcls(attempt.sparkUser, appListener.viewAcls.getOrElse("")) - ui + LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize)) } } } @@ -243,12 +271,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private[history] def checkForLogs(): Unit = { try { val newLastScanTime = getNewLastScanTime() + logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) .getOrElse(Seq[FileStatus]()) + // scan for modified applications, replay and merge them val logInfos: Seq[FileStatus] = statusList .filter { entry => try { - !entry.isDirectory() && (entry.getModificationTime() >= lastScanTime) + val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L) + !entry.isDirectory() && prevFileSize < entry.getLen() } catch { case e: AccessControlException => // Do not use "logInfo" since these messages can get pretty noisy if printed on @@ -262,6 +293,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) entry1.getModificationTime() >= entry2.getModificationTime() } + if (logInfos.nonEmpty) { + logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}") + } logInfos.grouped(20) .map { batch => replayExecutor.submit(new Runnable { @@ -356,7 +390,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val bus = new 
ReplayListenerBus() val res = replay(fileStatus, bus) res match { - case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.") + case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully: $r") case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " + "The application may have not started.") } @@ -511,6 +545,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = { val logPath = eventLog.getPath() logInfo(s"Replaying log path: $logPath") + // Note that the eventLog may have *increased* in size since when we grabbed the filestatus, + // and when we read the file here. That is OK -- it may result in an unnecessary refresh + // when there is no update, but will not result in missing an update. We *must* prevent + // an error the other way -- if we report a size bigger (ie later) than the file that is + // actually read, we may never refresh the app. FileStatus is guaranteed to be static + // after it's created, so we get a file size that is no bigger than what is actually read. val logInput = EventLoggingListener.openEventLog(logPath, fs) try { val appListener = new ApplicationEventListener @@ -521,7 +561,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // Without an app ID, new logs will render incorrectly in the listing page, so do not list or // try to show their UI. if (appListener.appId.isDefined) { - Some(new FsApplicationAttemptInfo( + val attemptInfo = new FsApplicationAttemptInfo( logPath.getName(), appListener.appName.getOrElse(NOT_STARTED), appListener.appId.getOrElse(logPath.getName()), @@ -530,7 +570,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) appListener.endTime.getOrElse(-1L), eventLog.getModificationTime(), appListener.sparkUser.getOrElse(NOT_STARTED), - appCompleted)) + appCompleted, + eventLog.getLen() + ) + fileToAppInfo(logPath) = attemptInfo + Some(attemptInfo) } else { None } @@ -564,12 +608,77 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET) } + /** + * String description for diagnostics + * @return a summary of the component state + */ + override def toString: String = { + val header = s""" + | FsHistoryProvider: logdir=$logDir, + | last scan time=$lastScanTime + | Cached application count =${applications.size}} + """.stripMargin + val sb = new StringBuilder(header) + applications.foreach(entry => sb.append(entry._2).append("\n")) + sb.toString + } + + /** + * Look up an application attempt + * @param appId application ID + * @param attemptId Attempt ID, if set + * @return the matching attempt, if found + */ + def lookup(appId: String, attemptId: Option[String]): Option[FsApplicationAttemptInfo] = { + applications.get(appId).flatMap { appInfo => + appInfo.attempts.find(_.attemptId == attemptId) + } + } + + /** + * Return true iff a newer version of the UI is available. The check is based on whether the + * fileSize for the currently loaded UI is smaller than the file size the last time + * the logs were loaded. + * + * This is a very cheap operation -- the work of loading the new attempt was already done + * by [[checkForLogs]]. 
+ * @param appId application to probe + * @param attemptId attempt to probe + * @param prevFileSize the file size of the logs for the currently displayed UI + */ + private def updateProbe( + appId: String, + attemptId: Option[String], + prevFileSize: Long)(): Boolean = { + lookup(appId, attemptId) match { + case None => + logDebug(s"Application Attempt $appId/$attemptId not found") + false + case Some(latest) => + prevFileSize < latest.fileSize + } + } } private[history] object FsHistoryProvider { val DEFAULT_LOG_DIR = "file:/tmp/spark-events" } +/** + * Application attempt information. + * + * @param logPath path to the log file, or, for a legacy log, its directory + * @param name application name + * @param appId application ID + * @param attemptId optional attempt ID + * @param startTime start time (from playback) + * @param endTime end time (from playback). -1 if the application is incomplete. + * @param lastUpdated the modification time of the log file when this entry was built by replaying + * the history. + * @param sparkUser user running the application + * @param completed flag to indicate whether or not the application has completed. + * @param fileSize the size of the log file the last time the file was scanned for changes + */ private class FsApplicationAttemptInfo( val logPath: String, val name: String, @@ -579,10 +688,24 @@ private class FsApplicationAttemptInfo( endTime: Long, lastUpdated: Long, sparkUser: String, - completed: Boolean = true) + completed: Boolean, + val fileSize: Long) extends ApplicationAttemptInfo( - attemptId, startTime, endTime, lastUpdated, sparkUser, completed) + attemptId, startTime, endTime, lastUpdated, sparkUser, completed) { + /** extend the superclass string value with the extra attributes of this class */ + override def toString: String = { + s"FsApplicationAttemptInfo($name, $appId," + + s" ${super.toString}, source=$logPath, size=$fileSize" + } +} + +/** + * Application history information + * @param id application ID + * @param name application name + * @param attempts list of attempts, most recent first. 
+ */ private class FsApplicationHistoryInfo( id: String, override val name: String, diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index cab7faefe8..2fad1120cd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -30,7 +30,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean val allApps = parent.getApplicationList() - .filter(_.attempts.head.completed != requestedIncomplete) + .filter(_.completed != requestedIncomplete) val allAppsSize = allApps.size val providerConfig = parent.getProviderConfig() diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 1f13d7db34..076bdc5c05 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -23,7 +23,6 @@ import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} import scala.util.control.NonFatal -import com.google.common.cache._ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} import org.apache.spark.{Logging, SecurityManager, SparkConf} @@ -31,7 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource, UIRoot} import org.apache.spark.ui.{SparkUI, UIUtils, WebUI} import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.util.{ShutdownHookManager, Utils} +import org.apache.spark.util.{ShutdownHookManager, SystemClock, Utils} /** * A web server that renders SparkUIs of completed applications. 
@@ -50,31 +49,16 @@ class HistoryServer( securityManager: SecurityManager, port: Int) extends WebUI(securityManager, securityManager.getSSLOptions("historyServer"), port, conf) - with Logging with UIRoot { + with Logging with UIRoot with ApplicationCacheOperations { // How many applications to retain private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50) - private val appLoader = new CacheLoader[String, SparkUI] { - override def load(key: String): SparkUI = { - val parts = key.split("/") - require(parts.length == 1 || parts.length == 2, s"Invalid app key $key") - val ui = provider - .getAppUI(parts(0), if (parts.length > 1) Some(parts(1)) else None) - .getOrElse(throw new NoSuchElementException(s"no app with key $key")) - attachSparkUI(ui) - ui - } - } + // application + private val appCache = new ApplicationCache(this, retainedApplications, new SystemClock()) - private val appCache = CacheBuilder.newBuilder() - .maximumSize(retainedApplications) - .removalListener(new RemovalListener[String, SparkUI] { - override def onRemoval(rm: RemovalNotification[String, SparkUI]): Unit = { - detachSparkUI(rm.getValue()) - } - }) - .build(appLoader) + // and its metrics, for testing as well as monitoring + val cacheMetrics = appCache.metrics private val loaderServlet = new HttpServlet { protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = { @@ -117,17 +101,7 @@ class HistoryServer( } def getSparkUI(appKey: String): Option[SparkUI] = { - try { - val ui = appCache.get(appKey) - Some(ui) - } catch { - case NonFatal(e) => e.getCause() match { - case nsee: NoSuchElementException => - None - - case cause: Exception => throw cause - } - } + appCache.getSparkUI(appKey) } initialize() @@ -160,22 +134,37 @@ class HistoryServer( override def stop() { super.stop() provider.stop() + appCache.stop() } /** Attach a reconstructed UI to this server. Only valid after bind(). */ - private def attachSparkUI(ui: SparkUI) { + override def attachSparkUI( + appId: String, + attemptId: Option[String], + ui: SparkUI, + completed: Boolean) { assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs") ui.getHandlers.foreach(attachHandler) addFilters(ui.getHandlers, conf) } /** Detach a reconstructed UI from this server. Only valid after bind(). */ - private def detachSparkUI(ui: SparkUI) { + override def detachSparkUI(appId: String, attemptId: Option[String], ui: SparkUI): Unit = { assert(serverInfo.isDefined, "HistoryServer must be bound before detaching SparkUIs") ui.getHandlers.foreach(detachHandler) } /** + * Get the application UI and whether or not it is completed + * @param appId application ID + * @param attemptId attempt ID + * @return If found, the Spark UI and any history information to be used in the cache + */ + override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { + provider.getAppUI(appId, attemptId) + } + + /** * Returns a list of available applications, in descending order according to their end time. * * @return List of all known applications. @@ -202,9 +191,15 @@ class HistoryServer( */ def getProviderConfig(): Map[String, String] = provider.getConfig() + /** + * Load an application UI and attach it to the web server. + * @param appId application ID + * @param attemptId optional attempt ID + * @return true if the application was found and loaded. 
+ */ private def loadAppUi(appId: String, attemptId: Option[String]): Boolean = { try { - appCache.get(appId + attemptId.map { id => s"/$id" }.getOrElse("")) + appCache.get(appId, attemptId) true } catch { case NonFatal(e) => e.getCause() match { @@ -216,6 +211,17 @@ class HistoryServer( } } + /** + * String value for diagnostics. + * @return a multi-line description of the server state. + */ + override def toString: String = { + s""" + | History Server; + | provider = $provider + | cache = $appCache + """.stripMargin + } } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 01fee46e73..8354e2a611 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -224,6 +224,13 @@ private[spark] class EventLoggingListener( } } fileSystem.rename(new Path(logPath + IN_PROGRESS), target) + // touch file to ensure modtime is current across those filesystems where rename() + // does not set it, -and which support setTimes(); it's a no-op on most object stores + try { + fileSystem.setTimes(target, System.currentTimeMillis(), -1) + } catch { + case e: Exception => logDebug(s"failed to set time of $target", e) + } } } |
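Finally, the redirect behaviour of `ApplicationCacheCheckFilter` reduces to a short standalone filter, sketched below with an injected `isUpdated` function standing in for the relay's `checkForUpdates()` call. One subtlety in the patch's own condition, `operation=="HEAD" || operation=="GET" && checkForUpdates(...)`: since `&&` binds tighter than `||`, it groups as `HEAD || (GET && updated)`, so HEAD requests reach the redirect branch without consulting the probe; the sketch evaluates the method check and the probe separately.

```scala
import javax.servlet.{Filter, FilterChain, FilterConfig, ServletRequest, ServletResponse}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}

// Illustrative filter; isUpdated stands in for the cache-relay probe.
class RefreshFilter(isUpdated: () => Boolean) extends Filter {
  override def init(config: FilterConfig): Unit = ()
  override def destroy(): Unit = ()

  override def doFilter(req: ServletRequest, res: ServletResponse,
      chain: FilterChain): Unit = {
    val request = req.asInstanceOf[HttpServletRequest]
    val response = res.asInstanceOf[HttpServletResponse]
    // Only idempotent reads are candidates; POST/PUT pass straight through.
    val readOnly = request.getMethod == "GET" || request.getMethod == "HEAD"
    if (readOnly && isUpdated()) {
      // 302 back to the same URL: the cache has already swapped in the
      // refreshed UI, so the client's retry lands on the new handlers.
      val query = Option(request.getQueryString).map("?" + _).getOrElse("")
      response.sendRedirect(response.encodeRedirectURL(request.getRequestURI + query))
    } else {
      chain.doFilter(req, res)
    }
  }
}
```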