-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala           | 665
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala |  42
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala          | 149
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala                |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala              |  78
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala            |   7
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala      | 488
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala         | 224
-rw-r--r--  docs/monitoring.md                                                                   |  70
-rw-r--r--  project/MimaExcludes.scala                                                           |   3
10 files changed, 1654 insertions, 74 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
new file mode 100644
index 0000000000..e2fda29044
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
@@ -0,0 +1,665 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.NoSuchElementException
+import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse}
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.codahale.metrics.{Counter, MetricRegistry, Timer}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification}
+import org.eclipse.jetty.servlet.FilterHolder
+
+import org.apache.spark.Logging
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Clock
+
+/**
+ * Cache for applications.
+ *
+ * Completed applications are cached for as long as there is capacity for them.
+ * Incomplete applications have their update time checked on every
+ * retrieval; if the cached entry is out of date, it is refreshed.
+ *
+ * @note there must be only one instance of [[ApplicationCache]] in a
+ * JVM at a time. This is because a static field in [[ApplicationCacheCheckFilterRelay]]
+ * keeps a reference to the cache so that HTTP requests on the attempt-specific web UIs
+ * can probe the current cache to see if the attempts have changed.
+ *
+ * Creating multiple instances will break this routing.
+ * @param operations implementation of record access operations
+ * @param retainedApplications number of retained applications
+ * @param clock time source
+ */
+private[history] class ApplicationCache(
+ val operations: ApplicationCacheOperations,
+ val retainedApplications: Int,
+ val clock: Clock) extends Logging {
+
+ /**
+ * Services the load request from the cache.
+ */
+ private val appLoader = new CacheLoader[CacheKey, CacheEntry] {
+
+ /** the cache key doesn't match a cached entry, or the entry is out-of-date, so load it. */
+ override def load(key: CacheKey): CacheEntry = {
+ loadApplicationEntry(key.appId, key.attemptId)
+ }
+
+ }
+
+ /**
+ * Handler for callbacks from the cache of entry removal.
+ */
+ private val removalListener = new RemovalListener[CacheKey, CacheEntry] {
+
+ /**
+ * Removal event notifies the provider to detach the UI.
+ * @param rm removal notification
+ */
+ override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): Unit = {
+ metrics.evictionCount.inc()
+ val key = rm.getKey
+ logDebug(s"Evicting entry ${key}")
+ operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().ui)
+ }
+ }
+
+ /**
+ * The cache of applications.
+ *
+ * Tagged as `protected` so that subclasses in tests can access it directly.
+ */
+ protected val appCache: LoadingCache[CacheKey, CacheEntry] = {
+ CacheBuilder.newBuilder()
+ .maximumSize(retainedApplications)
+ .removalListener(removalListener)
+ .build(appLoader)
+ }
+
+ /**
+ * The metrics which are updated as the cache is used.
+ */
+ val metrics = new CacheMetrics("history.cache")
+
+ init()
+
+ /**
+ * Perform any startup operations.
+ *
+ * This includes declaring this instance as the cache to use in the
+ * [[ApplicationCacheCheckFilterRelay]].
+ */
+ private def init(): Unit = {
+ ApplicationCacheCheckFilterRelay.setApplicationCache(this)
+ }
+
+ /**
+ * Stop the cache.
+ * This will reset the relay in [[ApplicationCacheCheckFilterRelay]].
+ */
+ def stop(): Unit = {
+ ApplicationCacheCheckFilterRelay.resetApplicationCache()
+ }
+
+ /**
+ * Get an entry.
+ *
+ * Cache fetch/refresh will have taken place by the time this method returns.
+ * @param appAndAttempt application to look up in the format needed by the history server web UI,
+ * `appId/attemptId` or `appId`.
+ * @return the entry
+ */
+ def get(appAndAttempt: String): SparkUI = {
+ val parts = splitAppAndAttemptKey(appAndAttempt)
+ get(parts._1, parts._2)
+ }
+
+ /**
+ * Get the Spark UI, converting a lookup failure from an exception to `None`.
+ * @param appAndAttempt application to look up in the format needed by the history server web UI,
+ * `appId/attemptId` or `appId`.
+ * @return the entry
+ */
+ def getSparkUI(appAndAttempt: String): Option[SparkUI] = {
+ try {
+ val ui = get(appAndAttempt)
+ Some(ui)
+ } catch {
+ case NonFatal(e) => e.getCause() match {
+ case nsee: NoSuchElementException =>
+ None
+ case cause: Exception => throw cause
+ }
+ }
+ }
+
+ /**
+ * Get the associated spark UI.
+ *
+ * Cache fetch/refresh will have taken place by the time this method returns.
+ * @param appId application ID
+ * @param attemptId optional attempt ID
+ * @return the entry
+ */
+ def get(appId: String, attemptId: Option[String]): SparkUI = {
+ lookupAndUpdate(appId, attemptId)._1.ui
+ }
+
+ /**
+ * Look up the entry; update it if needed.
+ * @param appId application ID
+ * @param attemptId optional attempt ID
+ * @return the underlying cache entry (which can have its timestamp changed) and a flag
+ * indicating whether the entry has changed
+ */
+ private def lookupAndUpdate(appId: String, attemptId: Option[String]): (CacheEntry, Boolean) = {
+ metrics.lookupCount.inc()
+ val cacheKey = CacheKey(appId, attemptId)
+ var entry = appCache.getIfPresent(cacheKey)
+ var updated = false
+ if (entry == null) {
+ // no entry, so fetch without any post-fetch probes for out-of-dateness
+ // this will trigger a callback to loadApplicationEntry()
+ entry = appCache.get(cacheKey)
+ } else if (!entry.completed) {
+ val now = clock.getTimeMillis()
+ log.debug(s"Probing at time $now for updated application $cacheKey -> $entry")
+ metrics.updateProbeCount.inc()
+ updated = time(metrics.updateProbeTimer) {
+ entry.updateProbe()
+ }
+ if (updated) {
+ logDebug(s"refreshing $cacheKey")
+ metrics.updateTriggeredCount.inc()
+ appCache.refresh(cacheKey)
+ // and repeat the lookup
+ entry = appCache.get(cacheKey)
+ } else {
+ // update the probe timestamp to the current time
+ entry.probeTime = now
+ }
+ }
+ (entry, updated)
+ }
+
+ /**
+ * This method is visible for testing.
+ *
+ * It looks up the cached entry *and returns a clone of it*.
+ * This ensures that the cached entries never leak.
+ * @param appId application ID
+ * @param attemptId optional attempt ID
+ * @return a new entry with shared SparkUI, but copies of the other fields.
+ */
+ def lookupCacheEntry(appId: String, attemptId: Option[String]): CacheEntry = {
+ val entry = lookupAndUpdate(appId, attemptId)._1
+ new CacheEntry(entry.ui, entry.completed, entry.updateProbe, entry.probeTime)
+ }
+
+ /**
+ * Probe for an application being updated.
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @return true if an update has been triggered
+ */
+ def checkForUpdates(appId: String, attemptId: Option[String]): Boolean = {
+ val (entry, updated) = lookupAndUpdate(appId, attemptId)
+ updated
+ }
+
+ /**
+ * Size probe, primarily for testing.
+ * @return size
+ */
+ def size(): Long = appCache.size()
+
+ /**
+ * Emptiness predicate, primarily for testing.
+ * @return true if the cache is empty
+ */
+ def isEmpty: Boolean = appCache.size() == 0
+
+ /**
+ * Time a closure, returning its output.
+ * @param t timer
+ * @param f function
+ * @tparam T type of return value of time
+ * @return the result of the function.
+ */
+ private def time[T](t: Timer)(f: => T): T = {
+ val timeCtx = t.time()
+ try {
+ f
+ } finally {
+ timeCtx.close()
+ }
+ }
+
+ /**
+ * Load the Spark UI via [[ApplicationCacheOperations.getAppUI()]],
+ * then attach it to the web UI via [[ApplicationCacheOperations.attachSparkUI()]].
+ *
+ * If the application is incomplete, it has the [[ApplicationCacheCheckFilter]]
+ * added as a filter to the HTTP requests, so that queries on the UI will trigger
+ * update checks.
+ *
+ * The generated entry contains the UI and the current timestamp.
+ * The timer [[metrics.loadTimer]] tracks the time taken to load the UI.
+ *
+ * @param appId application ID
+ * @param attemptId optional attempt ID
+ * @return the cache entry
+ * @throws NoSuchElementException if there is no matching element
+ */
+ @throws[NoSuchElementException]
+ def loadApplicationEntry(appId: String, attemptId: Option[String]): CacheEntry = {
+
+ logDebug(s"Loading application Entry $appId/$attemptId")
+ metrics.loadCount.inc()
+ time(metrics.loadTimer) {
+ operations.getAppUI(appId, attemptId) match {
+ case Some(LoadedAppUI(ui, updateState)) =>
+ val completed = ui.getApplicationInfoList.exists(_.attempts.last.completed)
+ if (completed) {
+ // completed spark UIs are attached directly
+ operations.attachSparkUI(appId, attemptId, ui, completed)
+ } else {
+ // incomplete UIs have the cache-check filter put in front of them.
+ ApplicationCacheCheckFilterRelay.registerFilter(ui, appId, attemptId)
+ operations.attachSparkUI(appId, attemptId, ui, completed)
+ }
+ // build the cache entry
+ val now = clock.getTimeMillis()
+ val entry = new CacheEntry(ui, completed, updateState, now)
+ logDebug(s"Loaded application $appId/$attemptId -> $entry")
+ entry
+ case None =>
+ metrics.lookupFailureCount.inc()
+ // guava's cache logs via java.util log, so is of limited use. Hence: our own message
+ logInfo(s"Failed to load application attempt $appId/$attemptId")
+ throw new NoSuchElementException(s"no application with application Id '$appId'" +
+ attemptId.map { id => s" attemptId '$id'" }.getOrElse(" and no attempt Id"))
+ }
+ }
+ }
+
+ /**
+ * Split up an `applicationId/attemptId` or `applicationId` key into the separate pieces.
+ *
+ * @param appAndAttempt combined key
+ * @return a tuple of the application ID and, if present, the attemptID
+ */
+ def splitAppAndAttemptKey(appAndAttempt: String): (String, Option[String]) = {
+ val parts = appAndAttempt.split("/")
+ require(parts.length == 1 || parts.length == 2, s"Invalid app key $appAndAttempt")
+ val appId = parts(0)
+ val attemptId = if (parts.length > 1) Some(parts(1)) else None
+ (appId, attemptId)
+ }
+
+ /**
+ * Merge an appId and optional attempt Id into a key of the form `applicationId/attemptId`.
+ *
+ * The result is `applicationId/attemptId` if there is an `attemptId`; `applicationId` alone if not.
+ * @param appId application ID
+ * @param attemptId optional attempt ID
+ * @return a unified string
+ */
+ def mergeAppAndAttemptToKey(appId: String, attemptId: Option[String]): String = {
+ appId + attemptId.map { id => s"/$id" }.getOrElse("")
+ }
+
+ /**
+ * The string value dumps the cache entries and metrics.
+ * @return a string value, primarily for testing and diagnostics
+ */
+ override def toString: String = {
+ val sb = new StringBuilder(s"ApplicationCache(" +
+ s" retainedApplications= $retainedApplications)")
+ sb.append(s"; time= ${clock.getTimeMillis()}")
+ sb.append(s"; entry count= ${appCache.size()}\n")
+ sb.append("----\n")
+ appCache.asMap().asScala.foreach {
+ case(key, entry) => sb.append(s" $key -> $entry\n")
+ }
+ sb.append("----\n")
+ sb.append(metrics)
+ sb.append("----\n")
+ sb.toString()
+ }
+}
+
+/**
+ * An entry in the cache.
+ *
+ * @param ui Spark UI
+ * @param completed Flag to indicate that the application has completed (and so
+ * does not need refreshing).
+ * @param updateProbe function to call to see if the application has been updated and
+ * therefore that the cached value needs to be refreshed.
+ * @param probeTime Time in milliseconds when the probe was last executed.
+ */
+private[history] final class CacheEntry(
+ val ui: SparkUI,
+ val completed: Boolean,
+ val updateProbe: () => Boolean,
+ var probeTime: Long) {
+
+ /** string value is for test assertions */
+ override def toString: String = {
+ s"UI $ui, completed=$completed, probeTime=$probeTime"
+ }
+}
+
+/**
+ * Cache key: compares on `appId` and then, if non-empty, `attemptId`.
+ * The [[hashCode()]] function uses the same fields.
+ * @param appId application ID
+ * @param attemptId attempt ID
+ */
+private[history] final case class CacheKey(appId: String, attemptId: Option[String]) {
+
+ override def toString: String = {
+ appId + attemptId.map { id => s"/$id" }.getOrElse("")
+ }
+}
+
+/**
+ * Metrics of the cache
+ * @param prefix prefix to register all entries under
+ */
+private[history] class CacheMetrics(prefix: String) extends Source {
+
+ /* metrics: counters and timers */
+ val lookupCount = new Counter()
+ val lookupFailureCount = new Counter()
+ val evictionCount = new Counter()
+ val loadCount = new Counter()
+ val loadTimer = new Timer()
+ val updateProbeCount = new Counter()
+ val updateProbeTimer = new Timer()
+ val updateTriggeredCount = new Counter()
+
+ /** all the counters: for registration and string conversion. */
+ private val counters = Seq(
+ ("lookup.count", lookupCount),
+ ("lookup.failure.count", lookupFailureCount),
+ ("eviction.count", evictionCount),
+ ("load.count", loadCount),
+ ("update.probe.count", updateProbeCount),
+ ("update.triggered.count", updateTriggeredCount))
+
+ /** all metrics, including timers */
+ private val allMetrics = counters ++ Seq(
+ ("load.timer", loadTimer),
+ ("update.probe.timer", updateProbeTimer))
+
+ /**
+ * Name of metric source
+ */
+ override val sourceName = "ApplicationCache"
+
+ override val metricRegistry: MetricRegistry = new MetricRegistry
+
+ init()
+
+ /**
+ * Startup actions.
+ * This includes registering metrics with [[metricRegistry]]
+ */
+ private def init(): Unit = {
+ allMetrics.foreach { case (name, metric) =>
+ metricRegistry.register(MetricRegistry.name(prefix, name), metric)
+ }
+ }
+
+ override def toString: String = {
+ val sb = new StringBuilder()
+ counters.foreach { case (name, counter) =>
+ sb.append(name).append(" = ").append(counter.getCount).append('\n')
+ }
+ sb.toString()
+ }
+}
+
+/**
+ * API for cache events: loading an application UI, and attaching/detaching
+ * that UI to and from the web UI.
+ */
+private[history] trait ApplicationCacheOperations {
+
+ /**
+ * Get the application UI and the probe needed to see if it has been updated.
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @return If found, the Spark UI and any history information to be used in the cache
+ */
+ def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI]
+
+ /**
+ * Attach a reconstructed UI.
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @param ui UI
+ * @param completed flag to indicate that the UI has completed
+ */
+ def attachSparkUI(
+ appId: String,
+ attemptId: Option[String],
+ ui: SparkUI,
+ completed: Boolean): Unit
+
+ /**
+ * Detach a Spark UI.
+ *
+ * @param ui Spark UI
+ */
+ def detachSparkUI(appId: String, attemptId: Option[String], ui: SparkUI): Unit
+
+}
+
+/**
+ * This is a servlet filter which intercepts HTTP requests on application UIs and
+ * triggers checks for updated data.
+ *
+ * If the application cache indicates that the application has been updated,
+ * the filter returns a 302 redirect to the caller, asking them to re-request the web
+ * page.
+ *
+ * Because the application cache will detach and then re-attach the UI, when the caller
+ * repeats that request, it will now pick up the newly-updated web application.
+ *
+ * This does require the caller to handle 302 responses. Because of the ambiguity
+ * in how POST and PUT operations are responded to (that is, should a 307 be
+ * processed directly), the filter <i>does not</i> filter those requests.
+ * As the current web UIs are read-only, this is not an issue. If they were ever to
+ * support more HTTP verbs, then some support may be required. Perhaps, rather
+ * than sending a redirect, the value could simply be updated so that the <i>next</i>
+ * request will pick it up.
+ *
+ * Implementation note: there's some abuse of a shared global entry here because
+ * the configuration data passed to the servlet is just a string:string map.
+ */
+private[history] class ApplicationCacheCheckFilter() extends Filter with Logging {
+
+ import ApplicationCacheCheckFilterRelay._
+ var appId: String = _
+ var attemptId: Option[String] = _
+
+ /**
+ * Bind the app and attempt ID, throwing an exception if no application ID was provided.
+ * @param filterConfig configuration
+ */
+ override def init(filterConfig: FilterConfig): Unit = {
+
+ appId = Option(filterConfig.getInitParameter(APP_ID))
+ .getOrElse(throw new ServletException(s"Missing Parameter $APP_ID"))
+ attemptId = Option(filterConfig.getInitParameter(ATTEMPT_ID))
+ logDebug(s"initializing filter $this")
+ }
+
+ /**
+ * Filter the request.
+ * Either the caller is given a 302 redirect to the current URL, or the
+ * request is passed on to the SparkUI servlets.
+ *
+ * @param request HttpServletRequest
+ * @param response HttpServletResponse
+ * @param chain the rest of the request chain
+ */
+ override def doFilter(
+ request: ServletRequest,
+ response: ServletResponse,
+ chain: FilterChain): Unit = {
+
+ // nobody has ever implemented any other kind of servlet, yet
+ // this check is universal, just in case someone does exactly
+ // that on your classpath
+ if (!(request.isInstanceOf[HttpServletRequest])) {
+ throw new ServletException("This filter only works for HTTP/HTTPS")
+ }
+ val httpRequest = request.asInstanceOf[HttpServletRequest]
+ val httpResponse = response.asInstanceOf[HttpServletResponse]
+ val requestURI = httpRequest.getRequestURI
+ val operation = httpRequest.getMethod
+
+ // if the request is for an attempt, check to see if it is in need of delete/refresh
+ // and have the cache update the UI if so
+ if ((operation == "HEAD" || operation == "GET")
+ && checkForUpdates(requestURI, appId, attemptId)) {
+ // send a redirect back to the same location. This will be routed
+ // to the *new* UI
+ logInfo(s"Application Attempt $appId/$attemptId updated; refreshing")
+ val queryStr = Option(httpRequest.getQueryString).map("?" + _).getOrElse("")
+ val redirectUrl = httpResponse.encodeRedirectURL(requestURI + queryStr)
+ httpResponse.sendRedirect(redirectUrl)
+ } else {
+ chain.doFilter(request, response)
+ }
+ }
+
+ override def destroy(): Unit = {
+ }
+
+ override def toString: String = s"ApplicationCacheCheckFilter for $appId/$attemptId"
+}
+
+/**
+ * Global state for the [[ApplicationCacheCheckFilter]] instances, so that they can relay cache
+ * probes to the cache.
+ *
+ * This is an ugly workaround for the limitation of servlets and filters in the Java servlet
+ * API; they are still configured on the model of a list of classnames and configuration
+ * strings in a `web.xml` field, rather than a chain of instances wired up by hand or
+ * via an injection framework. There is no way to directly configure a servlet filter instance
+ * with a reference to the application cache which it must use: some global state is needed.
+ *
+ * Here, [[ApplicationCacheCheckFilterRelay]] is that global state; it relays all requests
+ * to the singleton [[ApplicationCache]].
+ *
+ * The field `applicationCache` must be set for the filters to work -
+ * this is done during the construction of [[ApplicationCache]], which requires that there
+ * is only one cache serving requests through the WebUI.
+ *
+ * *Important* In test runs, if there is more than one [[ApplicationCache]], the relay logic
+ * will break: filters may not find instances. Tests must not do that.
+ *
+ */
+private[history] object ApplicationCacheCheckFilterRelay extends Logging {
+ // name of the app ID entry in the filter configuration. Mandatory.
+ val APP_ID = "appId"
+
+ // name of the attempt ID entry in the filter configuration. Optional.
+ val ATTEMPT_ID = "attemptId"
+
+ // name of the filter to register
+ val FILTER_NAME = "org.apache.spark.deploy.history.ApplicationCacheCheckFilter"
+
+ /** the application cache to relay requests to */
+ @volatile
+ private var applicationCache: Option[ApplicationCache] = None
+
+ /**
+ * Set the application cache. Logs a warning if it is overwriting an existing value
+ * @param cache new cache
+ */
+ def setApplicationCache(cache: ApplicationCache): Unit = {
+ applicationCache.foreach( c => logWarning(s"Overwriting application cache $c"))
+ applicationCache = Some(cache)
+ }
+
+ /**
+ * Reset the application cache
+ */
+ def resetApplicationCache(): Unit = {
+ applicationCache = None
+ }
+
+ /**
+ * Check to see if there has been an update
+ * @param requestURI URI the request came in on
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @return true if an update was loaded for the app/attempt
+ */
+ def checkForUpdates(requestURI: String, appId: String, attemptId: Option[String]): Boolean = {
+
+ logDebug(s"Checking $appId/$attemptId from $requestURI")
+ applicationCache match {
+ case Some(cache) =>
+ try {
+ cache.checkForUpdates(appId, attemptId)
+ } catch {
+ case ex: Exception =>
+ // something went wrong. Keep going with the existing UI
+ logWarning(s"When checking for $appId/$attemptId from $requestURI", ex)
+ false
+ }
+
+ case None =>
+ logWarning("No application cache instance defined")
+ false
+ }
+ }
+
+
+ /**
+ * Register a filter for the web UI which checks for updates to the given app/attempt
+ * @param ui Spark UI to attach filters to
+ * @param appId application ID
+ * @param attemptId attempt ID
+ */
+ def registerFilter(
+ ui: SparkUI,
+ appId: String,
+ attemptId: Option[String] ): Unit = {
+ require(ui != null)
+ val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.REQUEST)
+ val holder = new FilterHolder()
+ holder.setClassName(FILTER_NAME)
+ holder.setInitParameter(APP_ID, appId)
+ attemptId.foreach( id => holder.setInitParameter(ATTEMPT_ID, id))
+ require(ui.getHandlers != null, "null handlers")
+ ui.getHandlers.foreach { handler =>
+ handler.addFilter(holder, "/*", enumDispatcher)
+ }
+ }
+}
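For readers unfamiliar with Guava's cache API, the standalone sketch below shows the load/evict/refresh pattern that the `appCache` field above is built on. It is illustrative only (not part of the patch), assumes Guava on the classpath, and uses plain `String` keys and values in place of `CacheKey` and `CacheEntry`.

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification}

object LoadingCacheSketch {
  def main(args: Array[String]): Unit = {
    // called on a cache miss and on refresh(); ApplicationCache's appLoader plays this role
    val loader = new CacheLoader[String, String] {
      override def load(key: String): String = s"ui-for-$key"
    }
    // called when an entry is evicted or replaced; this is where the UI would be detached
    val onRemove = new RemovalListener[String, String] {
      override def onRemoval(rm: RemovalNotification[String, String]): Unit =
        println(s"detach ${rm.getKey} -> ${rm.getValue}")
    }
    val cache: LoadingCache[String, String] = CacheBuilder.newBuilder()
      .maximumSize(2)          // the role of spark.history.retainedApplications
      .removalListener(onRemove)
      .build(loader)

    cache.get("app-1")         // miss: the loader builds the value
    cache.get("app-2")
    cache.get("app-3")         // over capacity: an older entry is evicted, listener fires
    cache.refresh("app-3")     // reload in place; the replaced value is reported as removed
  }
}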
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 5f5e0fe1c3..44661edfff 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -33,7 +33,42 @@ private[spark] case class ApplicationAttemptInfo(
private[spark] case class ApplicationHistoryInfo(
id: String,
name: String,
- attempts: List[ApplicationAttemptInfo])
+ attempts: List[ApplicationAttemptInfo]) {
+
+ /**
+ * Has this application completed?
+ * @return true if the most recent attempt has completed
+ */
+ def completed: Boolean = {
+ attempts.nonEmpty && attempts.head.completed
+ }
+}
+
+/**
+ * A probe which can be invoked to see if a loaded Web UI has been updated.
+ * The probe is bound to the specific UI returned in the same [[LoadedAppUI]]
+ * instance. That is, whenever a new UI is loaded, the probe returned with it
+ * is the one that must be used to check whether that UI is out of date;
+ * previous probes must be discarded.
+ */
+private[history] abstract class HistoryUpdateProbe {
+ /**
+ * Return true if the history provider has a later version of the application
+ * attempt than the one against which this probe was constructed.
+ * @return true if the attempt has been updated
+ */
+ def isUpdated(): Boolean
+}
+
+/**
+ * All the information returned from a call to `getAppUI()`: the new UI
+ * and any required update state.
+ * @param ui Spark UI
+ * @param updateProbe probe to call to check on the update state of this application attempt
+ */
+private[history] case class LoadedAppUI(
+ ui: SparkUI,
+ updateProbe: () => Boolean)
private[history] abstract class ApplicationHistoryProvider {
@@ -49,9 +84,10 @@ private[history] abstract class ApplicationHistoryProvider {
*
* @param appId The application ID.
* @param attemptId The application attempt ID (or None if there is no attempt ID).
- * @return The application's UI, or None if application is not found.
+ * @return a [[LoadedAppUI]] instance containing the application's UI and any state information
+ * for update probes, or `None` if the application/attempt is not found.
*/
- def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI]
+ def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI]
/**
* Called when the server is shutting down.
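The essential idea in `LoadedAppUI` is that `updateProbe` is a `() => Boolean` closure captured at load time, so every freshly loaded UI carries its own staleness check. A minimal sketch of that idiom, in plain Scala with no Spark types (all names here are illustrative):

object UpdateProbeSketch {
  // the latest log size known to the provider, updated by each scan
  @volatile private var latestSize: Long = 100L

  /** Build a probe bound to the size observed when the UI was loaded. */
  def updateProbe(sizeAtLoad: Long)(): Boolean = sizeAtLoad < latestSize

  def main(args: Array[String]): Unit = {
    val probe: () => Boolean = updateProbe(100L) _  // captured at load time
    println(probe())    // false: nothing has changed yet
    latestSize = 250L   // a later scan sees a bigger log
    println(probe())    // true: the cached UI is now out of date
  }
}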
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 9648959dba..f885798760 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.history
-import java.io.{BufferedInputStream, FileNotFoundException, InputStream, IOException, OutputStream}
+import java.io.{FileNotFoundException, IOException, OutputStream}
import java.util.UUID
import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}
@@ -33,7 +33,6 @@ import org.apache.hadoop.security.AccessControlException
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
@@ -42,6 +41,31 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
* A class that provides application history from event logs stored in the file system.
* This provider checks for new finished applications in the background periodically and
* renders the history application UI by parsing the associated event logs.
+ *
+ * == How new and updated attempts are detected ==
+ *
+ * - New attempts are detected in [[checkForLogs]]: the log dir is scanned, and any
+ * entries in the log dir whose modification time is greater than the last scan time
+ * are considered new or updated. These are replayed to create a new [[FsApplicationAttemptInfo]]
+ * entry and update or create a matching [[FsApplicationHistoryInfo]] element in the list
+ * of applications.
+ * - Updated attempts are also found in [[checkForLogs]] -- if the attempt's log file has grown, the
+ * [[FsApplicationAttemptInfo]] is replaced by another one with a larger log size.
+ * - When [[updateProbe()]] is invoked to check if a loaded [[SparkUI]]
+ * instance is out of date, the log size of the cached instance is checked against the app last
+ * loaded by [[checkForLogs]].
+ *
+ * The use of log size, rather than simply relying on modification times, is needed to
+ * address the following issues
+ * - some filesystems do not appear to update the `modtime` value whenever data is flushed to
+ * an open file output stream. Changes to the history may not be picked up.
+ * - the granularity of the `modtime` field may be 2+ seconds. Rapid changes to the FS can be
+ * missed.
+ *
+ * Tracking filesize works given the following invariant: the logs get bigger
+ * as new events are added. If a format was used in which this did not hold, the mechanism would
+ * break. Simple streaming of JSON-formatted events, as is implemented today, implicitly
+ * maintains this invariant.
*/
private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
extends ApplicationHistoryProvider with Logging {
@@ -77,9 +101,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("spark-history-task-%d").setDaemon(true).build())
- // The modification time of the newest log detected during the last scan. This is used
- // to ignore logs that are older during subsequent scans, to avoid processing data that
- // is already known.
+ // The modification time of the newest log detected during the last scan. Currently only
+ // used for logging messages (logs are re-scanned based on file size, rather than modtime)
private var lastScanTime = -1L
// Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
@@ -87,6 +110,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
= new mutable.LinkedHashMap()
+ val fileToAppInfo = new mutable.HashMap[Path, FsApplicationAttemptInfo]()
+
// List of application logs to be deleted by event log cleaner.
private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
@@ -176,18 +201,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// Disable the background thread during tests.
if (!conf.contains("spark.testing")) {
// A task that periodically checks for event log updates on disk.
+ logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds")
pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
// A task that periodically cleans event logs on disk.
pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
}
+ } else {
+ logDebug("Background update thread disabled for testing")
}
}
override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values
- override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
+ override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
try {
applications.get(appId).flatMap { appInfo =>
appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt =>
@@ -210,7 +238,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
ui.getSecurityManager.setViewAcls(attempt.sparkUser,
appListener.viewAcls.getOrElse(""))
- ui
+ LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize))
}
}
}
@@ -243,12 +271,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private[history] def checkForLogs(): Unit = {
try {
val newLastScanTime = getNewLastScanTime()
+ logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
.getOrElse(Seq[FileStatus]())
+ // scan for modified applications, replay and merge them
val logInfos: Seq[FileStatus] = statusList
.filter { entry =>
try {
- !entry.isDirectory() && (entry.getModificationTime() >= lastScanTime)
+ val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
+ !entry.isDirectory() && prevFileSize < entry.getLen()
} catch {
case e: AccessControlException =>
// Do not use "logInfo" since these messages can get pretty noisy if printed on
@@ -262,6 +293,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
entry1.getModificationTime() >= entry2.getModificationTime()
}
+ if (logInfos.nonEmpty) {
+ logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
+ }
logInfos.grouped(20)
.map { batch =>
replayExecutor.submit(new Runnable {
@@ -356,7 +390,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val bus = new ReplayListenerBus()
val res = replay(fileStatus, bus)
res match {
- case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
+ case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully: $r")
case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
"The application may have not started.")
}
@@ -511,6 +545,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
+ // Note that the eventLog may have *increased* in size since when we grabbed the filestatus,
+ // and when we read the file here. That is OK -- it may result in an unnecessary refresh
+ // when there is no update, but will not result in missing an update. We *must* prevent
+ // an error the other way -- if we report a size bigger (ie later) than the file that is
+ // actually read, we may never refresh the app. FileStatus is guaranteed to be static
+ // after it's created, so we get a file size that is no bigger than what is actually read.
val logInput = EventLoggingListener.openEventLog(logPath, fs)
try {
val appListener = new ApplicationEventListener
@@ -521,7 +561,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// Without an app ID, new logs will render incorrectly in the listing page, so do not list or
// try to show their UI.
if (appListener.appId.isDefined) {
- Some(new FsApplicationAttemptInfo(
+ val attemptInfo = new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
appListener.appId.getOrElse(logPath.getName()),
@@ -530,7 +570,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
appListener.endTime.getOrElse(-1L),
eventLog.getModificationTime(),
appListener.sparkUser.getOrElse(NOT_STARTED),
- appCompleted))
+ appCompleted,
+ eventLog.getLen()
+ )
+ fileToAppInfo(logPath) = attemptInfo
+ Some(attemptInfo)
} else {
None
}
@@ -564,12 +608,77 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET)
}
+ /**
+ * String description for diagnostics
+ * @return a summary of the component state
+ */
+ override def toString: String = {
+ val header = s"""
+ | FsHistoryProvider: logdir=$logDir,
+ | last scan time=$lastScanTime
+ | Cached application count =${applications.size}}
+ """.stripMargin
+ val sb = new StringBuilder(header)
+ applications.foreach(entry => sb.append(entry._2).append("\n"))
+ sb.toString
+ }
+
+ /**
+ * Look up an application attempt
+ * @param appId application ID
+ * @param attemptId Attempt ID, if set
+ * @return the matching attempt, if found
+ */
+ def lookup(appId: String, attemptId: Option[String]): Option[FsApplicationAttemptInfo] = {
+ applications.get(appId).flatMap { appInfo =>
+ appInfo.attempts.find(_.attemptId == attemptId)
+ }
+ }
+
+ /**
+ * Return true iff a newer version of the UI is available. The check is based on whether the
+ * fileSize for the currently loaded UI is smaller than the file size the last time
+ * the logs were loaded.
+ *
+ * This is a very cheap operation -- the work of loading the new attempt was already done
+ * by [[checkForLogs]].
+ * @param appId application to probe
+ * @param attemptId attempt to probe
+ * @param prevFileSize the file size of the logs for the currently displayed UI
+ */
+ private def updateProbe(
+ appId: String,
+ attemptId: Option[String],
+ prevFileSize: Long)(): Boolean = {
+ lookup(appId, attemptId) match {
+ case None =>
+ logDebug(s"Application Attempt $appId/$attemptId not found")
+ false
+ case Some(latest) =>
+ prevFileSize < latest.fileSize
+ }
+ }
}
private[history] object FsHistoryProvider {
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
}
+/**
+ * Application attempt information.
+ *
+ * @param logPath path to the log file, or, for a legacy log, its directory
+ * @param name application name
+ * @param appId application ID
+ * @param attemptId optional attempt ID
+ * @param startTime start time (from playback)
+ * @param endTime end time (from playback). -1 if the application is incomplete.
+ * @param lastUpdated the modification time of the log file when this entry was built by replaying
+ * the history.
+ * @param sparkUser user running the application
+ * @param completed flag to indicate whether or not the application has completed.
+ * @param fileSize the size of the log file the last time the file was scanned for changes
+ */
private class FsApplicationAttemptInfo(
val logPath: String,
val name: String,
@@ -579,10 +688,24 @@ private class FsApplicationAttemptInfo(
endTime: Long,
lastUpdated: Long,
sparkUser: String,
- completed: Boolean = true)
+ completed: Boolean,
+ val fileSize: Long)
extends ApplicationAttemptInfo(
- attemptId, startTime, endTime, lastUpdated, sparkUser, completed)
+ attemptId, startTime, endTime, lastUpdated, sparkUser, completed) {
+ /** extend the superclass string value with the extra attributes of this class */
+ override def toString: String = {
+ s"FsApplicationAttemptInfo($name, $appId," +
+ s" ${super.toString}, source=$logPath, size=$fileSize"
+ }
+}
+
+/**
+ * Application history information
+ * @param id application ID
+ * @param name application name
+ * @param attempts list of attempts, most recent first.
+ */
private class FsApplicationHistoryInfo(
id: String,
override val name: String,
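The scan side of the same mechanism: `checkForLogs` remembers the size recorded for each log at its last replay and selects only the entries whose current length has grown past it. A simplified sketch of that selection, using plain tuples in place of Hadoop `FileStatus` (illustrative only):

import scala.collection.mutable

object ScanFilterSketch {
  // size recorded for each log path when it was last replayed (the role of fileToAppInfo)
  val knownSizes = mutable.Map[String, Long]()

  /** Keep only logs that are new, or whose length has grown since the last replay. */
  def newOrUpdated(listing: Seq[(String, Long)]): Seq[(String, Long)] =
    listing.filter { case (path, len) => knownSizes.getOrElse(path, 0L) < len }

  def main(args: Array[String]): Unit = {
    knownSizes ++= Map("app-1.inprogress" -> 1000L, "app-2" -> 500L)
    val listing = Seq(("app-1.inprogress", 1500L), ("app-2", 500L), ("app-3", 200L))
    // app-1 has grown and app-3 is new; app-2 is unchanged and skipped
    println(newOrUpdated(listing).map(_._1))
  }
}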
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index cab7faefe8..2fad1120cd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -30,7 +30,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
val allApps = parent.getApplicationList()
- .filter(_.attempts.head.completed != requestedIncomplete)
+ .filter(_.completed != requestedIncomplete)
val allAppsSize = allApps.size
val providerConfig = parent.getProviderConfig()
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 1f13d7db34..076bdc5c05 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -23,7 +23,6 @@ import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
import scala.util.control.NonFatal
-import com.google.common.cache._
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -31,7 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource, UIRoot}
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.{ShutdownHookManager, Utils}
+import org.apache.spark.util.{ShutdownHookManager, SystemClock, Utils}
/**
* A web server that renders SparkUIs of completed applications.
@@ -50,31 +49,16 @@ class HistoryServer(
securityManager: SecurityManager,
port: Int)
extends WebUI(securityManager, securityManager.getSSLOptions("historyServer"), port, conf)
- with Logging with UIRoot {
+ with Logging with UIRoot with ApplicationCacheOperations {
// How many applications to retain
private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
- private val appLoader = new CacheLoader[String, SparkUI] {
- override def load(key: String): SparkUI = {
- val parts = key.split("/")
- require(parts.length == 1 || parts.length == 2, s"Invalid app key $key")
- val ui = provider
- .getAppUI(parts(0), if (parts.length > 1) Some(parts(1)) else None)
- .getOrElse(throw new NoSuchElementException(s"no app with key $key"))
- attachSparkUI(ui)
- ui
- }
- }
+ // the cache of application UIs
+ private val appCache = new ApplicationCache(this, retainedApplications, new SystemClock())
- private val appCache = CacheBuilder.newBuilder()
- .maximumSize(retainedApplications)
- .removalListener(new RemovalListener[String, SparkUI] {
- override def onRemoval(rm: RemovalNotification[String, SparkUI]): Unit = {
- detachSparkUI(rm.getValue())
- }
- })
- .build(appLoader)
+ // and its metrics, for testing as well as monitoring
+ val cacheMetrics = appCache.metrics
private val loaderServlet = new HttpServlet {
protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
@@ -117,17 +101,7 @@ class HistoryServer(
}
def getSparkUI(appKey: String): Option[SparkUI] = {
- try {
- val ui = appCache.get(appKey)
- Some(ui)
- } catch {
- case NonFatal(e) => e.getCause() match {
- case nsee: NoSuchElementException =>
- None
-
- case cause: Exception => throw cause
- }
- }
+ appCache.getSparkUI(appKey)
}
initialize()
@@ -160,22 +134,37 @@ class HistoryServer(
override def stop() {
super.stop()
provider.stop()
+ appCache.stop()
}
/** Attach a reconstructed UI to this server. Only valid after bind(). */
- private def attachSparkUI(ui: SparkUI) {
+ override def attachSparkUI(
+ appId: String,
+ attemptId: Option[String],
+ ui: SparkUI,
+ completed: Boolean) {
assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs")
ui.getHandlers.foreach(attachHandler)
addFilters(ui.getHandlers, conf)
}
/** Detach a reconstructed UI from this server. Only valid after bind(). */
- private def detachSparkUI(ui: SparkUI) {
+ override def detachSparkUI(appId: String, attemptId: Option[String], ui: SparkUI): Unit = {
assert(serverInfo.isDefined, "HistoryServer must be bound before detaching SparkUIs")
ui.getHandlers.foreach(detachHandler)
}
/**
+ * Get the application UI and whether or not it is completed
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @return If found, the Spark UI and any history information to be used in the cache
+ */
+ override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
+ provider.getAppUI(appId, attemptId)
+ }
+
+ /**
* Returns a list of available applications, in descending order according to their end time.
*
* @return List of all known applications.
@@ -202,9 +191,15 @@ class HistoryServer(
*/
def getProviderConfig(): Map[String, String] = provider.getConfig()
+ /**
+ * Load an application UI and attach it to the web server.
+ * @param appId application ID
+ * @param attemptId optional attempt ID
+ * @return true if the application was found and loaded.
+ */
private def loadAppUi(appId: String, attemptId: Option[String]): Boolean = {
try {
- appCache.get(appId + attemptId.map { id => s"/$id" }.getOrElse(""))
+ appCache.get(appId, attemptId)
true
} catch {
case NonFatal(e) => e.getCause() match {
@@ -216,6 +211,17 @@ class HistoryServer(
}
}
+ /**
+ * String value for diagnostics.
+ * @return a multi-line description of the server state.
+ */
+ override def toString: String = {
+ s"""
+ | History Server;
+ | provider = $provider
+ | cache = $appCache
+ """.stripMargin
+ }
}
/**
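One subtlety the server code still has to handle: Guava's `LoadingCache` wraps exceptions thrown by its loader, which is why `loadAppUi` (and `ApplicationCache.getSparkUI`) inspect `getCause` before concluding the key was simply missing. A standalone sketch of that unwrapping idiom (illustrative only; assumes Guava on the classpath):

import java.util.NoSuchElementException

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import com.google.common.util.concurrent.UncheckedExecutionException

object LoaderExceptionSketch {
  val cache: LoadingCache[String, String] = CacheBuilder.newBuilder()
    .build(new CacheLoader[String, String] {
      override def load(key: String): String =
        throw new NoSuchElementException(s"no app with key $key")
    })

  /** Convert a "not found" load failure into None; rethrow anything else. */
  def lookup(key: String): Option[String] =
    try {
      Some(cache.get(key))
    } catch {
      case e: UncheckedExecutionException => e.getCause match {
        case _: NoSuchElementException => None
        case other: Exception => throw other
      }
    }

  def main(args: Array[String]): Unit = println(lookup("app-404"))  // prints None
}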
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 01fee46e73..8354e2a611 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -224,6 +224,13 @@ private[spark] class EventLoggingListener(
}
}
fileSystem.rename(new Path(logPath + IN_PROGRESS), target)
+ // touch the file to ensure the modtime is current across those filesystems where rename()
+ // does not set it and which support setTimes(); it's a no-op on most object stores
+ try {
+ fileSystem.setTimes(target, System.currentTimeMillis(), -1)
+ } catch {
+ case e: Exception => logDebug(s"failed to set time of $target", e)
+ }
}
}
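The rename-then-touch idiom above can be tried standalone against the local filesystem. A small sketch, assuming the Hadoop client libraries are on the classpath (the paths are illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object TouchAfterRenameSketch {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    val inProgress = new Path("/tmp/app-1.inprogress")
    val target = new Path("/tmp/app-1")
    fs.create(inProgress).close()
    fs.rename(inProgress, target)
    // some filesystems carry the old modtime across rename(); an explicit touch
    // guarantees that pollers keyed on modtime see the completed log as fresh
    try {
      fs.setTimes(target, System.currentTimeMillis(), -1)
    } catch {
      case e: Exception => println(s"setTimes() unsupported here, continuing: $e")
    }
  }
}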
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
new file mode 100644
index 0000000000..de6680c610
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.{Date, NoSuchElementException}
+import javax.servlet.Filter
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
+import com.google.common.cache.LoadingCache
+import com.google.common.util.concurrent.UncheckedExecutionException
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.Matchers
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{Logging, SparkFunSuite}
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => AttemptInfo, ApplicationInfo}
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.{Clock, ManualClock, Utils}
+
+class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar with Matchers {
+
+ /**
+ * subclass with access to the cache internals
+ * @param retainedApplications number of retained applications
+ */
+ class TestApplicationCache(
+ operations: ApplicationCacheOperations = new StubCacheOperations(),
+ retainedApplications: Int,
+ clock: Clock = new ManualClock(0))
+ extends ApplicationCache(operations, retainedApplications, clock) {
+
+ def cache(): LoadingCache[CacheKey, CacheEntry] = appCache
+ }
+
+ /**
+ * Stub cache operations.
+ * The state is kept in a map of [[CacheKey]] to [[CacheEntry]],
+ * with the `probeTime` field of each cache entry acting as the timestamp of that entry.
+ */
+ class StubCacheOperations extends ApplicationCacheOperations with Logging {
+
+ /** map to UI instances, including timestamps, which are used in update probes */
+ val instances = mutable.HashMap.empty[CacheKey, CacheEntry]
+
+ /** Map of attached spark UIs */
+ val attached = mutable.HashMap.empty[CacheKey, SparkUI]
+
+ var getAppUICount = 0L
+ var attachCount = 0L
+ var detachCount = 0L
+ var updateProbeCount = 0L
+
+ override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
+ logDebug(s"getAppUI($appId, $attemptId)")
+ getAppUICount += 1
+ instances.get(CacheKey(appId, attemptId)).map( e =>
+ LoadedAppUI(e.ui, updateProbe(appId, attemptId, e.probeTime)))
+ }
+
+ override def attachSparkUI(
+ appId: String,
+ attemptId: Option[String],
+ ui: SparkUI,
+ completed: Boolean): Unit = {
+ logDebug(s"attachSparkUI($appId, $attemptId, $ui)")
+ attachCount += 1
+ attached += (CacheKey(appId, attemptId) -> ui)
+ }
+
+ def putAndAttach(
+ appId: String,
+ attemptId: Option[String],
+ completed: Boolean,
+ started: Long,
+ ended: Long,
+ timestamp: Long): SparkUI = {
+ val ui = putAppUI(appId, attemptId, completed, started, ended, timestamp)
+ attachSparkUI(appId, attemptId, ui, completed)
+ ui
+ }
+
+ def putAppUI(
+ appId: String,
+ attemptId: Option[String],
+ completed: Boolean,
+ started: Long,
+ ended: Long,
+ timestamp: Long): SparkUI = {
+ val ui = newUI(appId, attemptId, completed, started, ended)
+ putInstance(appId, attemptId, ui, completed, timestamp)
+ ui
+ }
+
+ def putInstance(
+ appId: String,
+ attemptId: Option[String],
+ ui: SparkUI,
+ completed: Boolean,
+ timestamp: Long): Unit = {
+ instances += (CacheKey(appId, attemptId) ->
+ new CacheEntry(ui, completed, updateProbe(appId, attemptId, timestamp), timestamp))
+ }
+
+ /**
+ * Detach a reconstructed UI
+ *
+ * @param ui Spark UI
+ */
+ override def detachSparkUI(appId: String, attemptId: Option[String], ui: SparkUI): Unit = {
+ logDebug(s"detachSparkUI($appId, $attemptId, $ui)")
+ detachCount += 1
+ var name = ui.getAppName
+ val key = CacheKey(appId, attemptId)
+ attached.getOrElse(key, { throw new java.util.NoSuchElementException() })
+ attached -= key
+ }
+
+ /**
+ * Lookup from the internal cache of attached UIs
+ */
+ def getAttached(appId: String, attemptId: Option[String]): Option[SparkUI] = {
+ attached.get(CacheKey(appId, attemptId))
+ }
+
+ /**
+ * The update probe.
+ * @param appId application to probe
+ * @param attemptId attempt to probe
+ * @param updateTime timestamp of this UI load
+ */
+ private[history] def updateProbe(
+ appId: String,
+ attemptId: Option[String],
+ updateTime: Long)(): Boolean = {
+ updateProbeCount += 1
+ logDebug(s"isUpdated($appId, $attemptId, ${updateTime})")
+ val entry = instances.get(CacheKey(appId, attemptId)).get
+ val updated = entry.probeTime > updateTime
+ logDebug(s"entry = $entry; updated = $updated")
+ updated
+ }
+ }
+
+ /**
+ * Create a new UI. The info/attempt info classes here are from the package
+ * `org.apache.spark.status.api.v1`, not the near-equivalents from the history package
+ */
+ def newUI(
+ name: String,
+ attemptId: Option[String],
+ completed: Boolean,
+ started: Long,
+ ended: Long): SparkUI = {
+ val info = new ApplicationInfo(name, name, Some(1), Some(1), Some(1), Some(64),
+ Seq(new AttemptInfo(attemptId, new Date(started), new Date(ended),
+ new Date(ended), ended - started, "user", completed)))
+ val ui = mock[SparkUI]
+ when(ui.getApplicationInfoList).thenReturn(List(info).iterator)
+ when(ui.getAppName).thenReturn(name)
+ when(ui.appName).thenReturn(name)
+ val handler = new ServletContextHandler()
+ when(ui.getHandlers).thenReturn(Seq(handler))
+ ui
+ }
+
+ /**
+ * Test operations on completed UIs: they are loaded on demand, entries
+ * are removed on overload.
+ *
+ * This effectively tests the original behavior of the history server's cache.
+ */
+ test("Completed UI get") {
+ val operations = new StubCacheOperations()
+ val clock = new ManualClock(1)
+ implicit val cache = new ApplicationCache(operations, 2, clock)
+ val metrics = cache.metrics
+ // cache misses
+ val app1 = "app-1"
+ assertNotFound(app1, None)
+ assertMetric("lookupCount", metrics.lookupCount, 1)
+ assertMetric("lookupFailureCount", metrics.lookupFailureCount, 1)
+ assert(1 === operations.getAppUICount, "getAppUICount")
+ assertNotFound(app1, None)
+ assert(2 === operations.getAppUICount, "getAppUICount")
+ assert(0 === operations.attachCount, "attachCount")
+
+ val now = clock.getTimeMillis()
+ // add the entry
+ operations.putAppUI(app1, None, true, now, now, now)
+
+ // make sure it's local
+ operations.getAppUI(app1, None).get
+ operations.getAppUICount = 0
+ // now expect it to be found
+ val cacheEntry = cache.lookupCacheEntry(app1, None)
+ assert(1 === cacheEntry.probeTime)
+ assert(cacheEntry.completed)
+ // assert about queries made of the operations
+ assert(1 === operations.getAppUICount, "getAppUICount")
+ assert(1 === operations.attachCount, "attachCount")
+
+ // and in the map of attached
+ assert(operations.getAttached(app1, None).isDefined, s"attached entry '1' from $cache")
+
+ // go forward in time
+ clock.setTime(10)
+ val time2 = clock.getTimeMillis()
+ val cacheEntry2 = cache.get(app1)
+ // no more refresh as this is a completed app
+ assert(1 === operations.getAppUICount, "getAppUICount")
+ assert(0 === operations.updateProbeCount, "updateProbeCount")
+ assert(0 === operations.detachCount, "detachCount")
+
+ // evict the entry
+ operations.putAndAttach("2", None, true, time2, time2, time2)
+ operations.putAndAttach("3", None, true, time2, time2, time2)
+ cache.get("2")
+ cache.get("3")
+
+ // there should have been a detachment here
+ assert(1 === operations.detachCount, s"detach count from $cache")
+ // and entry app1 no longer attached
+ assert(operations.getAttached(app1, None).isEmpty, s"get($app1) in $cache")
+ val appId = "app1"
+ val attemptId = Some("_01")
+ val time3 = clock.getTimeMillis()
+ operations.putAppUI(appId, attemptId, false, time3, 0, time3)
+ // expect an error here
+ assertNotFound(appId, None)
+ }
+
+ test("Test that if an attempt ID is is set, it must be used in lookups") {
+ val operations = new StubCacheOperations()
+ val clock = new ManualClock(1)
+ implicit val cache = new ApplicationCache(operations, retainedApplications = 10, clock = clock)
+ val appId = "app1"
+ val attemptId = Some("_01")
+ operations.putAppUI(appId, attemptId, false, clock.getTimeMillis(), 0, 0)
+ assertNotFound(appId, None)
+ }
+
+ /**
+ * Test that incomplete apps are not probed for updates during the time window,
+ * but that they are checked if that window has expired and they are not completed.
+ * Then, if they have changed, the old entry is replaced by a new one.
+ */
+ test("Incomplete apps refreshed") {
+ val operations = new StubCacheOperations()
+ val clock = new ManualClock(50)
+ val window = 500
+ implicit val cache = new ApplicationCache(operations, retainedApplications = 5, clock = clock)
+ val metrics = cache.metrics
+ // add the incomplete app
+ // add the entry
+ val started = clock.getTimeMillis()
+ val appId = "app1"
+ val attemptId = Some("001")
+ operations.putAppUI(appId, attemptId, false, started, 0, started)
+ val firstEntry = cache.lookupCacheEntry(appId, attemptId)
+ assert(started === firstEntry.probeTime, s"timestamp in $firstEntry")
+ assert(!firstEntry.completed, s"entry is complete: $firstEntry")
+ assertMetric("lookupCount", metrics.lookupCount, 1)
+
+ assert(0 === operations.updateProbeCount, "expected no update probe on that first get")
+
+ val checkTime = window * 2
+ clock.setTime(checkTime)
+ val entry3 = cache.lookupCacheEntry(appId, attemptId)
+ assert(firstEntry !== entry3, s"updated entry test from $cache")
+ assertMetric("lookupCount", metrics.lookupCount, 2)
+ assertMetric("updateProbeCount", metrics.updateProbeCount, 1)
+ assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 0)
+ assert(1 === operations.updateProbeCount, s"refresh count in $cache")
+ assert(0 === operations.detachCount, s"detach count")
+ assert(entry3.probeTime === checkTime)
+
+ val updateTime = window * 3
+ // update the cached value
+ val updatedApp = operations.putAppUI(appId, attemptId, true, started, updateTime, updateTime)
+ val endTime = window * 10
+ clock.setTime(endTime)
+ logDebug(s"Before operation = $cache")
+ val entry5 = cache.lookupCacheEntry(appId, attemptId)
+ assertMetric("lookupCount", metrics.lookupCount, 3)
+ assertMetric("updateProbeCount", metrics.updateProbeCount, 2)
+ // the update was triggered
+ assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 1)
+ assert(updatedApp === entry5.ui, s"UI {$updatedApp} did not match entry {$entry5} in $cache")
+
+ // at which point, the refreshes stop
+ clock.setTime(window * 20)
+ assertCacheEntryEquals(appId, attemptId, entry5)
+ assertMetric("updateProbeCount", metrics.updateProbeCount, 2)
+ }
+
+ /**
+ * Assert that a metric counter has a specific value; failure raises an exception
+ * including the cache's toString value
+ * @param name counter name (for exceptions)
+ * @param counter counter
+ * @param expected expected value.
+ * @param cache cache
+ */
+ def assertMetric(
+ name: String,
+ counter: Counter,
+ expected: Long)
+ (implicit cache: ApplicationCache): Unit = {
+ val actual = counter.getCount
+ if (actual != expected) {
+ // this is here because Scalatest loses stack depth
+ throw new Exception(s"Wrong $name value - expected $expected but got $actual in $cache")
+ }
+ }
+
+ /**
+   * Look up the cache entry and assert that it matches the expected value.
+   * This assertion works even if the two CacheEntries are different objects; it compares fields:
+   * the UIs on object equality, the probe timestamp and completed flags directly.
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @param expected expected value
+ * @param cache app cache
+ */
+ def assertCacheEntryEquals(
+ appId: String,
+ attemptId: Option[String],
+ expected: CacheEntry)
+ (implicit cache: ApplicationCache): Unit = {
+ val actual = cache.lookupCacheEntry(appId, attemptId)
+ val errorText = s"Expected get($appId, $attemptId) -> $expected, but got $actual from $cache"
+ assert(expected.ui === actual.ui, errorText + " SparkUI reference")
+ assert(expected.completed === actual.completed, errorText + " -completed flag")
+ assert(expected.probeTime === actual.probeTime, errorText + " -timestamp")
+ }
+
+ /**
+ * Assert that a key wasn't found in cache or loaded.
+ *
+ * Looks for the specific nested exception raised by [[ApplicationCache]]
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @param cache app cache
+ */
+ def assertNotFound(
+ appId: String,
+ attemptId: Option[String])
+ (implicit cache: ApplicationCache): Unit = {
+ val ex = intercept[UncheckedExecutionException] {
+ cache.get(appId, attemptId)
+ }
+    val cause = ex.getCause
+ assert(cause !== null)
+ if (!cause.isInstanceOf[NoSuchElementException]) {
+ throw cause
+ }
+ }
+
+ test("Large Scale Application Eviction") {
+ val operations = new StubCacheOperations()
+ val clock = new ManualClock(0)
+ val size = 5
+    // only `size` entries are retained, so we expect evictions to occur on lookups
+ implicit val cache: ApplicationCache = new TestApplicationCache(operations,
+ retainedApplications = size, clock = clock)
+
+ val attempt1 = Some("01")
+
+ val ids = new ListBuffer[String]()
+ // build a list of applications
+ val count = 100
+    for (i <- 1 to count) {
+ val appId = f"app-$i%04d"
+ ids += appId
+ clock.advance(10)
+ val t = clock.getTimeMillis()
+ operations.putAppUI(appId, attempt1, true, t, t, t)
+ }
+ // now go through them in sequence reading them, expect evictions
+ ids.foreach { id =>
+ cache.get(id, attempt1)
+ }
+ logInfo(cache.toString)
+ val metrics = cache.metrics
+
+ assertMetric("loadCount", metrics.loadCount, count)
+ assertMetric("evictionCount", metrics.evictionCount, count - size)
+  }
+
+ test("Attempts are Evicted") {
+ val operations = new StubCacheOperations()
+ implicit val cache: ApplicationCache = new TestApplicationCache(operations,
+ retainedApplications = 4)
+ val metrics = cache.metrics
+ val appId = "app1"
+ val attempt1 = Some("01")
+ val attempt2 = Some("02")
+ val attempt3 = Some("03")
+ operations.putAppUI(appId, attempt1, true, 100, 110, 110)
+ operations.putAppUI(appId, attempt2, true, 200, 210, 210)
+ operations.putAppUI(appId, attempt3, true, 300, 310, 310)
+ val attempt4 = Some("04")
+ operations.putAppUI(appId, attempt4, true, 400, 410, 410)
+ val attempt5 = Some("05")
+ operations.putAppUI(appId, attempt5, true, 500, 510, 510)
+
+ def expectLoadAndEvictionCounts(expectedLoad: Int, expectedEvictionCount: Int): Unit = {
+ assertMetric("loadCount", metrics.loadCount, expectedLoad)
+ assertMetric("evictionCount", metrics.evictionCount, expectedEvictionCount)
+ }
+
+ // first entry
+ cache.get(appId, attempt1)
+ expectLoadAndEvictionCounts(1, 0)
+
+ // second
+ cache.get(appId, attempt2)
+ expectLoadAndEvictionCounts(2, 0)
+
+ // no change
+ cache.get(appId, attempt2)
+ expectLoadAndEvictionCounts(2, 0)
+
+ // eviction time
+ cache.get(appId, attempt3)
+ cache.size() should be(3)
+ cache.get(appId, attempt4)
+ expectLoadAndEvictionCounts(4, 0)
+ cache.get(appId, attempt5)
+ expectLoadAndEvictionCounts(5, 1)
+ cache.get(appId, attempt5)
+ expectLoadAndEvictionCounts(5, 1)
+
+ }
+
+ test("Instantiate Filter") {
+ // this is a regression test on the filter being constructable
+ val clazz = Utils.classForName(ApplicationCacheCheckFilterRelay.FILTER_NAME)
+ val instance = clazz.newInstance()
+ instance shouldBe a [Filter]
+ }
+
+ test("redirect includes query params") {
+ val clazz = Utils.classForName(ApplicationCacheCheckFilterRelay.FILTER_NAME)
+ val filter = clazz.newInstance().asInstanceOf[ApplicationCacheCheckFilter]
+ filter.appId = "local-123"
+ val cache = mock[ApplicationCache]
+ when(cache.checkForUpdates(any(), any())).thenReturn(true)
+ ApplicationCacheCheckFilterRelay.setApplicationCache(cache)
+ val request = mock[HttpServletRequest]
+ when(request.getMethod()).thenReturn("GET")
+ when(request.getRequestURI()).thenReturn("http://localhost:18080/history/local-123/jobs/job/")
+ when(request.getQueryString()).thenReturn("id=2")
+ val resp = mock[HttpServletResponse]
+ when(resp.encodeRedirectURL(any())).thenAnswer(new Answer[String](){
+ override def answer(invocationOnMock: InvocationOnMock): String = {
+ invocationOnMock.getArguments()(0).asInstanceOf[String]
+ }
+ })
+ filter.doFilter(request, resp, null)
+ verify(resp).sendRedirect("http://localhost:18080/history/local-123/jobs/job/?id=2")
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 40d0076eec..4b05469c42 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -21,16 +21,28 @@ import java.net.{HttpURLConnection, URL}
import java.util.zip.ZipInputStream
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
import com.google.common.base.Charsets
import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.io.{FileUtils, IOUtils}
-import org.mockito.Mockito.when
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.json4s.JsonAST._
+import org.json4s.jackson.JsonMethods
+import org.json4s.jackson.JsonMethods._
+import org.openqa.selenium.WebDriver
+import org.openqa.selenium.htmlunit.HtmlUnitDriver
import org.scalatest.{BeforeAndAfter, Matchers}
+import org.scalatest.concurrent.Eventually
import org.scalatest.mock.MockitoSugar
+import org.scalatest.selenium.WebBrowser
-import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf, SparkFunSuite}
-import org.apache.spark.ui.{SparkUI, UIUtils}
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.jobs.UIData.JobUIData
+import org.apache.spark.util.{ResetSystemProperties, Utils}
/**
* A collection of tests against the historyserver, including comparing responses from the json
@@ -44,7 +56,8 @@ import org.apache.spark.util.ResetSystemProperties
* are considered part of Spark's public api.
*/
class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers with MockitoSugar
- with JsonTestUtils with ResetSystemProperties {
+ with JsonTestUtils with Eventually with WebBrowser with LocalSparkContext
+ with ResetSystemProperties {
private val logDir = new File("src/test/resources/spark-events")
private val expRoot = new File("src/test/resources/HistoryServerExpectations/")
@@ -56,7 +69,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
def init(): Unit = {
val conf = new SparkConf()
.set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
- .set("spark.history.fs.updateInterval", "0")
+ .set("spark.history.fs.update.interval", "0")
.set("spark.testing", "true")
provider = new FsHistoryProvider(conf)
provider.checkForLogs()
@@ -256,6 +269,204 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
all (siteRelativeLinks) should startWith (uiRoot)
}
+ test("incomplete apps get refreshed") {
+
+ implicit val webDriver: WebDriver = new HtmlUnitDriver
+ implicit val formats = org.json4s.DefaultFormats
+
+    // this test dir is explicitly deleted on successful runs; it is retained for diagnostics
+    // when a run fails
+ val logDir = Utils.createDirectory(System.getProperty("java.io.tmpdir", "logs"))
+
+    // a new conf is used with the background update thread enabled and running at its fastest
+    // allowed refresh rate (1Hz)
+ val myConf = new SparkConf()
+ .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
+ .set("spark.eventLog.dir", logDir.getAbsolutePath)
+ .set("spark.history.fs.update.interval", "1s")
+ .set("spark.eventLog.enabled", "true")
+ .set("spark.history.cache.window", "250ms")
+ .remove("spark.testing")
+ val provider = new FsHistoryProvider(myConf)
+ val securityManager = new SecurityManager(myConf)
+
+ sc = new SparkContext("local", "test", myConf)
+ val logDirUri = logDir.toURI
+ val logDirPath = new Path(logDirUri)
+ val fs = FileSystem.get(logDirUri, sc.hadoopConfiguration)
+
+ def listDir(dir: Path): Seq[FileStatus] = {
+ val statuses = fs.listStatus(dir)
+ statuses.flatMap(
+ stat => if (stat.isDirectory) listDir(stat.getPath) else Seq(stat))
+ }
+
+ def dumpLogDir(msg: String = ""): Unit = {
+ if (log.isDebugEnabled) {
+ logDebug(msg)
+ listDir(logDirPath).foreach { status =>
+ val s = status.toString
+ logDebug(s)
+ }
+ }
+ }
+
+ // stop the server with the old config, and start the new one
+ server.stop()
+ server = new HistoryServer(myConf, provider, securityManager, 18080)
+ server.initialize()
+ server.bind()
+ val port = server.boundPort
+ val metrics = server.cacheMetrics
+
+ // assert that a metric has a value; if not dump the whole metrics instance
+ def assertMetric(name: String, counter: Counter, expected: Long): Unit = {
+ val actual = counter.getCount
+ if (actual != expected) {
+ // this is here because Scalatest loses stack depth
+ fail(s"Wrong $name value - expected $expected but got $actual" +
+ s" in metrics\n$metrics")
+ }
+ }
+
+ // build a URL for an app or app/attempt plus a page underneath
+ def buildURL(appId: String, suffix: String): URL = {
+ new URL(s"http://localhost:$port/history/$appId$suffix")
+ }
+
+ // build a rest URL for the application and suffix.
+ def applications(appId: String, suffix: String): URL = {
+ new URL(s"http://localhost:$port/api/v1/applications/$appId$suffix")
+ }
+
+ val historyServerRoot = new URL(s"http://localhost:$port/")
+
+ // start initial job
+ val d = sc.parallelize(1 to 10)
+ d.count()
+ val stdInterval = interval(100 milliseconds)
+ val appId = eventually(timeout(20 seconds), stdInterval) {
+ val json = getContentAndCode("applications", port)._2.get
+ val apps = parse(json).asInstanceOf[JArray].arr
+ apps should have size 1
+ (apps.head \ "id").extract[String]
+ }
+
+ val appIdRoot = buildURL(appId, "")
+ val rootAppPage = HistoryServerSuite.getUrl(appIdRoot)
+ logDebug(s"$appIdRoot ->[${rootAppPage.length}] \n$rootAppPage")
+ // sanity check to make sure filter is chaining calls
+ rootAppPage should not be empty
+
+ def getAppUI: SparkUI = {
+ provider.getAppUI(appId, None).get.ui
+ }
+
+    // selenium isn't that useful on failures, so add our own reporting
+ def getNumJobs(suffix: String): Int = {
+ val target = buildURL(appId, suffix)
+ val targetBody = HistoryServerSuite.getUrl(target)
+ try {
+ go to target.toExternalForm
+ findAll(cssSelector("tbody tr")).toIndexedSeq.size
+ } catch {
+ case ex: Exception =>
+ throw new Exception(s"Against $target\n$targetBody", ex)
+ }
+ }
+    // use the REST API to get the number of jobs
+ def getNumJobsRestful(): Int = {
+ val json = HistoryServerSuite.getUrl(applications(appId, "/jobs"))
+ val jsonAst = parse(json)
+ val jobList = jsonAst.asInstanceOf[JArray]
+ jobList.values.size
+ }
+
+    // get the IDs of all apps in a given state, via the REST API
+ def listApplications(completed: Boolean): Seq[String] = {
+ val json = parse(HistoryServerSuite.getUrl(applications("", "")))
+ logDebug(s"${JsonMethods.pretty(json)}")
+ json match {
+ case JNothing => Seq()
+ case apps: JArray =>
+ apps.filter(app => {
+ (app \ "attempts") match {
+ case attempts: JArray =>
+ val state = (attempts.children.head \ "completed").asInstanceOf[JBool]
+ state.value == completed
+ case _ => false
+ }
+ }).map(app => (app \ "id").asInstanceOf[JString].values)
+ case _ => Seq()
+ }
+ }
+
+ def completedJobs(): Seq[JobUIData] = {
+ getAppUI.jobProgressListener.completedJobs
+ }
+
+ def activeJobs(): Seq[JobUIData] = {
+ getAppUI.jobProgressListener.activeJobs.values.toSeq
+ }
+
+ activeJobs() should have size 0
+ completedJobs() should have size 1
+ getNumJobs("") should be (1)
+ getNumJobs("/jobs") should be (1)
+ getNumJobsRestful() should be (1)
+ assert(metrics.lookupCount.getCount > 1, s"lookup count too low in $metrics")
+
+ // dump state before the next bit of test, which is where update
+ // checking really gets stressed
+ dumpLogDir("filesystem before executing second job")
+ logDebug(s"History Server: $server")
+
+ val d2 = sc.parallelize(1 to 10)
+ d2.count()
+ dumpLogDir("After second job")
+
+ val stdTimeout = timeout(10 seconds)
+ logDebug("waiting for UI to update")
+ eventually(stdTimeout, stdInterval) {
+ assert(2 === getNumJobs(""),
+ s"jobs not updated, server=$server\n dir = ${listDir(logDirPath)}")
+ assert(2 === getNumJobs("/jobs"),
+ s"job count under /jobs not updated, server=$server\n dir = ${listDir(logDirPath)}")
+ getNumJobsRestful() should be(2)
+ }
+
+ d.count()
+ d.count()
+ eventually(stdTimeout, stdInterval) {
+ assert(4 === getNumJobsRestful(), s"two jobs back-to-back not updated, server=$server\n")
+ }
+ val jobcount = getNumJobs("/jobs")
+ assert(!provider.getListing().head.completed)
+
+ listApplications(false) should contain(appId)
+
+ // stop the spark context
+ resetSparkContext()
+ // check the app is now found as completed
+ eventually(stdTimeout, stdInterval) {
+ assert(provider.getListing().head.completed,
+ s"application never completed, server=$server\n")
+ }
+
+ // app becomes observably complete
+ eventually(stdTimeout, stdInterval) {
+ listApplications(true) should contain (appId)
+ }
+ // app is no longer incomplete
+ listApplications(false) should not contain(appId)
+
+ assert(jobcount === getNumJobs("/jobs"))
+
+    // no need to retain the test dir now that the test is complete
+    logDir.deleteOnExit()
+
+ }
+
def getContentAndCode(path: String, port: Int = port): (Int, Option[String], Option[String]) = {
HistoryServerSuite.getContentAndCode(new URL(s"http://localhost:$port/api/v1/$path"))
}
@@ -275,6 +486,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
out.write(json)
out.close()
}
+
}
object HistoryServerSuite {
diff --git a/docs/monitoring.md b/docs/monitoring.md
index cedceb2958..c37f6fb20d 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -38,11 +38,25 @@ You can start the history server by executing:
./sbin/start-history-server.sh
-When using the file-system provider class (see spark.history.provider below), the base logging
-directory must be supplied in the <code>spark.history.fs.logDirectory</code> configuration option,
-and should contain sub-directories that each represents an application's event logs. This creates a
-web interface at `http://<server-url>:18080` by default. The history server can be configured as
-follows:
+This creates a web interface at `http://<server-url>:18080` by default, listing incomplete
+and completed applications and attempts, and allowing them to be viewed.
+
+When using the file-system provider class (see `spark.history.provider` below), the base logging
+directory must be supplied in the `spark.history.fs.logDirectory` configuration option,
+and should contain sub-directories, each of which represents an application's event logs.
+
+The Spark jobs themselves must be configured to log events, and to log them to the same shared,
+writable directory. For example, if the server were configured with a log directory of
+`hdfs://namenode/shared/spark-logs`, then the client-side options would be:
+
+```
+spark.eventLog.enabled true
+spark.eventLog.dir hdfs://namenode/shared/spark-logs
+```
+
+The history server can be configured as follows:
+
+### Environment Variables
<table class="table">
<tr><th style="width:21%">Environment Variable</th><th>Meaning</th></tr>
@@ -69,11 +83,13 @@ follows:
</tr>
</table>
+### Spark configuration options
+
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td>spark.history.provider</td>
- <td>org.apache.spark.deploy.history.FsHistoryProvider</td>
+ <td><code>org.apache.spark.deploy.history.FsHistoryProvider</code></td>
<td>Name of the class implementing the application history backend. Currently there is only
one implementation, provided by Spark, which looks for application logs stored in the
file system.</td>
@@ -82,15 +98,21 @@ follows:
<td>spark.history.fs.logDirectory</td>
<td>file:/tmp/spark-events</td>
<td>
- Directory that contains application event logs to be loaded by the history server
+ For the filesystem history provider, the URL to the directory containing application event
+ logs to load. This can be a local <code>file://</code> path,
+      an HDFS path such as <code>hdfs://namenode/shared/spark-logs</code>,
+      or the path of any alternative filesystem supported by the Hadoop APIs.
</td>
</tr>
<tr>
<td>spark.history.fs.update.interval</td>
<td>10s</td>
<td>
- The period at which information displayed by this history server is updated.
- Each update checks for any changes made to the event logs in persisted storage.
+      The period at which the filesystem history provider checks for new or
+      updated logs in the log directory. A shorter interval detects new applications faster,
+      at the expense of more server load from re-reading updated applications.
+      As soon as an update has completed, listings of the completed and incomplete applications
+      will reflect the changes. See the example configuration below this table.
</td>
</tr>
<tr>
@@ -112,7 +134,7 @@ follows:
<td>spark.history.kerberos.enabled</td>
<td>false</td>
<td>
- Indicates whether the history server should use kerberos to login. This is useful
+      Indicates whether the history server should use Kerberos to log in. This is required
if the history server is accessing HDFS files on a secure Hadoop cluster. If this is
true, it uses the configs <code>spark.history.kerberos.principal</code> and
<code>spark.history.kerberos.keytab</code>.
@@ -156,15 +178,15 @@ follows:
<td>spark.history.fs.cleaner.interval</td>
<td>1d</td>
<td>
- How often the job history cleaner checks for files to delete.
- Files are only deleted if they are older than spark.history.fs.cleaner.maxAge.
+ How often the filesystem job history cleaner checks for files to delete.
+      Files are only deleted if they are older than <code>spark.history.fs.cleaner.maxAge</code>.
</td>
</tr>
<tr>
<td>spark.history.fs.cleaner.maxAge</td>
<td>7d</td>
<td>
- Job history files older than this will be deleted when the history cleaner runs.
+ Job history files older than this will be deleted when the filesystem history cleaner runs.
</td>
</tr>
</table>
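+
+For example, these filesystem provider options might be combined in the history server's
+configuration (for example in `spark-defaults.conf`). This is only a sketch: the log directory
+is the example HDFS path used earlier on this page, and the remaining values are simply the
+documented defaults.
+
+```
+spark.history.provider            org.apache.spark.deploy.history.FsHistoryProvider
+spark.history.fs.logDirectory     hdfs://namenode/shared/spark-logs
+spark.history.fs.update.interval  10s
+spark.history.fs.cleaner.interval 1d
+spark.history.fs.cleaner.maxAge   7d
+```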
@@ -172,7 +194,25 @@ follows:
Note that in all of these UIs, the tables are sortable by clicking their headers,
making it easy to identify slow tasks, data skew, etc.
-Note that the history server only displays completed Spark jobs. One way to signal the completion of a Spark job is to stop the Spark Context explicitly (`sc.stop()`), or in Python using the `with SparkContext() as sc:` to handle the Spark Context setup and tear down, and still show the job history on the UI.
+Note that:
+
+1. The history server displays both completed and incomplete Spark jobs. If an application makes
+multiple attempts after failures, the failed attempts will be displayed, as well as any ongoing
+incomplete attempt or the final successful attempt.
+
+2. Incomplete applications are only updated intermittently. The time between updates is defined
+by the interval between checks for changed files (`spark.history.fs.update.interval`).
+On larger clusters, the update interval may be set to large values;
+the way to view an up-to-date running application is through its own web UI.
+
+3. Applications which exited without registering themselves as completed will be listed
+as incomplete, even though they are no longer running. This can happen if an application
+crashes.
+
+4. One way to signal the completion of a Spark job is to stop the Spark Context
+explicitly (`sc.stop()`), or in Python to use the `with SparkContext() as sc:` construct
+to handle the Spark Context setup and teardown. A minimal Scala sketch is shown after this list.
+
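+As a minimal Scala sketch of point 4 (the object and application names below are illustrative,
+and the log directory reuses the example path from above), an application can enable event
+logging and stop its context explicitly so that the history server eventually lists it as
+completed:
+
+```
+import org.apache.spark.{SparkConf, SparkContext}
+
+object EventLoggingExample {
+  def main(args: Array[String]): Unit = {
+    // event logging must target the same directory the history server reads;
+    // the master is expected to be supplied by spark-submit
+    val conf = new SparkConf()
+      .setAppName("event-logging-example")
+      .set("spark.eventLog.enabled", "true")
+      .set("spark.eventLog.dir", "hdfs://namenode/shared/spark-logs")
+    val sc = new SparkContext(conf)
+    try {
+      sc.parallelize(1 to 10).count()
+    } finally {
+      // stopping the context marks the application as completed in its event log
+      sc.stop()
+    }
+  }
+}
+```
+
+Without the final `sc.stop()` the application would keep appearing in the incomplete list,
+as described in point 3 above.
+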
## REST API
@@ -249,7 +289,7 @@ These endpoints have been strongly versioned to make it easier to develop applic
* New endpoints may be added
* New fields may be added to existing endpoints
* New versions of the api may be added in the future at a separate endpoint (eg., `api/v2`). New versions are *not* required to be backwards compatible.
-* Api versions may be dropped, but only after at least one minor release of co-existing with a new api version
+* Api versions may be dropped, but only after at least one minor release of co-existing with a new api version.
 Note that even when examining the UI of a running application, the `applications/[app-id]` portion is
still required, though there is only one application available. Eg. to see the list of jobs for the
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 133894704b..8611106db0 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -233,6 +233,9 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.metadataCleaner"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint")
+ ) ++ Seq(
+ // SPARK-7889
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.deploy.history.HistoryServer.org$apache$spark$deploy$history$HistoryServer$@tachSparkUI")
)
case v if v.startsWith("1.6") =>
Seq(