From a2c7dcf61f33fa1897c950d2d905651103c170ea Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Thu, 11 Feb 2016 21:37:53 -0600
Subject: [SPARK-7889][WEBUI] HistoryServer updates UI for incomplete apps

When the HistoryServer is showing an incomplete app, it needs to check if
there is a newer version of the app available.  It does this by checking if a
version of the app has been loaded with a larger *filesize*.  If so, it
detaches the current UI, attaches the new one, and redirects back to the same
URL to show the new UI.

https://issues.apache.org/jira/browse/SPARK-7889

Author: Steve Loughran
Author: Imran Rashid

Closes #11118 from squito/SPARK-7889-alternate.
---
 .../spark/deploy/history/ApplicationCache.scala    | 665 +++++++++++++++++++++
 .../history/ApplicationHistoryProvider.scala       |  42 +-
 .../spark/deploy/history/FsHistoryProvider.scala   | 149 ++++-
 .../apache/spark/deploy/history/HistoryPage.scala  |   2 +-
 .../spark/deploy/history/HistoryServer.scala       |  78 +--
 .../spark/scheduler/EventLoggingListener.scala     |   7 +
 .../deploy/history/ApplicationCacheSuite.scala     | 488 +++++++++++++++
 .../spark/deploy/history/HistoryServerSuite.scala  | 224 ++++++-
 docs/monitoring.md                                 |  70 ++-
 project/MimaExcludes.scala                         |   3 +
 10 files changed, 1654 insertions(+), 74 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
 create mode 100644 core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
new file mode 100644
index 0000000000..e2fda29044
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
@@ -0,0 +1,665 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse}
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification}
+import org.eclipse.jetty.servlet.FilterHolder
+
+import org.apache.spark.Logging
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Clock
+
+/**
+ * Cache for applications.
+ *
+ * Completed applications are cached for as long as there is capacity for them.
+ * Incomplete applications have their update time checked on every
+ * retrieval; if the cached entry is out of date, it is refreshed.
+ *
+ * @note there must be only one instance of [[ApplicationCache]] in a
+ * JVM at a time. This is because a static field in [[ApplicationCacheCheckFilterRelay]]
+ * keeps a reference to the cache so that HTTP requests on the attempt-specific web UIs
+ * can probe the current cache to see if the attempts have changed.
+ *
+ * Creating multiple instances will break this routing.
+ * @param operations implementation of record access operations
+ * @param retainedApplications number of retained applications
+ * @param clock time source
+ */
+private[history] class ApplicationCache(
+    val operations: ApplicationCacheOperations,
+    val retainedApplications: Int,
+    val clock: Clock) extends Logging {
+
+  /**
+   * Services the load request from the cache.
+   */
+  private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
+
+    /** the cache key doesn't match a cached entry, or the entry is out-of-date, so load it. */
+    override def load(key: CacheKey): CacheEntry = {
+      loadApplicationEntry(key.appId, key.attemptId)
+    }
+
+  }
+
+  /**
+   * Handler for callbacks from the cache of entry removal.
+   */
+  private val removalListener = new RemovalListener[CacheKey, CacheEntry] {
+
+    /**
+     * Removal event notifies the provider to detach the UI.
+     * @param rm removal notification
+     */
+    override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): Unit = {
+      metrics.evictionCount.inc()
+      val key = rm.getKey
+      logDebug(s"Evicting entry ${key}")
+      operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui)
+    }
+  }
+
+  /**
+   * The cache of applications.
+   *
+   * Tagged as `protected` so as to allow subclasses in tests to access it directly.
+   */
+  protected val appCache: LoadingCache[CacheKey, CacheEntry] = {
+    CacheBuilder.newBuilder()
+        .maximumSize(retainedApplications)
+        .removalListener(removalListener)
+        .build(appLoader)
+  }
+
+  /**
+   * The metrics which are updated as the cache is used.
+   */
+  val metrics = new CacheMetrics("history.cache")
+
+  init()
+
+  /**
+   * Perform any startup operations.
+   *
+   * This includes declaring this instance as the cache to use in the
+   * [[ApplicationCacheCheckFilterRelay]].
+   */
+  private def init(): Unit = {
+    ApplicationCacheCheckFilterRelay.setApplicationCache(this)
+  }
+
+  /**
+   * Stop the cache.
+   * This will reset the relay in [[ApplicationCacheCheckFilterRelay]].
+   */
+  def stop(): Unit = {
+    ApplicationCacheCheckFilterRelay.resetApplicationCache()
+  }
+
+  /**
+   * Get an entry.
+   *
+   * Cache fetch/refresh will have taken place by the time this method returns.
+   * @param appAndAttempt application to look up in the format needed by the history server web UI,
+   *                      `appId/attemptId` or `appId`.
+   * @return the entry
+   */
+  def get(appAndAttempt: String): SparkUI = {
+    val parts = splitAppAndAttemptKey(appAndAttempt)
+    get(parts._1, parts._2)
+  }
+
+  /**
+   * Get the Spark UI, converting a lookup failure from an exception to `None`.
+   * @param appAndAttempt application to look up in the format needed by the history server web UI,
+   *                      `appId/attemptId` or `appId`.
+   * @return the entry
+   */
+  def getSparkUI(appAndAttempt: String): Option[SparkUI] = {
+    try {
+      val ui = get(appAndAttempt)
+      Some(ui)
+    } catch {
+      case NonFatal(e) => e.getCause() match {
+        case nsee: NoSuchElementException =>
+          None
+        case cause: Exception => throw cause
+      }
+    }
+  }
+
+  /**
+   * Get the associated Spark UI.
+ * + * Cache fetch/refresh will have taken place by the time this method returns. + * @param appId application ID + * @param attemptId optional attempt ID + * @return the entry + */ + def get(appId: String, attemptId: Option[String]): SparkUI = { + lookupAndUpdate(appId, attemptId)._1.ui + } + + /** + * Look up the entry; update it if needed. + * @param appId application ID + * @param attemptId optional attempt ID + * @return the underlying cache entry -which can have its timestamp changed, and a flag to + * indicate that the entry has changed + */ + private def lookupAndUpdate(appId: String, attemptId: Option[String]): (CacheEntry, Boolean) = { + metrics.lookupCount.inc() + val cacheKey = CacheKey(appId, attemptId) + var entry = appCache.getIfPresent(cacheKey) + var updated = false + if (entry == null) { + // no entry, so fetch without any post-fetch probes for out-of-dateness + // this will trigger a callback to loadApplicationEntry() + entry = appCache.get(cacheKey) + } else if (!entry.completed) { + val now = clock.getTimeMillis() + log.debug(s"Probing at time $now for updated application $cacheKey -> $entry") + metrics.updateProbeCount.inc() + updated = time(metrics.updateProbeTimer) { + entry.updateProbe() + } + if (updated) { + logDebug(s"refreshing $cacheKey") + metrics.updateTriggeredCount.inc() + appCache.refresh(cacheKey) + // and repeat the lookup + entry = appCache.get(cacheKey) + } else { + // update the probe timestamp to the current time + entry.probeTime = now + } + } + (entry, updated) + } + + /** + * This method is visible for testing. + * + * It looks up the cached entry *and returns a clone of it*. + * This ensures that the cached entries never leak + * @param appId application ID + * @param attemptId optional attempt ID + * @return a new entry with shared SparkUI, but copies of the other fields. + */ + def lookupCacheEntry(appId: String, attemptId: Option[String]): CacheEntry = { + val entry = lookupAndUpdate(appId, attemptId)._1 + new CacheEntry(entry.ui, entry.completed, entry.updateProbe, entry.probeTime) + } + + /** + * Probe for an application being updated. + * @param appId application ID + * @param attemptId attempt ID + * @return true if an update has been triggered + */ + def checkForUpdates(appId: String, attemptId: Option[String]): Boolean = { + val (entry, updated) = lookupAndUpdate(appId, attemptId) + updated + } + + /** + * Size probe, primarily for testing. + * @return size + */ + def size(): Long = appCache.size() + + /** + * Emptiness predicate, primarily for testing. + * @return true if the cache is empty + */ + def isEmpty: Boolean = appCache.size() == 0 + + /** + * Time a closure, returning its output. + * @param t timer + * @param f function + * @tparam T type of return value of time + * @return the result of the function. + */ + private def time[T](t: Timer)(f: => T): T = { + val timeCtx = t.time() + try { + f + } finally { + timeCtx.close() + } + } + + /** + * Load the Spark UI via [[ApplicationCacheOperations.getAppUI()]], + * then attach it to the web UI via [[ApplicationCacheOperations.attachSparkUI()]]. + * + * If the application is incomplete, it has the [[ApplicationCacheCheckFilter]] + * added as a filter to the HTTP requests, so that queries on the UI will trigger + * update checks. + * + * The generated entry contains the UI and the current timestamp. + * The timer [[metrics.loadTimer]] tracks the time taken to load the UI. 
+ * + * @param appId application ID + * @param attemptId optional attempt ID + * @return the cache entry + * @throws NoSuchElementException if there is no matching element + */ + @throws[NoSuchElementException] + def loadApplicationEntry(appId: String, attemptId: Option[String]): CacheEntry = { + + logDebug(s"Loading application Entry $appId/$attemptId") + metrics.loadCount.inc() + time(metrics.loadTimer) { + operations.getAppUI(appId, attemptId) match { + case Some(LoadedAppUI(ui, updateState)) => + val completed = ui.getApplicationInfoList.exists(_.attempts.last.completed) + if (completed) { + // completed spark UIs are attached directly + operations.attachSparkUI(appId, attemptId, ui, completed) + } else { + // incomplete UIs have the cache-check filter put in front of them. + ApplicationCacheCheckFilterRelay.registerFilter(ui, appId, attemptId) + operations.attachSparkUI(appId, attemptId, ui, completed) + } + // build the cache entry + val now = clock.getTimeMillis() + val entry = new CacheEntry(ui, completed, updateState, now) + logDebug(s"Loaded application $appId/$attemptId -> $entry") + entry + case None => + metrics.lookupFailureCount.inc() + // guava's cache logs via java.util log, so is of limited use. Hence: our own message + logInfo(s"Failed to load application attempt $appId/$attemptId") + throw new NoSuchElementException(s"no application with application Id '$appId'" + + attemptId.map { id => s" attemptId '$id'" }.getOrElse(" and no attempt Id")) + } + } + } + + /** + * Split up an `applicationId/attemptId` or `applicationId` key into the separate pieces. + * + * @param appAndAttempt combined key + * @return a tuple of the application ID and, if present, the attemptID + */ + def splitAppAndAttemptKey(appAndAttempt: String): (String, Option[String]) = { + val parts = appAndAttempt.split("/") + require(parts.length == 1 || parts.length == 2, s"Invalid app key $appAndAttempt") + val appId = parts(0) + val attemptId = if (parts.length > 1) Some(parts(1)) else None + (appId, attemptId) + } + + /** + * Merge an appId and optional attempt Id into a key of the form `applicationId/attemptId`. + * + * If there is an `attemptId`; `applicationId` if not. + * @param appId application ID + * @param attemptId optional attempt ID + * @return a unified string + */ + def mergeAppAndAttemptToKey(appId: String, attemptId: Option[String]): String = { + appId + attemptId.map { id => s"/$id" }.getOrElse("") + } + + /** + * String operator dumps the cache entries and metrics. + * @return a string value, primarily for testing and diagnostics + */ + override def toString: String = { + val sb = new StringBuilder(s"ApplicationCache(" + + s" retainedApplications= $retainedApplications)") + sb.append(s"; time= ${clock.getTimeMillis()}") + sb.append(s"; entry count= ${appCache.size()}\n") + sb.append("----\n") + appCache.asMap().asScala.foreach { + case(key, entry) => sb.append(s" $key -> $entry\n") + } + sb.append("----\n") + sb.append(metrics) + sb.append("----\n") + sb.toString() + } +} + +/** + * An entry in the cache. + * + * @param ui Spark UI + * @param completed Flag to indicated that the application has completed (and so + * does not need refreshing). + * @param updateProbe function to call to see if the application has been updated and + * therefore that the cached value needs to be refreshed. + * @param probeTime Times in milliseconds when the probe was last executed. 
+ */ +private[history] final class CacheEntry( + val ui: SparkUI, + val completed: Boolean, + val updateProbe: () => Boolean, + var probeTime: Long) { + + /** string value is for test assertions */ + override def toString: String = { + s"UI $ui, completed=$completed, probeTime=$probeTime" + } +} + +/** + * Cache key: compares on `appId` and then, if non-empty, `attemptId`. + * The [[hashCode()]] function uses the same fields. + * @param appId application ID + * @param attemptId attempt ID + */ +private[history] final case class CacheKey(appId: String, attemptId: Option[String]) { + + override def toString: String = { + appId + attemptId.map { id => s"/$id" }.getOrElse("") + } +} + +/** + * Metrics of the cache + * @param prefix prefix to register all entries under + */ +private[history] class CacheMetrics(prefix: String) extends Source { + + /* metrics: counters and timers */ + val lookupCount = new Counter() + val lookupFailureCount = new Counter() + val evictionCount = new Counter() + val loadCount = new Counter() + val loadTimer = new Timer() + val updateProbeCount = new Counter() + val updateProbeTimer = new Timer() + val updateTriggeredCount = new Counter() + + /** all the counters: for registration and string conversion. */ + private val counters = Seq( + ("lookup.count", lookupCount), + ("lookup.failure.count", lookupFailureCount), + ("eviction.count", evictionCount), + ("load.count", loadCount), + ("update.probe.count", updateProbeCount), + ("update.triggered.count", updateTriggeredCount)) + + /** all metrics, including timers */ + private val allMetrics = counters ++ Seq( + ("load.timer", loadTimer), + ("update.probe.timer", updateProbeTimer)) + + /** + * Name of metric source + */ + override val sourceName = "ApplicationCache" + + override val metricRegistry: MetricRegistry = new MetricRegistry + + /** + * Startup actions. + * This includes registering metrics with [[metricRegistry]] + */ + private def init(): Unit = { + allMetrics.foreach { case (name, metric) => + metricRegistry.register(MetricRegistry.name(prefix, name), metric) + } + } + + override def toString: String = { + val sb = new StringBuilder() + counters.foreach { case (name, counter) => + sb.append(name).append(" = ").append(counter.getCount).append('\n') + } + sb.toString() + } +} + +/** + * API for cache events. That is: loading an App UI; and for + * attaching/detaching the UI to and from the Web UI. + */ +private[history] trait ApplicationCacheOperations { + + /** + * Get the application UI and the probe neededed to see if it has been updated. + * @param appId application ID + * @param attemptId attempt ID + * @return If found, the Spark UI and any history information to be used in the cache + */ + def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] + + /** + * Attach a reconstructed UI. + * @param appId application ID + * @param attemptId attempt ID + * @param ui UI + * @param completed flag to indicate that the UI has completed + */ + def attachSparkUI( + appId: String, + attemptId: Option[String], + ui: SparkUI, + completed: Boolean): Unit + + /** + * Detach a Spark UI. + * + * @param ui Spark UI + */ + def detachSparkUI(appId: String, attemptId: Option[String], ui: SparkUI): Unit + +} + +/** + * This is a servlet filter which intercepts HTTP requests on application UIs and + * triggers checks for updated data. 
+ * + * If the application cache indicates that the application has been updated, + * the filter returns a 302 redirect to the caller, asking them to re-request the web + * page. + * + * Because the application cache will detach and then re-attach the UI, when the caller + * repeats that request, it will now pick up the newly-updated web application. + * + * This does require the caller to handle 302 requests. Because of the ambiguity + * in how POST and PUT operations are responded to (that is, should a 307 be + * processed directly), the filter does not filter those requests. + * As the current web UIs are read-only, this is not an issue. If it were ever to + * support more HTTP verbs, then some support may be required. Perhaps, rather + * than sending a redirect, simply updating the value so that the next + * request will pick it up. + * + * Implementation note: there's some abuse of a shared global entry here because + * the configuration data passed to the servlet is just a string:string map. + */ +private[history] class ApplicationCacheCheckFilter() extends Filter with Logging { + + import ApplicationCacheCheckFilterRelay._ + var appId: String = _ + var attemptId: Option[String] = _ + + /** + * Bind the app and attempt ID, throwing an exception if no application ID was provided. + * @param filterConfig configuration + */ + override def init(filterConfig: FilterConfig): Unit = { + + appId = Option(filterConfig.getInitParameter(APP_ID)) + .getOrElse(throw new ServletException(s"Missing Parameter $APP_ID")) + attemptId = Option(filterConfig.getInitParameter(ATTEMPT_ID)) + logDebug(s"initializing filter $this") + } + + /** + * Filter the request. + * Either the caller is given a 302 redirect to the current URL, or the + * request is passed on to the SparkUI servlets. + * + * @param request HttpServletRequest + * @param response HttpServletResponse + * @param chain the rest of the request chain + */ + override def doFilter( + request: ServletRequest, + response: ServletResponse, + chain: FilterChain): Unit = { + + // nobody has ever implemented any other kind of servlet, yet + // this check is universal, just in case someone does exactly + // that on your classpath + if (!(request.isInstanceOf[HttpServletRequest])) { + throw new ServletException("This filter only works for HTTP/HTTPS") + } + val httpRequest = request.asInstanceOf[HttpServletRequest] + val httpResponse = response.asInstanceOf[HttpServletResponse] + val requestURI = httpRequest.getRequestURI + val operation = httpRequest.getMethod + + // if the request is for an attempt, check to see if it is in need of delete/refresh + // and have the cache update the UI if so + if (operation=="HEAD" || operation=="GET" + && checkForUpdates(requestURI, appId, attemptId)) { + // send a redirect back to the same location. This will be routed + // to the *new* UI + logInfo(s"Application Attempt $appId/$attemptId updated; refreshing") + val queryStr = Option(httpRequest.getQueryString).map("?" + _).getOrElse("") + val redirectUrl = httpResponse.encodeRedirectURL(requestURI + queryStr) + httpResponse.sendRedirect(redirectUrl) + } else { + chain.doFilter(request, response) + } + } + + override def destroy(): Unit = { + } + + override def toString: String = s"ApplicationCacheCheckFilter for $appId/$attemptId" +} + +/** + * Global state for the [[ApplicationCacheCheckFilter]] instances, so that they can relay cache + * probes to the cache. 
+ *
+ * This is an ugly workaround for the limitation of servlets and filters in the Java servlet
+ * API; they are still configured on the model of a list of classnames and configuration
+ * strings in a `web.xml` field, rather than a chain of instances wired up by hand or
+ * via an injection framework. There is no way to directly configure a servlet filter instance
+ * with a reference to the application cache which it must use: some global state is needed.
+ *
+ * Here, [[ApplicationCacheCheckFilterRelay]] is that global state; it relays all requests
+ * to the singleton [[ApplicationCache]].
+ *
+ * The field `applicationCache` must be set for the filters to work;
+ * this is done during the construction of [[ApplicationCache]], which requires that there
+ * is only one cache serving requests through the WebUI.
+ *
+ * *Important* In test runs, if there is more than one [[ApplicationCache]], the relay logic
+ * will break: filters may not find instances. Tests must not do that.
+ *
+ */
+private[history] object ApplicationCacheCheckFilterRelay extends Logging {
+  // name of the app ID entry in the filter configuration. Mandatory.
+  val APP_ID = "appId"
+
+  // name of the attempt ID entry in the filter configuration. Optional.
+  val ATTEMPT_ID = "attemptId"
+
+  // name of the filter to register
+  val FILTER_NAME = "org.apache.spark.deploy.history.ApplicationCacheCheckFilter"
+
+  /** the application cache to relay requests to */
+  @volatile
+  private var applicationCache: Option[ApplicationCache] = None
+
+  /**
+   * Set the application cache. Logs a warning if it is overwriting an existing value.
+   * @param cache new cache
+   */
+  def setApplicationCache(cache: ApplicationCache): Unit = {
+    applicationCache.foreach( c => logWarning(s"Overwriting application cache $c"))
+    applicationCache = Some(cache)
+  }
+
+  /**
+   * Reset the application cache.
+   */
+  def resetApplicationCache(): Unit = {
+    applicationCache = None
+  }
+
+  /**
+   * Check to see if there has been an update.
+   * @param requestURI URI the request came in on
+   * @param appId application ID
+   * @param attemptId attempt ID
+   * @return true if an update was loaded for the app/attempt
+   */
+  def checkForUpdates(requestURI: String, appId: String, attemptId: Option[String]): Boolean = {
+
+    logDebug(s"Checking $appId/$attemptId from $requestURI")
+    applicationCache match {
+      case Some(cache) =>
+        try {
+          cache.checkForUpdates(appId, attemptId)
+        } catch {
+          case ex: Exception =>
+            // something went wrong.
Keep going with the existing UI + logWarning(s"When checking for $appId/$attemptId from $requestURI", ex) + false + } + + case None => + logWarning("No application cache instance defined") + false + } + } + + + /** + * Register a filter for the web UI which checks for updates to the given app/attempt + * @param ui Spark UI to attach filters to + * @param appId application ID + * @param attemptId attempt ID + */ + def registerFilter( + ui: SparkUI, + appId: String, + attemptId: Option[String] ): Unit = { + require(ui != null) + val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.REQUEST) + val holder = new FilterHolder() + holder.setClassName(FILTER_NAME) + holder.setInitParameter(APP_ID, appId) + attemptId.foreach( id => holder.setInitParameter(ATTEMPT_ID, id)) + require(ui.getHandlers != null, "null handlers") + ui.getHandlers.foreach { handler => + handler.addFilter(holder, "/*", enumDispatcher) + } + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index 5f5e0fe1c3..44661edfff 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -33,7 +33,42 @@ private[spark] case class ApplicationAttemptInfo( private[spark] case class ApplicationHistoryInfo( id: String, name: String, - attempts: List[ApplicationAttemptInfo]) + attempts: List[ApplicationAttemptInfo]) { + + /** + * Has this application completed? + * @return true if the most recent attempt has completed + */ + def completed: Boolean = { + attempts.nonEmpty && attempts.head.completed + } +} + +/** + * A probe which can be invoked to see if a loaded Web UI has been updated. + * The probe is expected to be relative purely to that of the UI returned + * in the same [[LoadedAppUI]] instance. That is, whenever a new UI is loaded, + * the probe returned with it is the one that must be used to check for it + * being out of date; previous probes must be discarded. + */ +private[history] abstract class HistoryUpdateProbe { + /** + * Return true if the history provider has a later version of the application + * attempt than the one against this probe was constructed. + * @return + */ + def isUpdated(): Boolean +} + +/** + * All the information returned from a call to `getAppUI()`: the new UI + * and any required update state. + * @param ui Spark UI + * @param updateProbe probe to call to check on the update state of this application attempt + */ +private[history] case class LoadedAppUI( + ui: SparkUI, + updateProbe: () => Boolean) private[history] abstract class ApplicationHistoryProvider { @@ -49,9 +84,10 @@ private[history] abstract class ApplicationHistoryProvider { * * @param appId The application ID. * @param attemptId The application attempt ID (or None if there is no attempt ID). - * @return The application's UI, or None if application is not found. + * @return a [[LoadedAppUI]] instance containing the application's UI and any state information + * for update probes, or `None` if the application/attempt is not found. */ - def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] + def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] /** * Called when the server is shutting down. 
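The ApplicationHistoryProvider diff above narrows the provider/cache contract down to a closure: getAppUI() now returns a LoadedAppUI(ui, updateProbe), where the probe is bound to whatever provider state was current when the UI was built. The standalone sketch below is illustrative only and is not part of the patch; ToyHistoryProvider, ToyUI and LoadedView are hypothetical stand-ins for the real FsHistoryProvider, SparkUI and LoadedAppUI types.

import scala.collection.mutable

// Hypothetical stand-ins for SparkUI and LoadedAppUI, for illustration only.
final case class ToyUI(appId: String)
final case class LoadedView(ui: ToyUI, updateProbe: () => Boolean)

class ToyHistoryProvider {
  // Latest event-log size seen for each application by the background scan.
  private val scannedSizes = mutable.Map.empty[String, Long]

  def recordScan(appId: String, size: Long): Unit = scannedSizes(appId) = size

  // Analogous to getAppUI(): the returned probe captures the size that was
  // current when this view was built, so it can later report "out of date".
  def getAppUI(appId: String): Option[LoadedView] =
    scannedSizes.get(appId).map { sizeAtLoad =>
      LoadedView(ToyUI(appId), () => scannedSizes.getOrElse(appId, 0L) > sizeAtLoad)
    }
}

The caller holds on to the returned probe only; once a later recordScan reports a larger event-log size, the probe flips to true and the cached view is known to be stale.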
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 9648959dba..f885798760 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.history -import java.io.{BufferedInputStream, FileNotFoundException, InputStream, IOException, OutputStream} +import java.io.{FileNotFoundException, IOException, OutputStream} import java.util.UUID import java.util.concurrent.{Executors, ExecutorService, TimeUnit} import java.util.zip.{ZipEntry, ZipOutputStream} @@ -33,7 +33,6 @@ import org.apache.hadoop.security.AccessControlException import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.io.CompressionCodec import org.apache.spark.scheduler._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} @@ -42,6 +41,31 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} * A class that provides application history from event logs stored in the file system. * This provider checks for new finished applications in the background periodically and * renders the history application UI by parsing the associated event logs. + * + * == How new and updated attempts are detected == + * + * - New attempts are detected in [[checkForLogs]]: the log dir is scanned, and any + * entries in the log dir whose modification time is greater than the last scan time + * are considered new or updated. These are replayed to create a new [[FsApplicationAttemptInfo]] + * entry and update or create a matching [[FsApplicationHistoryInfo]] element in the list + * of applications. + * - Updated attempts are also found in [[checkForLogs]] -- if the attempt's log file has grown, the + * [[FsApplicationAttemptInfo]] is replaced by another one with a larger log size. + * - When [[updateProbe()]] is invoked to check if a loaded [[SparkUI]] + * instance is out of date, the log size of the cached instance is checked against the app last + * loaded by [[checkForLogs]]. + * + * The use of log size, rather than simply relying on modification times, is needed to + * address the following issues + * - some filesystems do not appear to update the `modtime` value whenever data is flushed to + * an open file output stream. Changes to the history may not be picked up. + * - the granularity of the `modtime` field may be 2+ seconds. Rapid changes to the FS can be + * missed. + * + * Tracking filesize works given the following invariant: the logs get bigger + * as new events are added. If a format was used in which this did not hold, the mechanism would + * break. Simple streaming of JSON-formatted events, as is implemented today, implicitly + * maintains this invariant. */ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) extends ApplicationHistoryProvider with Logging { @@ -77,9 +101,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() .setNameFormat("spark-history-task-%d").setDaemon(true).build()) - // The modification time of the newest log detected during the last scan. 
This is used - // to ignore logs that are older during subsequent scans, to avoid processing data that - // is already known. + // The modification time of the newest log detected during the last scan. Currently only + // used for logging msgs (logs are re-scanned based on file size, rather than modtime) private var lastScanTime = -1L // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted @@ -87,6 +110,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo] = new mutable.LinkedHashMap() + val fileToAppInfo = new mutable.HashMap[Path, FsApplicationAttemptInfo]() + // List of application logs to be deleted by event log cleaner. private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo] @@ -176,18 +201,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // Disable the background thread during tests. if (!conf.contains("spark.testing")) { // A task that periodically checks for event log updates on disk. + logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds") pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS) if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) { // A task that periodically cleans event logs on disk. pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS) } + } else { + logDebug("Background update thread disabled for testing") } } override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values - override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = { + override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { try { applications.get(appId).flatMap { appInfo => appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt => @@ -210,7 +238,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse("")) ui.getSecurityManager.setViewAcls(attempt.sparkUser, appListener.viewAcls.getOrElse("")) - ui + LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize)) } } } @@ -243,12 +271,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private[history] def checkForLogs(): Unit = { try { val newLastScanTime = getNewLastScanTime() + logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) .getOrElse(Seq[FileStatus]()) + // scan for modified applications, replay and merge them val logInfos: Seq[FileStatus] = statusList .filter { entry => try { - !entry.isDirectory() && (entry.getModificationTime() >= lastScanTime) + val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L) + !entry.isDirectory() && prevFileSize < entry.getLen() } catch { case e: AccessControlException => // Do not use "logInfo" since these messages can get pretty noisy if printed on @@ -262,6 +293,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) entry1.getModificationTime() >= entry2.getModificationTime() } + if (logInfos.nonEmpty) { + logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}") + } logInfos.grouped(20) .map { batch => replayExecutor.submit(new Runnable { @@ -356,7 +390,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val bus = new 
ReplayListenerBus() val res = replay(fileStatus, bus) res match { - case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.") + case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully: $r") case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " + "The application may have not started.") } @@ -511,6 +545,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = { val logPath = eventLog.getPath() logInfo(s"Replaying log path: $logPath") + // Note that the eventLog may have *increased* in size since when we grabbed the filestatus, + // and when we read the file here. That is OK -- it may result in an unnecessary refresh + // when there is no update, but will not result in missing an update. We *must* prevent + // an error the other way -- if we report a size bigger (ie later) than the file that is + // actually read, we may never refresh the app. FileStatus is guaranteed to be static + // after it's created, so we get a file size that is no bigger than what is actually read. val logInput = EventLoggingListener.openEventLog(logPath, fs) try { val appListener = new ApplicationEventListener @@ -521,7 +561,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // Without an app ID, new logs will render incorrectly in the listing page, so do not list or // try to show their UI. if (appListener.appId.isDefined) { - Some(new FsApplicationAttemptInfo( + val attemptInfo = new FsApplicationAttemptInfo( logPath.getName(), appListener.appName.getOrElse(NOT_STARTED), appListener.appId.getOrElse(logPath.getName()), @@ -530,7 +570,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) appListener.endTime.getOrElse(-1L), eventLog.getModificationTime(), appListener.sparkUser.getOrElse(NOT_STARTED), - appCompleted)) + appCompleted, + eventLog.getLen() + ) + fileToAppInfo(logPath) = attemptInfo + Some(attemptInfo) } else { None } @@ -564,12 +608,77 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET) } + /** + * String description for diagnostics + * @return a summary of the component state + */ + override def toString: String = { + val header = s""" + | FsHistoryProvider: logdir=$logDir, + | last scan time=$lastScanTime + | Cached application count =${applications.size}} + """.stripMargin + val sb = new StringBuilder(header) + applications.foreach(entry => sb.append(entry._2).append("\n")) + sb.toString + } + + /** + * Look up an application attempt + * @param appId application ID + * @param attemptId Attempt ID, if set + * @return the matching attempt, if found + */ + def lookup(appId: String, attemptId: Option[String]): Option[FsApplicationAttemptInfo] = { + applications.get(appId).flatMap { appInfo => + appInfo.attempts.find(_.attemptId == attemptId) + } + } + + /** + * Return true iff a newer version of the UI is available. The check is based on whether the + * fileSize for the currently loaded UI is smaller than the file size the last time + * the logs were loaded. + * + * This is a very cheap operation -- the work of loading the new attempt was already done + * by [[checkForLogs]]. 
+ * @param appId application to probe + * @param attemptId attempt to probe + * @param prevFileSize the file size of the logs for the currently displayed UI + */ + private def updateProbe( + appId: String, + attemptId: Option[String], + prevFileSize: Long)(): Boolean = { + lookup(appId, attemptId) match { + case None => + logDebug(s"Application Attempt $appId/$attemptId not found") + false + case Some(latest) => + prevFileSize < latest.fileSize + } + } } private[history] object FsHistoryProvider { val DEFAULT_LOG_DIR = "file:/tmp/spark-events" } +/** + * Application attempt information. + * + * @param logPath path to the log file, or, for a legacy log, its directory + * @param name application name + * @param appId application ID + * @param attemptId optional attempt ID + * @param startTime start time (from playback) + * @param endTime end time (from playback). -1 if the application is incomplete. + * @param lastUpdated the modification time of the log file when this entry was built by replaying + * the history. + * @param sparkUser user running the application + * @param completed flag to indicate whether or not the application has completed. + * @param fileSize the size of the log file the last time the file was scanned for changes + */ private class FsApplicationAttemptInfo( val logPath: String, val name: String, @@ -579,10 +688,24 @@ private class FsApplicationAttemptInfo( endTime: Long, lastUpdated: Long, sparkUser: String, - completed: Boolean = true) + completed: Boolean, + val fileSize: Long) extends ApplicationAttemptInfo( - attemptId, startTime, endTime, lastUpdated, sparkUser, completed) + attemptId, startTime, endTime, lastUpdated, sparkUser, completed) { + /** extend the superclass string value with the extra attributes of this class */ + override def toString: String = { + s"FsApplicationAttemptInfo($name, $appId," + + s" ${super.toString}, source=$logPath, size=$fileSize" + } +} + +/** + * Application history information + * @param id application ID + * @param name application name + * @param attempts list of attempts, most recent first. 
+ */ private class FsApplicationHistoryInfo( id: String, override val name: String, diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index cab7faefe8..2fad1120cd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -30,7 +30,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean val allApps = parent.getApplicationList() - .filter(_.attempts.head.completed != requestedIncomplete) + .filter(_.completed != requestedIncomplete) val allAppsSize = allApps.size val providerConfig = parent.getProviderConfig() diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 1f13d7db34..076bdc5c05 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -23,7 +23,6 @@ import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} import scala.util.control.NonFatal -import com.google.common.cache._ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} import org.apache.spark.{Logging, SecurityManager, SparkConf} @@ -31,7 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource, UIRoot} import org.apache.spark.ui.{SparkUI, UIUtils, WebUI} import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.util.{ShutdownHookManager, Utils} +import org.apache.spark.util.{ShutdownHookManager, SystemClock, Utils} /** * A web server that renders SparkUIs of completed applications. 
@@ -50,31 +49,16 @@ class HistoryServer( securityManager: SecurityManager, port: Int) extends WebUI(securityManager, securityManager.getSSLOptions("historyServer"), port, conf) - with Logging with UIRoot { + with Logging with UIRoot with ApplicationCacheOperations { // How many applications to retain private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50) - private val appLoader = new CacheLoader[String, SparkUI] { - override def load(key: String): SparkUI = { - val parts = key.split("/") - require(parts.length == 1 || parts.length == 2, s"Invalid app key $key") - val ui = provider - .getAppUI(parts(0), if (parts.length > 1) Some(parts(1)) else None) - .getOrElse(throw new NoSuchElementException(s"no app with key $key")) - attachSparkUI(ui) - ui - } - } + // application + private val appCache = new ApplicationCache(this, retainedApplications, new SystemClock()) - private val appCache = CacheBuilder.newBuilder() - .maximumSize(retainedApplications) - .removalListener(new RemovalListener[String, SparkUI] { - override def onRemoval(rm: RemovalNotification[String, SparkUI]): Unit = { - detachSparkUI(rm.getValue()) - } - }) - .build(appLoader) + // and its metrics, for testing as well as monitoring + val cacheMetrics = appCache.metrics private val loaderServlet = new HttpServlet { protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = { @@ -117,17 +101,7 @@ class HistoryServer( } def getSparkUI(appKey: String): Option[SparkUI] = { - try { - val ui = appCache.get(appKey) - Some(ui) - } catch { - case NonFatal(e) => e.getCause() match { - case nsee: NoSuchElementException => - None - - case cause: Exception => throw cause - } - } + appCache.getSparkUI(appKey) } initialize() @@ -160,21 +134,36 @@ class HistoryServer( override def stop() { super.stop() provider.stop() + appCache.stop() } /** Attach a reconstructed UI to this server. Only valid after bind(). */ - private def attachSparkUI(ui: SparkUI) { + override def attachSparkUI( + appId: String, + attemptId: Option[String], + ui: SparkUI, + completed: Boolean) { assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs") ui.getHandlers.foreach(attachHandler) addFilters(ui.getHandlers, conf) } /** Detach a reconstructed UI from this server. Only valid after bind(). */ - private def detachSparkUI(ui: SparkUI) { + override def detachSparkUI(appId: String, attemptId: Option[String], ui: SparkUI): Unit = { assert(serverInfo.isDefined, "HistoryServer must be bound before detaching SparkUIs") ui.getHandlers.foreach(detachHandler) } + /** + * Get the application UI and whether or not it is completed + * @param appId application ID + * @param attemptId attempt ID + * @return If found, the Spark UI and any history information to be used in the cache + */ + override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { + provider.getAppUI(appId, attemptId) + } + /** * Returns a list of available applications, in descending order according to their end time. * @@ -202,9 +191,15 @@ class HistoryServer( */ def getProviderConfig(): Map[String, String] = provider.getConfig() + /** + * Load an application UI and attach it to the web server. + * @param appId application ID + * @param attemptId optional attempt ID + * @return true if the application was found and loaded. 
+ */ private def loadAppUi(appId: String, attemptId: Option[String]): Boolean = { try { - appCache.get(appId + attemptId.map { id => s"/$id" }.getOrElse("")) + appCache.get(appId, attemptId) true } catch { case NonFatal(e) => e.getCause() match { @@ -216,6 +211,17 @@ class HistoryServer( } } + /** + * String value for diagnostics. + * @return a multi-line description of the server state. + */ + override def toString: String = { + s""" + | History Server; + | provider = $provider + | cache = $appCache + """.stripMargin + } } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 01fee46e73..8354e2a611 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -224,6 +224,13 @@ private[spark] class EventLoggingListener( } } fileSystem.rename(new Path(logPath + IN_PROGRESS), target) + // touch file to ensure modtime is current across those filesystems where rename() + // does not set it, -and which support setTimes(); it's a no-op on most object stores + try { + fileSystem.setTimes(target, System.currentTimeMillis(), -1) + } catch { + case e: Exception => logDebug(s"failed to set time of $target", e) + } } } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala new file mode 100644 index 0000000000..de6680c610 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.util.{Date, NoSuchElementException} +import javax.servlet.Filter +import javax.servlet.http.{HttpServletRequest, HttpServletResponse} + +import scala.collection.mutable +import scala.collection.mutable.ListBuffer +import scala.language.postfixOps + +import com.codahale.metrics.Counter +import com.google.common.cache.LoadingCache +import com.google.common.util.concurrent.UncheckedExecutionException +import org.eclipse.jetty.servlet.ServletContextHandler +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest.Matchers +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.{Logging, SparkFunSuite} +import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => AttemptInfo, ApplicationInfo} +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.{Clock, ManualClock, Utils} + +class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar with Matchers { + + /** + * subclass with access to the cache internals + * @param retainedApplications number of retained applications + */ + class TestApplicationCache( + operations: ApplicationCacheOperations = new StubCacheOperations(), + retainedApplications: Int, + clock: Clock = new ManualClock(0)) + extends ApplicationCache(operations, retainedApplications, clock) { + + def cache(): LoadingCache[CacheKey, CacheEntry] = appCache + } + + /** + * Stub cache operations. + * The state is kept in a map of [[CacheKey]] to [[CacheEntry]], + * the `probeTime` field in the cache entry setting the timestamp of the entry + */ + class StubCacheOperations extends ApplicationCacheOperations with Logging { + + /** map to UI instances, including timestamps, which are used in update probes */ + val instances = mutable.HashMap.empty[CacheKey, CacheEntry] + + /** Map of attached spark UIs */ + val attached = mutable.HashMap.empty[CacheKey, SparkUI] + + var getAppUICount = 0L + var attachCount = 0L + var detachCount = 0L + var updateProbeCount = 0L + + override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { + logDebug(s"getAppUI($appId, $attemptId)") + getAppUICount += 1 + instances.get(CacheKey(appId, attemptId)).map( e => + LoadedAppUI(e.ui, updateProbe(appId, attemptId, e.probeTime))) + } + + override def attachSparkUI( + appId: String, + attemptId: Option[String], + ui: SparkUI, + completed: Boolean): Unit = { + logDebug(s"attachSparkUI($appId, $attemptId, $ui)") + attachCount += 1 + attached += (CacheKey(appId, attemptId) -> ui) + } + + def putAndAttach( + appId: String, + attemptId: Option[String], + completed: Boolean, + started: Long, + ended: Long, + timestamp: Long): SparkUI = { + val ui = putAppUI(appId, attemptId, completed, started, ended, timestamp) + attachSparkUI(appId, attemptId, ui, completed) + ui + } + + def putAppUI( + appId: String, + attemptId: Option[String], + completed: Boolean, + started: Long, + ended: Long, + timestamp: Long): SparkUI = { + val ui = newUI(appId, attemptId, completed, started, ended) + putInstance(appId, attemptId, ui, completed, timestamp) + ui + } + + def putInstance( + appId: String, + attemptId: Option[String], + ui: SparkUI, + completed: Boolean, + timestamp: Long): Unit = { + instances += (CacheKey(appId, attemptId) -> + new CacheEntry(ui, completed, updateProbe(appId, attemptId, timestamp), timestamp)) + } + + /** + * Detach a reconstructed UI + * + * @param ui Spark UI + */ + 
override def detachSparkUI(appId: String, attemptId: Option[String], ui: SparkUI): Unit = { + logDebug(s"detachSparkUI($appId, $attemptId, $ui)") + detachCount += 1 + var name = ui.getAppName + val key = CacheKey(appId, attemptId) + attached.getOrElse(key, { throw new java.util.NoSuchElementException() }) + attached -= key + } + + /** + * Lookup from the internal cache of attached UIs + */ + def getAttached(appId: String, attemptId: Option[String]): Option[SparkUI] = { + attached.get(CacheKey(appId, attemptId)) + } + + /** + * The update probe. + * @param appId application to probe + * @param attemptId attempt to probe + * @param updateTime timestamp of this UI load + */ + private[history] def updateProbe( + appId: String, + attemptId: Option[String], + updateTime: Long)(): Boolean = { + updateProbeCount += 1 + logDebug(s"isUpdated($appId, $attemptId, ${updateTime})") + val entry = instances.get(CacheKey(appId, attemptId)).get + val updated = entry.probeTime > updateTime + logDebug(s"entry = $entry; updated = $updated") + updated + } + } + + /** + * Create a new UI. The info/attempt info classes here are from the package + * `org.apache.spark.status.api.v1`, not the near-equivalents from the history package + */ + def newUI( + name: String, + attemptId: Option[String], + completed: Boolean, + started: Long, + ended: Long): SparkUI = { + val info = new ApplicationInfo(name, name, Some(1), Some(1), Some(1), Some(64), + Seq(new AttemptInfo(attemptId, new Date(started), new Date(ended), + new Date(ended), ended - started, "user", completed))) + val ui = mock[SparkUI] + when(ui.getApplicationInfoList).thenReturn(List(info).iterator) + when(ui.getAppName).thenReturn(name) + when(ui.appName).thenReturn(name) + val handler = new ServletContextHandler() + when(ui.getHandlers).thenReturn(Seq(handler)) + ui + } + + /** + * Test operations on completed UIs: they are loaded on demand, entries + * are removed on overload. + * + * This effectively tests the original behavior of the history server's cache. 
+ */ + test("Completed UI get") { + val operations = new StubCacheOperations() + val clock = new ManualClock(1) + implicit val cache = new ApplicationCache(operations, 2, clock) + val metrics = cache.metrics + // cache misses + val app1 = "app-1" + assertNotFound(app1, None) + assertMetric("lookupCount", metrics.lookupCount, 1) + assertMetric("lookupFailureCount", metrics.lookupFailureCount, 1) + assert(1 === operations.getAppUICount, "getAppUICount") + assertNotFound(app1, None) + assert(2 === operations.getAppUICount, "getAppUICount") + assert(0 === operations.attachCount, "attachCount") + + val now = clock.getTimeMillis() + // add the entry + operations.putAppUI(app1, None, true, now, now, now) + + // make sure its local + operations.getAppUI(app1, None).get + operations.getAppUICount = 0 + // now expect it to be found + val cacheEntry = cache.lookupCacheEntry(app1, None) + assert(1 === cacheEntry.probeTime) + assert(cacheEntry.completed) + // assert about queries made of the opereations + assert(1 === operations.getAppUICount, "getAppUICount") + assert(1 === operations.attachCount, "attachCount") + + // and in the map of attached + assert(operations.getAttached(app1, None).isDefined, s"attached entry '1' from $cache") + + // go forward in time + clock.setTime(10) + val time2 = clock.getTimeMillis() + val cacheEntry2 = cache.get(app1) + // no more refresh as this is a completed app + assert(1 === operations.getAppUICount, "getAppUICount") + assert(0 === operations.updateProbeCount, "updateProbeCount") + assert(0 === operations.detachCount, "attachCount") + + // evict the entry + operations.putAndAttach("2", None, true, time2, time2, time2) + operations.putAndAttach("3", None, true, time2, time2, time2) + cache.get("2") + cache.get("3") + + // there should have been a detachment here + assert(1 === operations.detachCount, s"detach count from $cache") + // and entry app1 no longer attached + assert(operations.getAttached(app1, None).isEmpty, s"get($app1) in $cache") + val appId = "app1" + val attemptId = Some("_01") + val time3 = clock.getTimeMillis() + operations.putAppUI(appId, attemptId, false, time3, 0, time3) + // expect an error here + assertNotFound(appId, None) + } + + test("Test that if an attempt ID is is set, it must be used in lookups") { + val operations = new StubCacheOperations() + val clock = new ManualClock(1) + implicit val cache = new ApplicationCache(operations, retainedApplications = 10, clock = clock) + val appId = "app1" + val attemptId = Some("_01") + operations.putAppUI(appId, attemptId, false, clock.getTimeMillis(), 0, 0) + assertNotFound(appId, None) + } + + /** + * Test that incomplete apps are not probed for updates during the time window, + * but that they are checked if that window has expired and they are not completed. + * Then, if they have changed, the old entry is replaced by a new one. 
+ */ + test("Incomplete apps refreshed") { + val operations = new StubCacheOperations() + val clock = new ManualClock(50) + val window = 500 + implicit val cache = new ApplicationCache(operations, retainedApplications = 5, clock = clock) + val metrics = cache.metrics + // add the incomplete app + // add the entry + val started = clock.getTimeMillis() + val appId = "app1" + val attemptId = Some("001") + operations.putAppUI(appId, attemptId, false, started, 0, started) + val firstEntry = cache.lookupCacheEntry(appId, attemptId) + assert(started === firstEntry.probeTime, s"timestamp in $firstEntry") + assert(!firstEntry.completed, s"entry is complete: $firstEntry") + assertMetric("lookupCount", metrics.lookupCount, 1) + + assert(0 === operations.updateProbeCount, "expected no update probe on that first get") + + val checkTime = window * 2 + clock.setTime(checkTime) + val entry3 = cache.lookupCacheEntry(appId, attemptId) + assert(firstEntry !== entry3, s"updated entry test from $cache") + assertMetric("lookupCount", metrics.lookupCount, 2) + assertMetric("updateProbeCount", metrics.updateProbeCount, 1) + assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 0) + assert(1 === operations.updateProbeCount, s"refresh count in $cache") + assert(0 === operations.detachCount, s"detach count") + assert(entry3.probeTime === checkTime) + + val updateTime = window * 3 + // update the cached value + val updatedApp = operations.putAppUI(appId, attemptId, true, started, updateTime, updateTime) + val endTime = window * 10 + clock.setTime(endTime) + logDebug(s"Before operation = $cache") + val entry5 = cache.lookupCacheEntry(appId, attemptId) + assertMetric("lookupCount", metrics.lookupCount, 3) + assertMetric("updateProbeCount", metrics.updateProbeCount, 2) + // the update was triggered + assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 1) + assert(updatedApp === entry5.ui, s"UI {$updatedApp} did not match entry {$entry5} in $cache") + + // at which point, the refreshes stop + clock.setTime(window * 20) + assertCacheEntryEquals(appId, attemptId, entry5) + assertMetric("updateProbeCount", metrics.updateProbeCount, 2) + } + + /** + * Assert that a metric counter has a specific value; failure raises an exception + * including the cache's toString value + * @param name counter name (for exceptions) + * @param counter counter + * @param expected expected value. + * @param cache cache + */ + def assertMetric( + name: String, + counter: Counter, + expected: Long) + (implicit cache: ApplicationCache): Unit = { + val actual = counter.getCount + if (actual != expected) { + // this is here because Scalatest loses stack depth + throw new Exception(s"Wrong $name value - expected $expected but got $actual in $cache") + } + } + + /** + * Look up the cache entry and assert that it maches in the expected value. + * This assertion works if the two CacheEntries are different -it looks at the fields. + * UI are compared on object equality; the timestamp and completed flags directly. 
+   * @param appId application ID
+   * @param attemptId attempt ID
+   * @param expected expected value
+   * @param cache app cache
+   */
+  def assertCacheEntryEquals(
+      appId: String,
+      attemptId: Option[String],
+      expected: CacheEntry)
+      (implicit cache: ApplicationCache): Unit = {
+    val actual = cache.lookupCacheEntry(appId, attemptId)
+    val errorText = s"Expected get($appId, $attemptId) -> $expected, but got $actual from $cache"
+    assert(expected.ui === actual.ui, errorText + " SparkUI reference")
+    assert(expected.completed === actual.completed, errorText + " -completed flag")
+    assert(expected.probeTime === actual.probeTime, errorText + " -timestamp")
+  }
+
+  /**
+   * Assert that a key wasn't found in the cache or loaded.
+   *
+   * Looks for the specific nested exception raised by [[ApplicationCache]].
+   * @param appId application ID
+   * @param attemptId attempt ID
+   * @param cache app cache
+   */
+  def assertNotFound(
+      appId: String,
+      attemptId: Option[String])
+      (implicit cache: ApplicationCache): Unit = {
+    val ex = intercept[UncheckedExecutionException] {
+      cache.get(appId, attemptId)
+    }
+    val cause = ex.getCause
+    assert(cause !== null)
+    if (!cause.isInstanceOf[NoSuchElementException]) {
+      throw cause
+    }
+  }
+
+  test("Large Scale Application Eviction") {
+    val operations = new StubCacheOperations()
+    val clock = new ManualClock(0)
+    val size = 5
+    // only `size` entries are retained, so we expect evictions to occur on lookups
+    implicit val cache: ApplicationCache = new TestApplicationCache(operations,
+      retainedApplications = size, clock = clock)
+
+    val attempt1 = Some("01")
+
+    val ids = new ListBuffer[String]()
+    // build a list of applications
+    val count = 100
+    for (i <- 1 to count) {
+      val appId = f"app-$i%04d"
+      ids += appId
+      clock.advance(10)
+      val t = clock.getTimeMillis()
+      operations.putAppUI(appId, attempt1, true, t, t, t)
+    }
+    // now go through them in sequence reading them, expect evictions
+    ids.foreach { id =>
+      cache.get(id, attempt1)
+    }
+    logInfo(cache.toString)
+    val metrics = cache.metrics
+
+    assertMetric("loadCount", metrics.loadCount, count)
+    assertMetric("evictionCount", metrics.evictionCount, count - size)
+  }
+
+  test("Attempts are Evicted") {
+    val operations = new StubCacheOperations()
+    implicit val cache: ApplicationCache = new TestApplicationCache(operations,
+      retainedApplications = 4)
+    val metrics = cache.metrics
+    val appId = "app1"
+    val attempt1 = Some("01")
+    val attempt2 = Some("02")
+    val attempt3 = Some("03")
+    operations.putAppUI(appId, attempt1, true, 100, 110, 110)
+    operations.putAppUI(appId, attempt2, true, 200, 210, 210)
+    operations.putAppUI(appId, attempt3, true, 300, 310, 310)
+    val attempt4 = Some("04")
+    operations.putAppUI(appId, attempt4, true, 400, 410, 410)
+    val attempt5 = Some("05")
+    operations.putAppUI(appId, attempt5, true, 500, 510, 510)
+
+    def expectLoadAndEvictionCounts(expectedLoad: Int, expectedEvictionCount: Int): Unit = {
+      assertMetric("loadCount", metrics.loadCount, expectedLoad)
+      assertMetric("evictionCount", metrics.evictionCount, expectedEvictionCount)
+    }
+
+    // first entry
+    cache.get(appId, attempt1)
+    expectLoadAndEvictionCounts(1, 0)
+
+    // second
+    cache.get(appId, attempt2)
+    expectLoadAndEvictionCounts(2, 0)
+
+    // no change
+    cache.get(appId, attempt2)
+    expectLoadAndEvictionCounts(2, 0)
+
+    // eviction time
+    cache.get(appId, attempt3)
+    cache.size() should be(3)
+    cache.get(appId, attempt4)
+    expectLoadAndEvictionCounts(4, 0)
+    cache.get(appId, attempt5)
+    expectLoadAndEvictionCounts(5, 1)
+
cache.get(appId, attempt5) + expectLoadAndEvictionCounts(5, 1) + + } + + test("Instantiate Filter") { + // this is a regression test on the filter being constructable + val clazz = Utils.classForName(ApplicationCacheCheckFilterRelay.FILTER_NAME) + val instance = clazz.newInstance() + instance shouldBe a [Filter] + } + + test("redirect includes query params") { + val clazz = Utils.classForName(ApplicationCacheCheckFilterRelay.FILTER_NAME) + val filter = clazz.newInstance().asInstanceOf[ApplicationCacheCheckFilter] + filter.appId = "local-123" + val cache = mock[ApplicationCache] + when(cache.checkForUpdates(any(), any())).thenReturn(true) + ApplicationCacheCheckFilterRelay.setApplicationCache(cache) + val request = mock[HttpServletRequest] + when(request.getMethod()).thenReturn("GET") + when(request.getRequestURI()).thenReturn("http://localhost:18080/history/local-123/jobs/job/") + when(request.getQueryString()).thenReturn("id=2") + val resp = mock[HttpServletResponse] + when(resp.encodeRedirectURL(any())).thenAnswer(new Answer[String](){ + override def answer(invocationOnMock: InvocationOnMock): String = { + invocationOnMock.getArguments()(0).asInstanceOf[String] + } + }) + filter.doFilter(request, resp, null) + verify(resp).sendRedirect("http://localhost:18080/history/local-123/jobs/job/?id=2") + } + +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 40d0076eec..4b05469c42 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -21,16 +21,28 @@ import java.net.{HttpURLConnection, URL} import java.util.zip.ZipInputStream import javax.servlet.http.{HttpServletRequest, HttpServletResponse} +import scala.concurrent.duration._ +import scala.language.postfixOps + +import com.codahale.metrics.Counter import com.google.common.base.Charsets import com.google.common.io.{ByteStreams, Files} import org.apache.commons.io.{FileUtils, IOUtils} -import org.mockito.Mockito.when +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.json4s.JsonAST._ +import org.json4s.jackson.JsonMethods +import org.json4s.jackson.JsonMethods._ +import org.openqa.selenium.WebDriver +import org.openqa.selenium.htmlunit.HtmlUnitDriver import org.scalatest.{BeforeAndAfter, Matchers} +import org.scalatest.concurrent.Eventually import org.scalatest.mock.MockitoSugar +import org.scalatest.selenium.WebBrowser -import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf, SparkFunSuite} -import org.apache.spark.ui.{SparkUI, UIUtils} -import org.apache.spark.util.ResetSystemProperties +import org.apache.spark._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.ui.jobs.UIData.JobUIData +import org.apache.spark.util.{ResetSystemProperties, Utils} /** * A collection of tests against the historyserver, including comparing responses from the json @@ -44,7 +56,8 @@ import org.apache.spark.util.ResetSystemProperties * are considered part of Spark's public api. 
  */
 class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers with MockitoSugar
-  with JsonTestUtils with ResetSystemProperties {
+  with JsonTestUtils with Eventually with WebBrowser with LocalSparkContext
+  with ResetSystemProperties {
 
   private val logDir = new File("src/test/resources/spark-events")
   private val expRoot = new File("src/test/resources/HistoryServerExpectations/")
@@ -56,7 +69,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
   def init(): Unit = {
     val conf = new SparkConf()
       .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
-      .set("spark.history.fs.updateInterval", "0")
+      .set("spark.history.fs.update.interval", "0")
       .set("spark.testing", "true")
     provider = new FsHistoryProvider(conf)
     provider.checkForLogs()
@@ -256,6 +269,204 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     all (siteRelativeLinks) should startWith (uiRoot)
   }
 
+  test("incomplete apps get refreshed") {
+
+    implicit val webDriver: WebDriver = new HtmlUnitDriver
+    implicit val formats = org.json4s.DefaultFormats
+
+    // this test dir is explicitly deleted on successful runs; retained for diagnostics when
+    // not
+    val logDir = Utils.createDirectory(System.getProperty("java.io.tmpdir", "logs"))
+
+    // a new conf is used with the background thread set and running at its fastest
+    // allowed refresh rate (1Hz)
+    val myConf = new SparkConf()
+      .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
+      .set("spark.eventLog.dir", logDir.getAbsolutePath)
+      .set("spark.history.fs.update.interval", "1s")
+      .set("spark.eventLog.enabled", "true")
+      .set("spark.history.cache.window", "250ms")
+      .remove("spark.testing")
+    val provider = new FsHistoryProvider(myConf)
+    val securityManager = new SecurityManager(myConf)
+
+    sc = new SparkContext("local", "test", myConf)
+    val logDirUri = logDir.toURI
+    val logDirPath = new Path(logDirUri)
+    val fs = FileSystem.get(logDirUri, sc.hadoopConfiguration)
+
+    def listDir(dir: Path): Seq[FileStatus] = {
+      val statuses = fs.listStatus(dir)
+      statuses.flatMap(
+        stat => if (stat.isDirectory) listDir(stat.getPath) else Seq(stat))
+    }
+
+    def dumpLogDir(msg: String = ""): Unit = {
+      if (log.isDebugEnabled) {
+        logDebug(msg)
+        listDir(logDirPath).foreach { status =>
+          val s = status.toString
+          logDebug(s)
+        }
+      }
+    }
+
+    // stop the server with the old config, and start the new one
+    server.stop()
+    server = new HistoryServer(myConf, provider, securityManager, 18080)
+    server.initialize()
+    server.bind()
+    val port = server.boundPort
+    val metrics = server.cacheMetrics
+
+    // assert that a metric has a value; if not, dump the whole metrics instance
+    def assertMetric(name: String, counter: Counter, expected: Long): Unit = {
+      val actual = counter.getCount
+      if (actual != expected) {
+        // this is here because Scalatest loses stack depth
+        fail(s"Wrong $name value - expected $expected but got $actual" +
+          s" in metrics\n$metrics")
+      }
+    }
+
+    // build a URL for an app or app/attempt plus a page underneath
+    def buildURL(appId: String, suffix: String): URL = {
+      new URL(s"http://localhost:$port/history/$appId$suffix")
+    }
+
+    // build a REST URL for the application and suffix
+ def applications(appId: String, suffix: String): URL = { + new URL(s"http://localhost:$port/api/v1/applications/$appId$suffix") + } + + val historyServerRoot = new URL(s"http://localhost:$port/") + + // start initial job + val d = sc.parallelize(1 to 10) + d.count() + val stdInterval = interval(100 milliseconds) + val appId = eventually(timeout(20 seconds), stdInterval) { + val json = getContentAndCode("applications", port)._2.get + val apps = parse(json).asInstanceOf[JArray].arr + apps should have size 1 + (apps.head \ "id").extract[String] + } + + val appIdRoot = buildURL(appId, "") + val rootAppPage = HistoryServerSuite.getUrl(appIdRoot) + logDebug(s"$appIdRoot ->[${rootAppPage.length}] \n$rootAppPage") + // sanity check to make sure filter is chaining calls + rootAppPage should not be empty + + def getAppUI: SparkUI = { + provider.getAppUI(appId, None).get.ui + } + + // selenium isn't that useful on failures...add our own reporting + def getNumJobs(suffix: String): Int = { + val target = buildURL(appId, suffix) + val targetBody = HistoryServerSuite.getUrl(target) + try { + go to target.toExternalForm + findAll(cssSelector("tbody tr")).toIndexedSeq.size + } catch { + case ex: Exception => + throw new Exception(s"Against $target\n$targetBody", ex) + } + } + // use REST API to get #of jobs + def getNumJobsRestful(): Int = { + val json = HistoryServerSuite.getUrl(applications(appId, "/jobs")) + val jsonAst = parse(json) + val jobList = jsonAst.asInstanceOf[JArray] + jobList.values.size + } + + // get a list of app Ids of all apps in a given state. REST API + def listApplications(completed: Boolean): Seq[String] = { + val json = parse(HistoryServerSuite.getUrl(applications("", ""))) + logDebug(s"${JsonMethods.pretty(json)}") + json match { + case JNothing => Seq() + case apps: JArray => + apps.filter(app => { + (app \ "attempts") match { + case attempts: JArray => + val state = (attempts.children.head \ "completed").asInstanceOf[JBool] + state.value == completed + case _ => false + } + }).map(app => (app \ "id").asInstanceOf[JString].values) + case _ => Seq() + } + } + + def completedJobs(): Seq[JobUIData] = { + getAppUI.jobProgressListener.completedJobs + } + + def activeJobs(): Seq[JobUIData] = { + getAppUI.jobProgressListener.activeJobs.values.toSeq + } + + activeJobs() should have size 0 + completedJobs() should have size 1 + getNumJobs("") should be (1) + getNumJobs("/jobs") should be (1) + getNumJobsRestful() should be (1) + assert(metrics.lookupCount.getCount > 1, s"lookup count too low in $metrics") + + // dump state before the next bit of test, which is where update + // checking really gets stressed + dumpLogDir("filesystem before executing second job") + logDebug(s"History Server: $server") + + val d2 = sc.parallelize(1 to 10) + d2.count() + dumpLogDir("After second job") + + val stdTimeout = timeout(10 seconds) + logDebug("waiting for UI to update") + eventually(stdTimeout, stdInterval) { + assert(2 === getNumJobs(""), + s"jobs not updated, server=$server\n dir = ${listDir(logDirPath)}") + assert(2 === getNumJobs("/jobs"), + s"job count under /jobs not updated, server=$server\n dir = ${listDir(logDirPath)}") + getNumJobsRestful() should be(2) + } + + d.count() + d.count() + eventually(stdTimeout, stdInterval) { + assert(4 === getNumJobsRestful(), s"two jobs back-to-back not updated, server=$server\n") + } + val jobcount = getNumJobs("/jobs") + assert(!provider.getListing().head.completed) + + listApplications(false) should contain(appId) + + // stop the spark context + 
+    resetSparkContext()
+    // check the app is now found as completed
+    eventually(stdTimeout, stdInterval) {
+      assert(provider.getListing().head.completed,
+        s"application never completed, server=$server\n")
+    }
+
+    // app becomes observably complete
+    eventually(stdTimeout, stdInterval) {
+      listApplications(true) should contain (appId)
+    }
+    // app is no longer incomplete
+    listApplications(false) should not contain(appId)
+
+    assert(jobcount === getNumJobs("/jobs"))
+
+    // no need to retain the test dir now that the tests are complete
+    logDir.deleteOnExit()
+
+  }
+
   def getContentAndCode(path: String, port: Int = port): (Int, Option[String], Option[String]) = {
     HistoryServerSuite.getContentAndCode(new URL(s"http://localhost:$port/api/v1/$path"))
   }
@@ -275,6 +486,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     out.write(json)
     out.close()
   }
+
 }
 
 object HistoryServerSuite {
diff --git a/docs/monitoring.md b/docs/monitoring.md
index cedceb2958..c37f6fb20d 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -38,11 +38,25 @@ You can start the history server by executing:
 
     ./sbin/start-history-server.sh
 
-When using the file-system provider class (see spark.history.provider below), the base logging
-directory must be supplied in the spark.history.fs.logDirectory configuration option,
-and should contain sub-directories that each represents an application's event logs. This creates a
-web interface at `http://<server-url>:18080` by default. The history server can be configured as
-follows:
+This creates a web interface at `http://<server-url>:18080` by default, listing incomplete
+and completed applications and attempts, and allowing them to be viewed.
+
+When using the file-system provider class (see `spark.history.provider` below), the base logging
+directory must be supplied in the `spark.history.fs.logDirectory` configuration option,
+and should contain sub-directories that each represent an application's event logs.
+
+The Spark jobs themselves must be configured to log events, and to log them to the same shared,
+writable directory. For example, if the server was configured with a log directory of
+`hdfs://namenode/shared/spark-logs`, then the client-side options would be:
+
+```
+spark.eventLog.enabled true
+spark.eventLog.dir hdfs://namenode/shared/spark-logs
+```
+
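As an editorial illustration (not part of this patch), the same client-side options can be set programmatically on a `SparkConf`; the master, application name and log directory below are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: enable event logging so that the application appears in the history
// server. The log directory must match spark.history.fs.logDirectory on the server side.
object EventLogExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")                        // placeholder master, for illustration only
      .setAppName("event-log-example")              // placeholder application name
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "hdfs://namenode/shared/spark-logs")
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 10).count()
    // stopping the context cleanly marks the application as completed in its event log
    sc.stop()
  }
}
```
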
+The history server can be configured as follows:
+
+### Environment Variables
 
@@ -69,11 +83,13 @@ follows:
   <tr><th>Environment Variable</th><th>Meaning</th></tr>
 
+### Spark configuration options
+
 <table class="table">
   <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
   <tr>
     <td>spark.history.provider</td>
     <td>org.apache.spark.deploy.history.FsHistoryProvider</td>
     <td>Name of the class implementing the application history backend. Currently there is only
       one implementation, provided by Spark, which looks for application logs stored in the
       file system.</td>
   </tr>
@@ -82,15 +98,21 @@ follows:
   <tr>
     <td>spark.history.fs.logDirectory</td>
     <td>file:/tmp/spark-events</td>
     <td>
-      Directory that contains application event logs to be loaded by the history server
+      For the filesystem history provider, the URL to the directory containing application event
+      logs to load. This can be a local file:// path,
+      an HDFS path hdfs://namenode/shared/spark-logs,
+      or that of an alternative filesystem supported by the Hadoop APIs.
     </td>
   </tr>
   <tr>
     <td>spark.history.fs.update.interval</td>
     <td>10s</td>
     <td>
-      The period at which information displayed by this history server is updated.
-      Each update checks for any changes made to the event logs in persisted storage.
+      The period at which the filesystem history provider checks for new or
+      updated logs in the log directory. A shorter interval detects new applications faster,
+      at the expense of more server load re-reading updated applications.
+      As soon as an update has completed, listings of the completed and incomplete applications
+      will reflect the changes.
     </td>
   </tr>
@@ -112,7 +134,7 @@ follows:
   <tr>
     <td>spark.history.kerberos.enabled</td>
     <td>false</td>
     <td>
-      Indicates whether the history server should use kerberos to login. This is useful
+      Indicates whether the history server should use kerberos to login. This is required
       if the history server is accessing HDFS files on a secure Hadoop cluster. If this is true,
       it uses the configs spark.history.kerberos.principal and
       spark.history.kerberos.keytab.
@@ -156,15 +178,15 @@ follows:
   <tr>
     <td>spark.history.fs.cleaner.interval</td>
     <td>1d</td>
     <td>
-      How often the job history cleaner checks for files to delete.
-      Files are only deleted if they are older than spark.history.fs.cleaner.maxAge.
+      How often the filesystem job history cleaner checks for files to delete.
+      Files are only deleted if they are older than spark.history.fs.cleaner.maxAge.
     </td>
   </tr>
   <tr>
     <td>spark.history.fs.cleaner.maxAge</td>
     <td>7d</td>
     <td>
-      Job history files older than this will be deleted when the history cleaner runs.
+      Job history files older than this will be deleted when the filesystem history cleaner runs.
     </td>
   </tr>
@@ -172,7 +194,25 @@ follows:
 Note that in all of these UIs, the tables are sortable by clicking their headers, making it easy to identify slow tasks, data skew, etc.
 
-Note that the history server only displays completed Spark jobs. One way to signal the completion of a Spark job is to stop the Spark Context explicitly (`sc.stop()`), or in Python using the `with SparkContext() as sc:` to handle the Spark Context setup and tear down, and still show the job history on the UI.
+Note
+
+1. The history server displays both completed and incomplete Spark jobs. If an application makes
+multiple attempts after failures, the failed attempts will be displayed, as well as any ongoing
+incomplete attempt or the final successful attempt.
+
+2. Incomplete applications are only updated intermittently. The time between updates is defined
+by the interval between checks for changed files (`spark.history.fs.update.interval`).
+On larger clusters the update interval may be set to large values.
+The way to view a running application is actually to view its own web UI.
+
+3. Applications which exited without registering themselves as completed will be listed
+as incomplete, even though they are no longer running. This can happen if an application
+crashes.
+
+4. One way to signal the completion of a Spark job is to stop the Spark Context
+explicitly (`sc.stop()`), or in Python using the `with SparkContext() as sc:` construct
+to handle the Spark Context setup and tear down.
+
 ## REST API
 
@@ -249,7 +289,7 @@ These endpoints have been strongly versioned to make it easier to develop applic
 * New endpoints may be added
 * New fields may be added to existing endpoints
 * New versions of the api may be added in the future at a separate endpoint (eg., `api/v2`). New versions are *not* required to be backwards compatible.
-* Api versions may be dropped, but only after at least one minor release of co-existing with a new api version
+* Api versions may be dropped, but only after at least one minor release of co-existing with a new api version.
 
 Note that even when examining the UI of a running application, the `applications/[app-id]` portion is
 still required, though there is only one application available. Eg. to see the list of jobs for the
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 133894704b..8611106db0 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -233,6 +233,9 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.metadataCleaner"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint")
+    ) ++ Seq(
+      // SPARK-7889
+      ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.deploy.history.HistoryServer.org$apache$spark$deploy$history$HistoryServer$@tachSparkUI")
     )
     case v if v.startsWith("1.6") =>
       Seq(
-- cgit v1.2.3
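
To round out the REST API notes in the monitoring documentation above, here is a minimal sketch of querying a history server's `/api/v1` endpoints from Scala. This is an editorial illustration, not part of the patch; the host, port and application ID are placeholders:

```scala
import scala.io.Source

// Minimal sketch: list the applications known to a history server and fetch the job list
// of one of them. Assumes a server running at localhost:18080.
object HistoryServerRestExample {
  def main(args: Array[String]): Unit = {
    val base = "http://localhost:18080/api/v1"
    // JSON array describing completed and incomplete applications
    val applications = Source.fromURL(s"$base/applications").mkString
    println(applications)
    // jobs of a single application; replace the placeholder ID with one returned above
    val jobs = Source.fromURL(s"$base/applications/local-1430917381534/jobs").mkString
    println(jobs)
  }
}
```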