diff options
Diffstat (limited to 'core/src/test/scala')
5 files changed, 42 insertions, 522 deletions
diff --git a/core/src/test/scala/org/apache/spark/JsonTestUtils.scala b/core/src/test/scala/org/apache/spark/JsonTestUtils.scala deleted file mode 100644 index ba367cd476..0000000000 --- a/core/src/test/scala/org/apache/spark/JsonTestUtils.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark - -import org.json4s._ -import org.json4s.jackson.JsonMethods - -trait JsonTestUtils { - def assertValidDataInJson(validateJson: JValue, expectedJson: JValue) { - val Diff(c, a, d) = validateJson.diff(expectedJson) - val validatePretty = JsonMethods.pretty(validateJson) - val expectedPretty = JsonMethods.pretty(expectedJson) - val errorMessage = s"Expected:\n$expectedPretty\nFound:\n$validatePretty" - import org.scalactic.TripleEquals._ - assert(c === JNothing, s"$errorMessage\nChanged:\n${JsonMethods.pretty(c)}") - assert(a === JNothing, s"$errorMessage\nAdded:\n${JsonMethods.pretty(a)}") - assert(d === JNothing, s"$errorMessage\nDeleted:\n${JsonMethods.pretty(d)}") - } - -} diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index e04a792841..b58d62567a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -28,9 +28,9 @@ import org.scalatest.FunSuite import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo} import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner} -import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf} +import org.apache.spark.{SecurityManager, SparkConf} -class JsonProtocolSuite extends FunSuite with JsonTestUtils { +class JsonProtocolSuite extends FunSuite { test("writeApplicationInfo") { val output = JsonProtocol.writeApplicationInfo(createAppInfo()) @@ -136,6 +136,16 @@ class JsonProtocolSuite extends FunSuite with JsonTestUtils { case e: JsonParseException => fail("Invalid Json detected", e) } } + + def assertValidDataInJson(validateJson: JValue, expectedJson: JValue) { + val Diff(c, a, d) = validateJson diff expectedJson + val validatePretty = JsonMethods.pretty(validateJson) + val expectedPretty = JsonMethods.pretty(expectedJson) + val errorMessage = s"Expected:\n$expectedPretty\nFound:\n$validatePretty" + assert(c === JNothing, s"$errorMessage\nChanged:\n${JsonMethods.pretty(c)}") + assert(a === JNothing, s"$errorMessage\nAdded:\n${JsonMethods.pretty(a)}") + assert(d === JNothing, s"$errorMessage\nDelected:\n${JsonMethods.pretty(d)}") + } } object JsonConstants { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 0744b68c69..71ba9c1825 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -14,161 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.spark.deploy.history -import java.io.{File, FileInputStream, FileWriter, IOException} -import java.net.{HttpURLConnection, URL} -import javax.servlet.http.{HttpServletRequest, HttpServletResponse} +import javax.servlet.http.HttpServletRequest + +import scala.collection.mutable -import org.apache.commons.io.{FileUtils, IOUtils} -import org.mockito.Mockito.when -import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} +import org.apache.hadoop.fs.Path +import org.mockito.Mockito.{when} +import org.scalatest.FunSuite +import org.scalatest.Matchers import org.scalatest.mock.MockitoSugar -import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf} import org.apache.spark.ui.SparkUI -/** - * A collection of tests against the historyserver, including comparing responses from the json - * metrics api to a set of known "golden files". If new endpoints / parameters are added, - * cases should be added to this test suite. The expected outcomes can be genered by running - * the HistoryServerSuite.main. Note that this will blindly generate new expectation files matching - * the current behavior -- the developer must verify that behavior is correct. - * - * Similarly, if the behavior is changed, HistoryServerSuite.main can be run to update the - * expectations. However, in general this should be done with extreme caution, as the metrics - * are considered part of Spark's public api. - */ -class HistoryServerSuite extends FunSuite with BeforeAndAfter with Matchers with MockitoSugar - with JsonTestUtils { - - private val logDir = new File("src/test/resources/spark-events") - private val expRoot = new File("src/test/resources/HistoryServerExpectations/") - - private var provider: FsHistoryProvider = null - private var server: HistoryServer = null - private var port: Int = -1 - - def init(): Unit = { - val conf = new SparkConf() - .set("spark.history.fs.logDirectory", logDir.getAbsolutePath) - .set("spark.history.fs.updateInterval", "0") - .set("spark.testing", "true") - provider = new FsHistoryProvider(conf) - provider.checkForLogs() - val securityManager = new SecurityManager(conf) - - server = new HistoryServer(conf, provider, securityManager, 18080) - server.initialize() - server.bind() - port = server.boundPort - } - - def stop(): Unit = { - server.stop() - } - - before { - init() - } - - after{ - stop() - } - - val cases = Seq( - "application list json" -> "applications", - "completed app list json" -> "applications?status=completed", - "running app list json" -> "applications?status=running", - "minDate app list json" -> "applications?minDate=2015-02-10", - "maxDate app list json" -> "applications?maxDate=2015-02-10", - "maxDate2 app list json" -> "applications?maxDate=2015-02-03T10:42:40.000CST", - "one app json" -> "applications/local-1422981780767", - "one app multi-attempt json" -> "applications/local-1426533911241", - "job list json" -> "applications/local-1422981780767/jobs", - "job list from multi-attempt app json(1)" -> "applications/local-1426533911241/1/jobs", - "job list from multi-attempt app json(2)" -> "applications/local-1426533911241/2/jobs", - "one job json" -> "applications/local-1422981780767/jobs/0", - "succeeded job list json" -> "applications/local-1422981780767/jobs?status=succeeded", - "succeeded&failed job list json" -> - "applications/local-1422981780767/jobs?status=succeeded&status=failed", - "executor list json" -> "applications/local-1422981780767/executors", - "stage list json" -> "applications/local-1422981780767/stages", - "complete stage list json" -> "applications/local-1422981780767/stages?status=complete", - "failed stage list json" -> "applications/local-1422981780767/stages?status=failed", - "one stage json" -> "applications/local-1422981780767/stages/1", - "one stage attempt json" -> "applications/local-1422981780767/stages/1/0", - - "stage task summary" -> "applications/local-1427397477963/stages/20/0/taskSummary", - "stage task summary w/ custom quantiles" -> - "applications/local-1427397477963/stages/20/0/taskSummary?quantiles=0.01,0.5,0.99", - - "stage task list" -> "applications/local-1427397477963/stages/20/0/taskList", - "stage task list w/ offset & length" -> - "applications/local-1427397477963/stages/20/0/taskList?offset=10&length=50", - "stage task list w/ sortBy" -> - "applications/local-1427397477963/stages/20/0/taskList?sortBy=DECREASING_RUNTIME", - "stage task list w/ sortBy short names: -runtime" -> - "applications/local-1427397477963/stages/20/0/taskList?sortBy=-runtime", - "stage task list w/ sortBy short names: runtime" -> - "applications/local-1427397477963/stages/20/0/taskList?sortBy=runtime", - - "stage list with accumulable json" -> "applications/local-1426533911241/1/stages", - "stage with accumulable json" -> "applications/local-1426533911241/1/stages/0/0", - "stage task list from multi-attempt app json(1)" -> - "applications/local-1426533911241/1/stages/0/0/taskList", - "stage task list from multi-attempt app json(2)" -> - "applications/local-1426533911241/2/stages/0/0/taskList", - - "rdd list storage json" -> "applications/local-1422981780767/storage/rdd", - "one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0" - ) - - // run a bunch of characterization tests -- just verify the behavior is the same as what is saved - // in the test resource folder - cases.foreach { case (name, path) => - test(name) { - val (code, jsonOpt, errOpt) = getContentAndCode(path) - code should be (HttpServletResponse.SC_OK) - jsonOpt should be ('defined) - errOpt should be (None) - val json = jsonOpt.get - val exp = IOUtils.toString(new FileInputStream( - new File(expRoot, path + "/json_expectation"))) - // compare the ASTs so formatting differences don't cause failures - import org.json4s._ - import org.json4s.jackson.JsonMethods._ - val jsonAst = parse(json) - val expAst = parse(exp) - assertValidDataInJson(jsonAst, expAst) - } - } - - test("response codes on bad paths") { - val badAppId = getContentAndCode("applications/foobar") - badAppId._1 should be (HttpServletResponse.SC_NOT_FOUND) - badAppId._3 should be (Some("unknown app: foobar")) - - val badStageId = getContentAndCode("applications/local-1422981780767/stages/12345") - badStageId._1 should be (HttpServletResponse.SC_NOT_FOUND) - badStageId._3 should be (Some("unknown stage: 12345")) - - val badStageAttemptId = getContentAndCode("applications/local-1422981780767/stages/1/1") - badStageAttemptId._1 should be (HttpServletResponse.SC_NOT_FOUND) - badStageAttemptId._3 should be (Some("unknown attempt for stage 1. Found attempts: [0]")) - - val badStageId2 = getContentAndCode("applications/local-1422981780767/stages/flimflam") - badStageId2._1 should be (HttpServletResponse.SC_NOT_FOUND) - // will take some mucking w/ jersey to get a better error msg in this case - - val badQuantiles = getContentAndCode( - "applications/local-1427397477963/stages/20/0/taskSummary?quantiles=foo,0.1") - badQuantiles._1 should be (HttpServletResponse.SC_BAD_REQUEST) - badQuantiles._3 should be (Some("Bad value for parameter \"quantiles\". Expected a double, " + - "got \"foo\"")) - - getContentAndCode("foobar")._1 should be (HttpServletResponse.SC_NOT_FOUND) - } +class HistoryServerSuite extends FunSuite with Matchers with MockitoSugar { test("generate history page with relative links") { val historyServer = mock[HistoryServer] @@ -193,70 +54,4 @@ class HistoryServerSuite extends FunSuite with BeforeAndAfter with Matchers with } yield (attrs.toString) justHrefs should contain(link) } - - def getContentAndCode(path: String, port: Int = port): (Int, Option[String], Option[String]) = { - HistoryServerSuite.getContentAndCode(new URL(s"http://localhost:$port/json/v1/$path")) - } - - def getUrl(path: String): String = { - HistoryServerSuite.getUrl(new URL(s"http://localhost:$port/json/v1/$path")) - } - - def generateExpectation(path: String): Unit = { - val json = getUrl(path) - val dir = new File(expRoot, path) - dir.mkdirs() - val out = new FileWriter(new File(dir, "json_expectation")) - out.write(json) - out.close() - } -} - -object HistoryServerSuite { - def main(args: Array[String]): Unit = { - // generate the "expected" results for the characterization tests. Just blindly assume the - // current behavior is correct, and write out the returned json to the test/resource files - - val suite = new HistoryServerSuite - FileUtils.deleteDirectory(suite.expRoot) - suite.expRoot.mkdirs() - try { - suite.init() - suite.cases.foreach { case (_, path) => - suite.generateExpectation(path) - } - } finally { - suite.stop() - } - } - - def getContentAndCode(url: URL): (Int, Option[String], Option[String]) = { - val connection = url.openConnection().asInstanceOf[HttpURLConnection] - connection.setRequestMethod("GET") - connection.connect() - val code = connection.getResponseCode() - val inString = try { - val in = Option(connection.getInputStream()) - in.map{IOUtils.toString} - } catch { - case io: IOException => None - } - val errString = try { - val err = Option(connection.getErrorStream()) - err.map{IOUtils.toString} - } catch { - case io: IOException => None - } - (code, inString, errString) - } - - def getUrl(path: URL): String = { - val (code, resultOpt, error) = getContentAndCode(path) - if (code == 200) { - resultOpt.get - } else { - throw new RuntimeException( - "got code: " + code + " when getting " + path + " w/ error: " + error) - } - } } diff --git a/core/src/test/scala/org/apache/spark/status/api/v1/SimpleDateParamTest.scala b/core/src/test/scala/org/apache/spark/status/api/v1/SimpleDateParamTest.scala deleted file mode 100644 index 5274df904d..0000000000 --- a/core/src/test/scala/org/apache/spark/status/api/v1/SimpleDateParamTest.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.status.api.v1 - -import org.scalatest.{Matchers, FunSuite} - -class SimpleDateParamTest extends FunSuite with Matchers { - - test("date parsing") { - new SimpleDateParam("2015-02-20T23:21:17.190GMT").timestamp should be (1424474477190L) - new SimpleDateParam("2015-02-20T17:21:17.190CST").timestamp should be (1424474477190L) - new SimpleDateParam("2015-02-20").timestamp should be (1424390400000L) // GMT - } - -} diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index 117b2c3960..d53d7f3ba5 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -18,13 +18,11 @@ package org.apache.spark.ui import java.net.{HttpURLConnection, URL} -import javax.servlet.http.{HttpServletResponse, HttpServletRequest} +import javax.servlet.http.HttpServletRequest import scala.collection.JavaConversions._ import scala.xml.Node -import org.json4s._ -import org.json4s.jackson.JsonMethods import org.openqa.selenium.htmlunit.HtmlUnitDriver import org.openqa.selenium.{By, WebDriver} import org.scalatest._ @@ -35,9 +33,8 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.LocalSparkContext._ import org.apache.spark._ import org.apache.spark.api.java.StorageLevels -import org.apache.spark.deploy.history.HistoryServerSuite import org.apache.spark.shuffle.FetchFailedException -import org.apache.spark.status.api.v1.{JacksonMessageWriter, StageStatus} + /** * Selenium tests for the Spark Web UI. @@ -45,8 +42,6 @@ import org.apache.spark.status.api.v1.{JacksonMessageWriter, StageStatus} class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with BeforeAndAfterAll { implicit var webDriver: WebDriver = _ - implicit val formats = DefaultFormats - override def beforeAll(): Unit = { webDriver = new HtmlUnitDriver @@ -81,42 +76,28 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before val rdd = sc.parallelize(Seq(1, 2, 3)) rdd.persist(StorageLevels.DISK_ONLY).count() eventually(timeout(5 seconds), interval(50 milliseconds)) { - goToUi(ui, "/storage") + go to (ui.appUIAddress.stripSuffix("/") + "/storage") val tableRowText = findAll(cssSelector("#storage-by-rdd-table td")).map(_.text).toSeq tableRowText should contain (StorageLevels.DISK_ONLY.description) } eventually(timeout(5 seconds), interval(50 milliseconds)) { - goToUi(ui, "/storage/rdd/?id=0") + go to (ui.appUIAddress.stripSuffix("/") + "/storage/rdd/?id=0") val tableRowText = findAll(cssSelector("#rdd-storage-by-block-table td")).map(_.text).toSeq tableRowText should contain (StorageLevels.DISK_ONLY.description) } - val storageJson = getJson(ui, "storage/rdd") - storageJson.children.length should be (1) - (storageJson \ "storageLevel").extract[String] should be (StorageLevels.DISK_ONLY.description) - val rddJson = getJson(ui, "storage/rdd/0") - (rddJson \ "storageLevel").extract[String] should be (StorageLevels.DISK_ONLY.description) - rdd.unpersist() rdd.persist(StorageLevels.MEMORY_ONLY).count() eventually(timeout(5 seconds), interval(50 milliseconds)) { - goToUi(ui, "/storage") + go to (ui.appUIAddress.stripSuffix("/") + "/storage") val tableRowText = findAll(cssSelector("#storage-by-rdd-table td")).map(_.text).toSeq tableRowText should contain (StorageLevels.MEMORY_ONLY.description) } eventually(timeout(5 seconds), interval(50 milliseconds)) { - goToUi(ui, "/storage/rdd/?id=0") + go to (ui.appUIAddress.stripSuffix("/") + "/storage/rdd/?id=0") val tableRowText = findAll(cssSelector("#rdd-storage-by-block-table td")).map(_.text).toSeq tableRowText should contain (StorageLevels.MEMORY_ONLY.description) } - - val updatedStorageJson = getJson(ui, "storage/rdd") - updatedStorageJson.children.length should be (1) - (updatedStorageJson \ "storageLevel").extract[String] should be ( - StorageLevels.MEMORY_ONLY.description) - val updatedRddJson = getJson(ui, "storage/rdd/0") - (updatedRddJson \ "storageLevel").extract[String] should be ( - StorageLevels.MEMORY_ONLY.description) } } @@ -127,13 +108,10 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before sc.parallelize(1 to 10).map { x => throw new Exception()}.collect() } eventually(timeout(5 seconds), interval(50 milliseconds)) { - goToUi(sc, "/stages") + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages") find(id("active")) should be(None) // Since we hide empty tables find(id("failed")).get.text should be("Failed Stages (1)") } - val stageJson = getJson(sc.ui.get, "stages") - stageJson.children.length should be (1) - (stageJson \ "status").extract[String] should be (StageStatus.FAILED.name()) // Regression test for SPARK-2105 class NotSerializable @@ -142,15 +120,12 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before sc.parallelize(1 to 10).map { x => unserializableObject}.collect() } eventually(timeout(5 seconds), interval(50 milliseconds)) { - goToUi(sc, "/stages") + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages") find(id("active")) should be(None) // Since we hide empty tables // The failure occurs before the stage becomes active, hence we should still show only one // failed stage, not two: find(id("failed")).get.text should be("Failed Stages (1)") } - - val updatedStageJson = getJson(sc.ui.get, "stages") - updatedStageJson should be (stageJson) } } @@ -163,7 +138,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before withSpark(newSparkContext(killEnabled = true)) { sc => runSlowJob(sc) eventually(timeout(5 seconds), interval(50 milliseconds)) { - goToUi(sc, "/stages") + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages") assert(hasKillLink) } } @@ -171,7 +146,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before withSpark(newSparkContext(killEnabled = false)) { sc => runSlowJob(sc) eventually(timeout(5 seconds), interval(50 milliseconds)) { - goToUi(sc, "/stages") + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages") assert(!hasKillLink) } } @@ -182,7 +157,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before // If no job has been run in a job group, then "(Job Group)" should not appear in the header sc.parallelize(Seq(1, 2, 3)).count() eventually(timeout(5 seconds), interval(50 milliseconds)) { - goToUi(sc, "/jobs") + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs") val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq tableHeaders should not contain "Job Id (Job Group)" } @@ -190,22 +165,10 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before sc.setJobGroup("my-job-group", "my-job-group-description") sc.parallelize(Seq(1, 2, 3)).count() eventually(timeout(5 seconds), interval(50 milliseconds)) { - goToUi(sc, "/jobs") + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs") val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq tableHeaders should contain ("Job Id (Job Group)") } - - val jobJson = getJson(sc.ui.get, "jobs") - for { - job @ JObject(_) <- jobJson - JInt(jobId) <- job \ "jobId" - jobGroup = job \ "jobGroup" - } { - jobId.toInt match { - case 0 => jobGroup should be (JNothing) - case 1 => jobGroup should be (JString("my-job-group")) - } - } } } @@ -232,7 +195,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before } mappedData.count() eventually(timeout(5 seconds), interval(50 milliseconds)) { - goToUi(sc, "/jobs") + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs") find(cssSelector(".stage-progress-cell")).get.text should be ("2/2 (1 failed)") // Ideally, the following test would pass, but currently we overcount completed tasks // if task recomputations occur: @@ -241,32 +204,6 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before // of completed tasks may be higher: find(cssSelector(".progress-cell .progress")).get.text should be ("3/2 (1 failed)") } - val jobJson = getJson(sc.ui.get, "jobs") - (jobJson \ "numTasks").extract[Int]should be (2) - (jobJson \ "numCompletedTasks").extract[Int] should be (3) - (jobJson \ "numFailedTasks").extract[Int] should be (1) - (jobJson \ "numCompletedStages").extract[Int] should be (2) - (jobJson \ "numFailedStages").extract[Int] should be (1) - val stageJson = getJson(sc.ui.get, "stages") - - for { - stage @ JObject(_) <- stageJson - JString(status) <- stage \ "status" - JInt(stageId) <- stage \ "stageId" - JInt(attemptId) <- stage \ "attemptId" - } { - val exp = if (attemptId == 0 && stageId == 1) StageStatus.FAILED else StageStatus.COMPLETE - status should be (exp.name()) - } - - for { - stageId <- 0 to 1 - attemptId <- 0 to 1 - } { - val exp = if (attemptId == 0 && stageId == 1) StageStatus.FAILED else StageStatus.COMPLETE - val stageJson = getJson(sc.ui.get, s"stages/$stageId/$attemptId") - (stageJson \ "status").extract[String] should be (exp.name()) - } } } @@ -281,7 +218,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before // Start the job: rdd.countAsync() eventually(timeout(10 seconds), interval(50 milliseconds)) { - goToUi(sc, "/jobs/job/?id=0") + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/?id=0") find(id("active")).get.text should be ("Active Stages (1)") find(id("pending")).get.text should be ("Pending Stages (2)") // Essentially, we want to check that none of the stage rows show @@ -307,7 +244,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before rdd.count() rdd.count() eventually(timeout(10 seconds), interval(50 milliseconds)) { - goToUi(sc, "/jobs") + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs") // The completed jobs table should have two rows. The first row will be the most recent job: val firstRow = find(cssSelector("tbody tr")).get.underlying val firstRowColumns = firstRow.findElements(By.tagName("td")) @@ -334,7 +271,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before rdd.count() rdd.count() eventually(timeout(10 seconds), interval(50 milliseconds)) { - goToUi(sc, "/jobs/job/?id=1") + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/?id=1") find(id("pending")) should be (None) find(id("active")) should be (None) find(id("failed")) should be (None) @@ -362,7 +299,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before rdd.count() rdd.count() eventually(timeout(10 seconds), interval(50 milliseconds)) { - goToUi(sc, "/jobs") + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs") findAll(cssSelector("tbody tr a")).foreach { link => link.text.toLowerCase should include ("count") link.text.toLowerCase should not include "unknown" @@ -384,7 +321,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before } sparkUI.attachTab(newTab) eventually(timeout(10 seconds), interval(50 milliseconds)) { - goToUi(sc, "") + go to (sc.ui.get.appUIAddress.stripSuffix("/")) find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None) find(cssSelector("""ul li a[href*="stages"]""")) should not be(None) find(cssSelector("""ul li a[href*="storage"]""")) should not be(None) @@ -393,12 +330,12 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before } eventually(timeout(10 seconds), interval(50 milliseconds)) { // check whether new page exists - goToUi(sc, "/foo") + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/foo") find(cssSelector("b")).get.text should include ("html magic") } sparkUI.detachTab(newTab) eventually(timeout(10 seconds), interval(50 milliseconds)) { - goToUi(sc, "") + go to (sc.ui.get.appUIAddress.stripSuffix("/")) find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None) find(cssSelector("""ul li a[href*="stages"]""")) should not be(None) find(cssSelector("""ul li a[href*="storage"]""")) should not be(None) @@ -407,7 +344,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before } eventually(timeout(10 seconds), interval(50 milliseconds)) { // check new page not exist - goToUi(sc, "/foo") + go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/foo") find(cssSelector("b")) should be(None) } } @@ -434,163 +371,4 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before } } } - - test("stage & job retention") { - val conf = new SparkConf() - .setMaster("local") - .setAppName("test") - .set("spark.ui.enabled", "true") - .set("spark.ui.port", "0") - .set("spark.ui.retainedStages", "3") - .set("spark.ui.retainedJobs", "2") - val sc = new SparkContext(conf) - assert(sc.ui.isDefined) - - withSpark(sc) { sc => - // run a few jobs & stages ... - (0 until 5).foreach { idx => - // NOTE: if we reverse the order, things don't really behave nicely - // we lose the stage for a job we keep, and then the job doesn't know - // about its last stage - sc.parallelize(idx to (idx + 3)).map(identity).groupBy(identity).map(identity) - .groupBy(identity).count() - sc.parallelize(idx to (idx + 3)).collect() - } - - val expJobInfo = Seq( - ("9", "collect"), - ("8", "count") - ) - - eventually(timeout(1 second), interval(50 milliseconds)) { - goToUi(sc, "/jobs") - // The completed jobs table should have two rows. The first row will be the most recent job: - find("completed-summary").get.text should be ("Completed Jobs: 10, only showing 2") - find("completed").get.text should be ("Completed Jobs (10, only showing 2)") - val rows = findAll(cssSelector("tbody tr")).toIndexedSeq.map{_.underlying} - rows.size should be (expJobInfo.size) - for { - (row, idx) <- rows.zipWithIndex - columns = row.findElements(By.tagName("td")) - id = columns(0).getText() - description = columns(1).getText() - } { - id should be (expJobInfo(idx)._1) - description should include (expJobInfo(idx)._2) - } - } - - val jobsJson = getJson(sc.ui.get, "jobs") - jobsJson.children.size should be (expJobInfo.size) - for { - (job @ JObject(_),idx) <- jobsJson.children.zipWithIndex - id = (job \ "jobId").extract[String] - name = (job \ "name").extract[String] - } { - withClue(s"idx = $idx; id = $id; name = ${name.substring(0,20)}") { - id should be (expJobInfo(idx)._1) - name should include (expJobInfo(idx)._2) - } - } - - // what about when we query for a job that did exist, but has been cleared? - goToUi(sc, "/jobs/job/?id=7") - find("no-info").get.text should be ("No information to display for job 7") - - val badJob = HistoryServerSuite.getContentAndCode(jsonUrl(sc.ui.get, "jobs/7")) - badJob._1 should be (HttpServletResponse.SC_NOT_FOUND) - badJob._2 should be (None) - badJob._3 should be (Some("unknown job: 7")) - - val expStageInfo = Seq( - ("19", "collect"), - ("18", "count"), - ("17", "groupBy") - ) - - eventually(timeout(1 second), interval(50 milliseconds)) { - goToUi(sc, "/stages") - find("completed-summary").get.text should be ("Completed Stages: 20, only showing 3") - find("completed").get.text should be ("Completed Stages (20, only showing 3)") - val rows = findAll(cssSelector("tbody tr")).toIndexedSeq.map{_.underlying} - rows.size should be (3) - for { - (row, idx) <- rows.zipWithIndex - columns = row.findElements(By.tagName("td")) - id = columns(0).getText() - description = columns(1).getText() - } { - id should be (expStageInfo(idx)._1) - description should include (expStageInfo(idx)._2) - } - } - - val stagesJson = getJson(sc.ui.get, "stages") - stagesJson.children.size should be (3) - for { - (stage @ JObject(_), idx) <- stagesJson.children.zipWithIndex - id = (stage \ "stageId").extract[String] - name = (stage \ "name").extract[String] - } { - id should be (expStageInfo(idx)._1) - name should include (expStageInfo(idx)._2) - } - - // nonexistent stage - - goToUi(sc, "/stages/stage/?id=12&attempt=0") - find("no-info").get.text should be ("No information to display for Stage 12 (Attempt 0)") - val badStage = HistoryServerSuite.getContentAndCode(jsonUrl(sc.ui.get,"stages/12/0")) - badStage._1 should be (HttpServletResponse.SC_NOT_FOUND) - badStage._2 should be (None) - badStage._3 should be (Some("unknown stage: 12")) - - val badAttempt = HistoryServerSuite.getContentAndCode(jsonUrl(sc.ui.get,"stages/19/15")) - badAttempt._1 should be (HttpServletResponse.SC_NOT_FOUND) - badAttempt._2 should be (None) - badAttempt._3 should be (Some("unknown attempt for stage 19. Found attempts: [0]")) - - val badStageAttemptList = HistoryServerSuite.getContentAndCode( - jsonUrl(sc.ui.get, "stages/12")) - badStageAttemptList._1 should be (HttpServletResponse.SC_NOT_FOUND) - badStageAttemptList._2 should be (None) - badStageAttemptList._3 should be (Some("unknown stage: 12")) - } - } - - test("live UI json application list") { - withSpark(newSparkContext()) { sc => - val appListRawJson = HistoryServerSuite.getUrl(new URL( - sc.ui.get.appUIAddress + "/json/v1/applications")) - val appListJsonAst = JsonMethods.parse(appListRawJson) - appListJsonAst.children.length should be (1) - val attempts = (appListJsonAst \ "attempts").children - attempts.size should be (1) - (attempts(0) \ "completed").extract[Boolean] should be (false) - parseDate(attempts(0) \ "startTime") should be (sc.startTime) - parseDate(attempts(0) \ "endTime") should be (-1) - val oneAppJsonAst = getJson(sc.ui.get, "") - oneAppJsonAst should be (appListJsonAst.children(0)) - } - } - - def goToUi(sc: SparkContext, path: String): Unit = { - goToUi(sc.ui.get, path) - } - - def goToUi(ui: SparkUI, path: String): Unit = { - go to (ui.appUIAddress.stripSuffix("/") + path) - } - - def parseDate(json: JValue): Long = { - JacksonMessageWriter.makeISODateFormat.parse(json.extract[String]).getTime - } - - def getJson(ui: SparkUI, path: String): JValue = { - JsonMethods.parse(HistoryServerSuite.getUrl(jsonUrl(ui, path))) - } - - def jsonUrl(ui: SparkUI, path: String): URL = { - new URL(ui.appUIAddress + "/json/v1/applications/test/" + path) - } } |