author     Steve Loughran <stevel@hortonworks.com>    2016-02-11 21:37:53 -0600
committer  Imran Rashid <irashid@cloudera.com>    2016-02-11 21:37:53 -0600
commit     a2c7dcf61f33fa1897c950d2d905651103c170ea (patch)
tree       90268ba2e3c02be159411ed15d31408cd99e505a /core/src/test
parent     d3e2e202994e063856c192e9fdd0541777b88e0e (diff)
[SPARK-7889][WEBUI] HistoryServer updates UI for incomplete apps
When the HistoryServer is showing an incomplete app, it needs to check if there is a newer version of the app available. It does this by checking if a version of the app has been loaded with a larger *filesize*. If so, it detaches the current UI, attaches the new one, and redirects back to the same URL to show the new UI.

https://issues.apache.org/jira/browse/SPARK-7889

Author: Steve Loughran <stevel@hortonworks.com>
Author: Imran Rashid <irashid@cloudera.com>

Closes #11118 from squito/SPARK-7889-alternate.
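Before the diff itself, a minimal sketch of the mechanism the commit message describes, for orientation only: every name here (UiHandle, refreshIfNeeded, and so on) is a hypothetical stand-in, not the patch's own API; the real logic lives in ApplicationCache and the probe closures that the tests below exercise.

// Hypothetical sketch: a probe closure captures the event-log size seen at
// load time and reports "updated" once the log has grown past it; on a
// positive probe the stale UI is detached, reloaded, and re-attached.
object HistoryProbeSketch {
  final case class UiHandle(appId: String, loadedFileSize: Long)

  // Returns a () => Boolean probe, mirroring the closure style in the tests.
  def updateProbe(handle: UiHandle, currentFileSize: () => Long): () => Boolean =
    () => currentFileSize() > handle.loadedFileSize

  def refreshIfNeeded(
      handle: UiHandle,
      probe: () => Boolean,
      detach: UiHandle => Unit,
      reload: String => UiHandle,
      attach: UiHandle => Unit): UiHandle = {
    if (probe()) {
      detach(handle)
      val fresh = reload(handle.appId) // load the newer version of the UI
      attach(fresh)
      fresh // the caller then redirects back to the same URL
    } else {
      handle
    }
  }
}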
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala  488
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala  224
2 files changed, 706 insertions(+), 6 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
new file mode 100644
index 0000000000..de6680c610
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.{Date, NoSuchElementException}
+import javax.servlet.Filter
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
+import com.google.common.cache.LoadingCache
+import com.google.common.util.concurrent.UncheckedExecutionException
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.Matchers
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{Logging, SparkFunSuite}
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo => AttemptInfo, ApplicationInfo}
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.{Clock, ManualClock, Utils}
+
+class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar with Matchers {
+
+ /**
+ * subclass with access to the cache internals
+ * @param retainedApplications number of retained applications
+ */
+ class TestApplicationCache(
+ operations: ApplicationCacheOperations = new StubCacheOperations(),
+ retainedApplications: Int,
+ clock: Clock = new ManualClock(0))
+ extends ApplicationCache(operations, retainedApplications, clock) {
+
+ def cache(): LoadingCache[CacheKey, CacheEntry] = appCache
+ }
+
+ /**
+ * Stub cache operations.
+ * The state is kept in a map of [[CacheKey]] to [[CacheEntry]];
+ * the `probeTime` field of each entry records the timestamp of that entry.
+ */
+ class StubCacheOperations extends ApplicationCacheOperations with Logging {
+
+ /** map to UI instances, including timestamps, which are used in update probes */
+ val instances = mutable.HashMap.empty[CacheKey, CacheEntry]
+
+ /** Map of attached spark UIs */
+ val attached = mutable.HashMap.empty[CacheKey, SparkUI]
+
+ var getAppUICount = 0L
+ var attachCount = 0L
+ var detachCount = 0L
+ var updateProbeCount = 0L
+
+ override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
+ logDebug(s"getAppUI($appId, $attemptId)")
+ getAppUICount += 1
+ instances.get(CacheKey(appId, attemptId)).map( e =>
+ LoadedAppUI(e.ui, updateProbe(appId, attemptId, e.probeTime)))
+ }
+
+ override def attachSparkUI(
+ appId: String,
+ attemptId: Option[String],
+ ui: SparkUI,
+ completed: Boolean): Unit = {
+ logDebug(s"attachSparkUI($appId, $attemptId, $ui)")
+ attachCount += 1
+ attached += (CacheKey(appId, attemptId) -> ui)
+ }
+
+ def putAndAttach(
+ appId: String,
+ attemptId: Option[String],
+ completed: Boolean,
+ started: Long,
+ ended: Long,
+ timestamp: Long): SparkUI = {
+ val ui = putAppUI(appId, attemptId, completed, started, ended, timestamp)
+ attachSparkUI(appId, attemptId, ui, completed)
+ ui
+ }
+
+ def putAppUI(
+ appId: String,
+ attemptId: Option[String],
+ completed: Boolean,
+ started: Long,
+ ended: Long,
+ timestamp: Long): SparkUI = {
+ val ui = newUI(appId, attemptId, completed, started, ended)
+ putInstance(appId, attemptId, ui, completed, timestamp)
+ ui
+ }
+
+ def putInstance(
+ appId: String,
+ attemptId: Option[String],
+ ui: SparkUI,
+ completed: Boolean,
+ timestamp: Long): Unit = {
+ instances += (CacheKey(appId, attemptId) ->
+ new CacheEntry(ui, completed, updateProbe(appId, attemptId, timestamp), timestamp))
+ }
+
+ /**
+ * Detach a reconstructed UI
+ *
+ * @param ui Spark UI
+ */
+ override def detachSparkUI(appId: String, attemptId: Option[String], ui: SparkUI): Unit = {
+ logDebug(s"detachSparkUI($appId, $attemptId, $ui)")
+ detachCount += 1
+ val key = CacheKey(appId, attemptId)
+ attached.getOrElse(key,
+ throw new java.util.NoSuchElementException(s"no attached UI for $key"))
+ attached -= key
+ }
+
+ /**
+ * Lookup from the internal cache of attached UIs
+ */
+ def getAttached(appId: String, attemptId: Option[String]): Option[SparkUI] = {
+ attached.get(CacheKey(appId, attemptId))
+ }
+
+ /**
+ * The update probe.
+ * @param appId application to probe
+ * @param attemptId attempt to probe
+ * @param updateTime timestamp of this UI load
+ */
+ private[history] def updateProbe(
+ appId: String,
+ attemptId: Option[String],
+ updateTime: Long)(): Boolean = {
+ updateProbeCount += 1
+ logDebug(s"isUpdated($appId, $attemptId, ${updateTime})")
+ val entry = instances.get(CacheKey(appId, attemptId)).get
+ val updated = entry.probeTime > updateTime
+ logDebug(s"entry = $entry; updated = $updated")
+ updated
+ }
+ }
+
+ /**
+ * Create a new UI. The info/attempt info classes here are from the package
+ * `org.apache.spark.status.api.v1`, not the near-equivalents from the history package
+ */
+ def newUI(
+ name: String,
+ attemptId: Option[String],
+ completed: Boolean,
+ started: Long,
+ ended: Long): SparkUI = {
+ val info = new ApplicationInfo(name, name, Some(1), Some(1), Some(1), Some(64),
+ Seq(new AttemptInfo(attemptId, new Date(started), new Date(ended),
+ new Date(ended), ended - started, "user", completed)))
+ val ui = mock[SparkUI]
+ when(ui.getApplicationInfoList).thenReturn(List(info).iterator)
+ when(ui.getAppName).thenReturn(name)
+ when(ui.appName).thenReturn(name)
+ val handler = new ServletContextHandler()
+ when(ui.getHandlers).thenReturn(Seq(handler))
+ ui
+ }
+
+ /**
+ * Test operations on completed UIs: they are loaded on demand, and entries
+ * are evicted when the cache exceeds its retention limit.
+ *
+ * This effectively tests the original behavior of the history server's cache.
+ */
+ test("Completed UI get") {
+ val operations = new StubCacheOperations()
+ val clock = new ManualClock(1)
+ implicit val cache = new ApplicationCache(operations, 2, clock)
+ val metrics = cache.metrics
+ // cache misses
+ val app1 = "app-1"
+ assertNotFound(app1, None)
+ assertMetric("lookupCount", metrics.lookupCount, 1)
+ assertMetric("lookupFailureCount", metrics.lookupFailureCount, 1)
+ assert(1 === operations.getAppUICount, "getAppUICount")
+ assertNotFound(app1, None)
+ assert(2 === operations.getAppUICount, "getAppUICount")
+ assert(0 === operations.attachCount, "attachCount")
+
+ val now = clock.getTimeMillis()
+ // add the entry
+ operations.putAppUI(app1, None, true, now, now, now)
+
+ // make sure it's loaded locally
+ operations.getAppUI(app1, None).get
+ operations.getAppUICount = 0
+ // now expect it to be found
+ val cacheEntry = cache.lookupCacheEntry(app1, None)
+ assert(1 === cacheEntry.probeTime)
+ assert(cacheEntry.completed)
+ // assert about queries made of the operations
+ assert(1 === operations.getAppUICount, "getAppUICount")
+ assert(1 === operations.attachCount, "attachCount")
+
+ // and in the map of attached
+ assert(operations.getAttached(app1, None).isDefined, s"attached entry '1' from $cache")
+
+ // go forward in time
+ clock.setTime(10)
+ val time2 = clock.getTimeMillis()
+ val cacheEntry2 = cache.get(app1)
+ // no more refresh as this is a completed app
+ assert(1 === operations.getAppUICount, "getAppUICount")
+ assert(0 === operations.updateProbeCount, "updateProbeCount")
+ assert(0 === operations.detachCount, "detachCount")
+
+ // evict the entry
+ operations.putAndAttach("2", None, true, time2, time2, time2)
+ operations.putAndAttach("3", None, true, time2, time2, time2)
+ cache.get("2")
+ cache.get("3")
+
+ // there should have been a detachment here
+ assert(1 === operations.detachCount, s"detach count from $cache")
+ // and entry app1 no longer attached
+ assert(operations.getAttached(app1, None).isEmpty, s"get($app1) in $cache")
+ val appId = "app1"
+ val attemptId = Some("_01")
+ val time3 = clock.getTimeMillis()
+ operations.putAppUI(appId, attemptId, false, time3, 0, time3)
+ // expect an error here
+ assertNotFound(appId, None)
+ }
+
+ test("Test that if an attempt ID is is set, it must be used in lookups") {
+ val operations = new StubCacheOperations()
+ val clock = new ManualClock(1)
+ implicit val cache = new ApplicationCache(operations, retainedApplications = 10, clock = clock)
+ val appId = "app1"
+ val attemptId = Some("_01")
+ operations.putAppUI(appId, attemptId, false, clock.getTimeMillis(), 0, 0)
+ assertNotFound(appId, None)
+ }
+
+ /**
+ * Test that incomplete apps are not probed for updates within the refresh window,
+ * but are probed once that window has expired and they are still incomplete.
+ * If they have changed, the old entry is then replaced by a new one.
+ */
+ test("Incomplete apps refreshed") {
+ val operations = new StubCacheOperations()
+ val clock = new ManualClock(50)
+ val window = 500
+ implicit val cache = new ApplicationCache(operations, retainedApplications = 5, clock = clock)
+ val metrics = cache.metrics
+ // add the entry for the incomplete app; its timestamps
+ // mark it as just started
+ val started = clock.getTimeMillis()
+ val appId = "app1"
+ val attemptId = Some("001")
+ operations.putAppUI(appId, attemptId, false, started, 0, started)
+ val firstEntry = cache.lookupCacheEntry(appId, attemptId)
+ assert(started === firstEntry.probeTime, s"timestamp in $firstEntry")
+ assert(!firstEntry.completed, s"entry is complete: $firstEntry")
+ assertMetric("lookupCount", metrics.lookupCount, 1)
+
+ assert(0 === operations.updateProbeCount, "expected no update probe on that first get")
+
+ val checkTime = window * 2
+ clock.setTime(checkTime)
+ val entry3 = cache.lookupCacheEntry(appId, attemptId)
+ assert(firstEntry !== entry3, s"updated entry test from $cache")
+ assertMetric("lookupCount", metrics.lookupCount, 2)
+ assertMetric("updateProbeCount", metrics.updateProbeCount, 1)
+ assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 0)
+ assert(1 === operations.updateProbeCount, s"refresh count in $cache")
+ assert(0 === operations.detachCount, s"detach count")
+ assert(entry3.probeTime === checkTime)
+
+ val updateTime = window * 3
+ // update the cached value
+ val updatedApp = operations.putAppUI(appId, attemptId, true, started, updateTime, updateTime)
+ val endTime = window * 10
+ clock.setTime(endTime)
+ logDebug(s"Before operation = $cache")
+ val entry5 = cache.lookupCacheEntry(appId, attemptId)
+ assertMetric("lookupCount", metrics.lookupCount, 3)
+ assertMetric("updateProbeCount", metrics.updateProbeCount, 2)
+ // the update was triggered
+ assertMetric("updateTriggeredCount", metrics.updateTriggeredCount, 1)
+ assert(updatedApp === entry5.ui, s"UI {$updatedApp} did not match entry {$entry5} in $cache")
+
+ // at which point, the refreshes stop
+ clock.setTime(window * 20)
+ assertCacheEntryEquals(appId, attemptId, entry5)
+ assertMetric("updateProbeCount", metrics.updateProbeCount, 2)
+ }
+
+ /**
+ * Assert that a metric counter has a specific value; failure raises an exception
+ * including the cache's toString value
+ * @param name counter name (for exceptions)
+ * @param counter counter
+ * @param expected expected value.
+ * @param cache cache
+ */
+ def assertMetric(
+ name: String,
+ counter: Counter,
+ expected: Long)
+ (implicit cache: ApplicationCache): Unit = {
+ val actual = counter.getCount
+ if (actual != expected) {
+ // this is here because Scalatest loses stack depth
+ throw new Exception(s"Wrong $name value - expected $expected but got $actual in $cache")
+ }
+ }
+
+ /**
+ * Look up the cache entry and assert that it matches the expected value.
+ * This assertion works even if the two CacheEntry instances differ; it compares fields.
+ * UIs are compared on object equality; the timestamp and completed flag directly.
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @param expected expected value
+ * @param cache app cache
+ */
+ def assertCacheEntryEquals(
+ appId: String,
+ attemptId: Option[String],
+ expected: CacheEntry)
+ (implicit cache: ApplicationCache): Unit = {
+ val actual = cache.lookupCacheEntry(appId, attemptId)
+ val errorText = s"Expected get($appId, $attemptId) -> $expected, but got $actual from $cache"
+ assert(expected.ui === actual.ui, errorText + " SparkUI reference")
+ assert(expected.completed === actual.completed, errorText + " -completed flag")
+ assert(expected.probeTime === actual.probeTime, errorText + " -timestamp")
+ }
+
+ /**
+ * Assert that a key wasn't found in cache or loaded.
+ *
+ * Looks for the specific nested exception raised by [[ApplicationCache]]
+ * @param appId application ID
+ * @param attemptId attempt ID
+ * @param cache app cache
+ */
+ def assertNotFound(
+ appId: String,
+ attemptId: Option[String])
+ (implicit cache: ApplicationCache): Unit = {
+ val ex = intercept[UncheckedExecutionException] {
+ cache.get(appId, attemptId)
+ }
+ val cause = ex.getCause
+ assert(cause !== null)
+ if (!cause.isInstanceOf[NoSuchElementException]) {
+ throw cause
+ }
+ }
+
+ test("Large Scale Application Eviction") {
+ val operations = new StubCacheOperations()
+ val clock = new ManualClock(0)
+ val size = 5
+ // only `size` (5) entries are retained, so we expect evictions to occur on lookups
+ implicit val cache: ApplicationCache = new TestApplicationCache(operations,
+ retainedApplications = size, clock = clock)
+
+ val attempt1 = Some("01")
+
+ val ids = new ListBuffer[String]()
+ // build a list of applications
+ val count = 100
+ for (i <- 1 to count ) {
+ val appId = f"app-$i%04d"
+ ids += appId
+ clock.advance(10)
+ val t = clock.getTimeMillis()
+ operations.putAppUI(appId, attempt1, true, t, t, t)
+ }
+ // now go through them in sequence reading them, expect evictions
+ ids.foreach { id =>
+ cache.get(id, attempt1)
+ }
+ logInfo(cache.toString)
+ val metrics = cache.metrics
+
+ assertMetric("loadCount", metrics.loadCount, count)
+ assertMetric("evictionCount", metrics.evictionCount, count - size)
+ }
+
+ test("Attempts are Evicted") {
+ val operations = new StubCacheOperations()
+ implicit val cache: ApplicationCache = new TestApplicationCache(operations,
+ retainedApplications = 4)
+ val metrics = cache.metrics
+ val appId = "app1"
+ val attempt1 = Some("01")
+ val attempt2 = Some("02")
+ val attempt3 = Some("03")
+ operations.putAppUI(appId, attempt1, true, 100, 110, 110)
+ operations.putAppUI(appId, attempt2, true, 200, 210, 210)
+ operations.putAppUI(appId, attempt3, true, 300, 310, 310)
+ val attempt4 = Some("04")
+ operations.putAppUI(appId, attempt4, true, 400, 410, 410)
+ val attempt5 = Some("05")
+ operations.putAppUI(appId, attempt5, true, 500, 510, 510)
+
+ def expectLoadAndEvictionCounts(expectedLoad: Int, expectedEvictionCount: Int): Unit = {
+ assertMetric("loadCount", metrics.loadCount, expectedLoad)
+ assertMetric("evictionCount", metrics.evictionCount, expectedEvictionCount)
+ }
+
+ // first entry
+ cache.get(appId, attempt1)
+ expectLoadAndEvictionCounts(1, 0)
+
+ // second
+ cache.get(appId, attempt2)
+ expectLoadAndEvictionCounts(2, 0)
+
+ // no change
+ cache.get(appId, attempt2)
+ expectLoadAndEvictionCounts(2, 0)
+
+ // eviction time
+ cache.get(appId, attempt3)
+ cache.size() should be(3)
+ cache.get(appId, attempt4)
+ expectLoadAndEvictionCounts(4, 0)
+ cache.get(appId, attempt5)
+ expectLoadAndEvictionCounts(5, 1)
+ cache.get(appId, attempt5)
+ expectLoadAndEvictionCounts(5, 1)
+ }
+
+ test("Instantiate Filter") {
+ // this is a regression test that the filter can be constructed
+ val clazz = Utils.classForName(ApplicationCacheCheckFilterRelay.FILTER_NAME)
+ val instance = clazz.newInstance()
+ instance shouldBe a [Filter]
+ }
+
+ test("redirect includes query params") {
+ val clazz = Utils.classForName(ApplicationCacheCheckFilterRelay.FILTER_NAME)
+ val filter = clazz.newInstance().asInstanceOf[ApplicationCacheCheckFilter]
+ filter.appId = "local-123"
+ val cache = mock[ApplicationCache]
+ when(cache.checkForUpdates(any(), any())).thenReturn(true)
+ ApplicationCacheCheckFilterRelay.setApplicationCache(cache)
+ val request = mock[HttpServletRequest]
+ when(request.getMethod()).thenReturn("GET")
+ when(request.getRequestURI()).thenReturn("http://localhost:18080/history/local-123/jobs/job/")
+ when(request.getQueryString()).thenReturn("id=2")
+ val resp = mock[HttpServletResponse]
+ when(resp.encodeRedirectURL(any())).thenAnswer(new Answer[String](){
+ override def answer(invocationOnMock: InvocationOnMock): String = {
+ invocationOnMock.getArguments()(0).asInstanceOf[String]
+ }
+ })
+ filter.doFilter(request, resp, null)
+ verify(resp).sendRedirect("http://localhost:18080/history/local-123/jobs/job/?id=2")
+ }
+
+}
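The HistoryServerSuite changes below drive the same mechanism end-to-end: run a job, poll the server until its background scan notices the new events, then assert on the refreshed UI and REST views. As orientation, a minimal sketch of that poll-until-updated pattern using ScalaTest's Eventually (the helper name and parameters are invented for illustration):

import scala.concurrent.duration._
import org.scalatest.concurrent.Eventually

object PollSketch extends Eventually {
  // Retry the assertion until the history provider's background scan has
  // picked up the new events, or fail once the timeout elapses.
  def waitForJobCount(currentJobCount: () => Int, expected: Int): Unit = {
    eventually(timeout(10.seconds), interval(100.milliseconds)) {
      val seen = currentJobCount()
      assert(seen == expected, s"expected $expected jobs but saw $seen")
    }
  }
}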
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 40d0076eec..4b05469c42 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -21,16 +21,28 @@ import java.net.{HttpURLConnection, URL}
import java.util.zip.ZipInputStream
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import com.codahale.metrics.Counter
import com.google.common.base.Charsets
import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.io.{FileUtils, IOUtils}
-import org.mockito.Mockito.when
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.json4s.JsonAST._
+import org.json4s.jackson.JsonMethods
+import org.json4s.jackson.JsonMethods._
+import org.openqa.selenium.WebDriver
+import org.openqa.selenium.htmlunit.HtmlUnitDriver
import org.scalatest.{BeforeAndAfter, Matchers}
+import org.scalatest.concurrent.Eventually
import org.scalatest.mock.MockitoSugar
+import org.scalatest.selenium.WebBrowser
-import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf, SparkFunSuite}
-import org.apache.spark.ui.{SparkUI, UIUtils}
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.jobs.UIData.JobUIData
+import org.apache.spark.util.{ResetSystemProperties, Utils}
/**
* A collection of tests against the historyserver, including comparing responses from the json
@@ -44,7 +56,8 @@ import org.apache.spark.util.ResetSystemProperties
* are considered part of Spark's public api.
*/
class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers with MockitoSugar
- with JsonTestUtils with ResetSystemProperties {
+ with JsonTestUtils with Eventually with WebBrowser with LocalSparkContext
+ with ResetSystemProperties {
private val logDir = new File("src/test/resources/spark-events")
private val expRoot = new File("src/test/resources/HistoryServerExpectations/")
@@ -56,7 +69,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
def init(): Unit = {
val conf = new SparkConf()
.set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
- .set("spark.history.fs.updateInterval", "0")
+ .set("spark.history.fs.update.interval", "0")
.set("spark.testing", "true")
provider = new FsHistoryProvider(conf)
provider.checkForLogs()
@@ -256,6 +269,204 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
all (siteRelativeLinks) should startWith (uiRoot)
}
+ test("incomplete apps get refreshed") {
+
+ implicit val webDriver: WebDriver = new HtmlUnitDriver
+ implicit val formats = org.json4s.DefaultFormats
+
+ // this test dir is explicitly deleted on successful runs; retained for diagnostics
+ // when not
+ val logDir = Utils.createDirectory(System.getProperty("java.io.tmpdir", "logs"))
+
+ // a new conf is used with the background thread set and running at its fastest
+ // allowed refresh rate (1Hz)
+ val myConf = new SparkConf()
+ .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
+ .set("spark.eventLog.dir", logDir.getAbsolutePath)
+ .set("spark.history.fs.update.interval", "1s")
+ .set("spark.eventLog.enabled", "true")
+ .set("spark.history.cache.window", "250ms")
+ .remove("spark.testing")
+ val provider = new FsHistoryProvider(myConf)
+ val securityManager = new SecurityManager(myConf)
+
+ sc = new SparkContext("local", "test", myConf)
+ val logDirUri = logDir.toURI
+ val logDirPath = new Path(logDirUri)
+ val fs = FileSystem.get(logDirUri, sc.hadoopConfiguration)
+
+ def listDir(dir: Path): Seq[FileStatus] = {
+ val statuses = fs.listStatus(dir)
+ statuses.flatMap(
+ stat => if (stat.isDirectory) listDir(stat.getPath) else Seq(stat))
+ }
+
+ def dumpLogDir(msg: String = ""): Unit = {
+ if (log.isDebugEnabled) {
+ logDebug(msg)
+ listDir(logDirPath).foreach { status =>
+ val s = status.toString
+ logDebug(s)
+ }
+ }
+ }
+
+ // stop the server with the old config, and start the new one
+ server.stop()
+ server = new HistoryServer(myConf, provider, securityManager, 18080)
+ server.initialize()
+ server.bind()
+ val port = server.boundPort
+ val metrics = server.cacheMetrics
+
+ // assert that a metric has a value; if not dump the whole metrics instance
+ def assertMetric(name: String, counter: Counter, expected: Long): Unit = {
+ val actual = counter.getCount
+ if (actual != expected) {
+ // this is here because Scalatest loses stack depth
+ fail(s"Wrong $name value - expected $expected but got $actual" +
+ s" in metrics\n$metrics")
+ }
+ }
+
+ // build a URL for an app or app/attempt plus a page underneath
+ def buildURL(appId: String, suffix: String): URL = {
+ new URL(s"http://localhost:$port/history/$appId$suffix")
+ }
+
+ // build a rest URL for the application and suffix.
+ def applications(appId: String, suffix: String): URL = {
+ new URL(s"http://localhost:$port/api/v1/applications/$appId$suffix")
+ }
+
+ val historyServerRoot = new URL(s"http://localhost:$port/")
+
+ // start initial job
+ val d = sc.parallelize(1 to 10)
+ d.count()
+ val stdInterval = interval(100 milliseconds)
+ val appId = eventually(timeout(20 seconds), stdInterval) {
+ val json = getContentAndCode("applications", port)._2.get
+ val apps = parse(json).asInstanceOf[JArray].arr
+ apps should have size 1
+ (apps.head \ "id").extract[String]
+ }
+
+ val appIdRoot = buildURL(appId, "")
+ val rootAppPage = HistoryServerSuite.getUrl(appIdRoot)
+ logDebug(s"$appIdRoot ->[${rootAppPage.length}] \n$rootAppPage")
+ // sanity check to make sure filter is chaining calls
+ rootAppPage should not be empty
+
+ def getAppUI: SparkUI = {
+ provider.getAppUI(appId, None).get.ui
+ }
+
+ // selenium isn't that useful on failures, so add our own reporting
+ def getNumJobs(suffix: String): Int = {
+ val target = buildURL(appId, suffix)
+ val targetBody = HistoryServerSuite.getUrl(target)
+ try {
+ go to target.toExternalForm
+ findAll(cssSelector("tbody tr")).toIndexedSeq.size
+ } catch {
+ case ex: Exception =>
+ throw new Exception(s"Against $target\n$targetBody", ex)
+ }
+ }
+ // use the REST API to get the number of jobs
+ def getNumJobsRestful(): Int = {
+ val json = HistoryServerSuite.getUrl(applications(appId, "/jobs"))
+ val jsonAst = parse(json)
+ val jobList = jsonAst.asInstanceOf[JArray]
+ jobList.values.size
+ }
+
+ // get a list of app IDs of all apps in a given state, via the REST API
+ def listApplications(completed: Boolean): Seq[String] = {
+ val json = parse(HistoryServerSuite.getUrl(applications("", "")))
+ logDebug(s"${JsonMethods.pretty(json)}")
+ json match {
+ case JNothing => Seq()
+ case apps: JArray =>
+ apps.filter(app => {
+ (app \ "attempts") match {
+ case attempts: JArray =>
+ val state = (attempts.children.head \ "completed").asInstanceOf[JBool]
+ state.value == completed
+ case _ => false
+ }
+ }).map(app => (app \ "id").asInstanceOf[JString].values)
+ case _ => Seq()
+ }
+ }
+
+ def completedJobs(): Seq[JobUIData] = {
+ getAppUI.jobProgressListener.completedJobs
+ }
+
+ def activeJobs(): Seq[JobUIData] = {
+ getAppUI.jobProgressListener.activeJobs.values.toSeq
+ }
+
+ activeJobs() should have size 0
+ completedJobs() should have size 1
+ getNumJobs("") should be (1)
+ getNumJobs("/jobs") should be (1)
+ getNumJobsRestful() should be (1)
+ assert(metrics.lookupCount.getCount > 1, s"lookup count too low in $metrics")
+
+ // dump state before the next bit of test, which is where update
+ // checking really gets stressed
+ dumpLogDir("filesystem before executing second job")
+ logDebug(s"History Server: $server")
+
+ val d2 = sc.parallelize(1 to 10)
+ d2.count()
+ dumpLogDir("After second job")
+
+ val stdTimeout = timeout(10 seconds)
+ logDebug("waiting for UI to update")
+ eventually(stdTimeout, stdInterval) {
+ assert(2 === getNumJobs(""),
+ s"jobs not updated, server=$server\n dir = ${listDir(logDirPath)}")
+ assert(2 === getNumJobs("/jobs"),
+ s"job count under /jobs not updated, server=$server\n dir = ${listDir(logDirPath)}")
+ getNumJobsRestful() should be(2)
+ }
+
+ d.count()
+ d.count()
+ eventually(stdTimeout, stdInterval) {
+ assert(4 === getNumJobsRestful(), s"two jobs back-to-back not updated, server=$server\n")
+ }
+ val jobcount = getNumJobs("/jobs")
+ assert(!provider.getListing().head.completed)
+
+ listApplications(false) should contain(appId)
+
+ // stop the spark context
+ resetSparkContext()
+ // check the app is now found as completed
+ eventually(stdTimeout, stdInterval) {
+ assert(provider.getListing().head.completed,
+ s"application never completed, server=$server\n")
+ }
+
+ // app becomes observably complete
+ eventually(stdTimeout, stdInterval) {
+ listApplications(true) should contain (appId)
+ }
+ // app is no longer incomplete
+ listApplications(false) should not contain(appId)
+
+ assert(jobcount === getNumJobs("/jobs"))
+
+ // no need to retain the test dir now that the test has completed
+ logDir.deleteOnExit()
+
+ }
+
def getContentAndCode(path: String, port: Int = port): (Int, Option[String], Option[String]) = {
HistoryServerSuite.getContentAndCode(new URL(s"http://localhost:$port/api/v1/$path"))
}
@@ -275,6 +486,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
out.write(json)
out.close()
}
+
}
object HistoryServerSuite {