aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--core/src/test/scala/org/apache/spark/JsonTestUtils.scala34
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala14
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala223
-rw-r--r--core/src/test/scala/org/apache/spark/status/api/v1/SimpleDateParamTest.scala29
-rw-r--r--core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala264
5 files changed, 42 insertions, 522 deletions
diff --git a/core/src/test/scala/org/apache/spark/JsonTestUtils.scala b/core/src/test/scala/org/apache/spark/JsonTestUtils.scala
deleted file mode 100644
index ba367cd476..0000000000
--- a/core/src/test/scala/org/apache/spark/JsonTestUtils.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark
-
-import org.json4s._
-import org.json4s.jackson.JsonMethods
-
-trait JsonTestUtils {
- def assertValidDataInJson(validateJson: JValue, expectedJson: JValue) {
- val Diff(c, a, d) = validateJson.diff(expectedJson)
- val validatePretty = JsonMethods.pretty(validateJson)
- val expectedPretty = JsonMethods.pretty(expectedJson)
- val errorMessage = s"Expected:\n$expectedPretty\nFound:\n$validatePretty"
- import org.scalactic.TripleEquals._
- assert(c === JNothing, s"$errorMessage\nChanged:\n${JsonMethods.pretty(c)}")
- assert(a === JNothing, s"$errorMessage\nAdded:\n${JsonMethods.pretty(a)}")
- assert(d === JNothing, s"$errorMessage\nDeleted:\n${JsonMethods.pretty(d)}")
- }
-
-}
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index e04a792841..b58d62567a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -28,9 +28,9 @@ import org.scalatest.FunSuite
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo}
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
-import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf}
+import org.apache.spark.{SecurityManager, SparkConf}
-class JsonProtocolSuite extends FunSuite with JsonTestUtils {
+class JsonProtocolSuite extends FunSuite {
test("writeApplicationInfo") {
val output = JsonProtocol.writeApplicationInfo(createAppInfo())
@@ -136,6 +136,16 @@ class JsonProtocolSuite extends FunSuite with JsonTestUtils {
case e: JsonParseException => fail("Invalid Json detected", e)
}
}
+
+ def assertValidDataInJson(validateJson: JValue, expectedJson: JValue) {
+ val Diff(c, a, d) = validateJson diff expectedJson
+ val validatePretty = JsonMethods.pretty(validateJson)
+ val expectedPretty = JsonMethods.pretty(expectedJson)
+ val errorMessage = s"Expected:\n$expectedPretty\nFound:\n$validatePretty"
+ assert(c === JNothing, s"$errorMessage\nChanged:\n${JsonMethods.pretty(c)}")
+ assert(a === JNothing, s"$errorMessage\nAdded:\n${JsonMethods.pretty(a)}")
+ assert(d === JNothing, s"$errorMessage\nDelected:\n${JsonMethods.pretty(d)}")
+ }
}
object JsonConstants {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 0744b68c69..71ba9c1825 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -14,161 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.spark.deploy.history
-import java.io.{File, FileInputStream, FileWriter, IOException}
-import java.net.{HttpURLConnection, URL}
-import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import javax.servlet.http.HttpServletRequest
+
+import scala.collection.mutable
-import org.apache.commons.io.{FileUtils, IOUtils}
-import org.mockito.Mockito.when
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.apache.hadoop.fs.Path
+import org.mockito.Mockito.{when}
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
import org.scalatest.mock.MockitoSugar
-import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf}
import org.apache.spark.ui.SparkUI
-/**
- * A collection of tests against the historyserver, including comparing responses from the json
- * metrics api to a set of known "golden files". If new endpoints / parameters are added,
- * cases should be added to this test suite. The expected outcomes can be genered by running
- * the HistoryServerSuite.main. Note that this will blindly generate new expectation files matching
- * the current behavior -- the developer must verify that behavior is correct.
- *
- * Similarly, if the behavior is changed, HistoryServerSuite.main can be run to update the
- * expectations. However, in general this should be done with extreme caution, as the metrics
- * are considered part of Spark's public api.
- */
-class HistoryServerSuite extends FunSuite with BeforeAndAfter with Matchers with MockitoSugar
- with JsonTestUtils {
-
- private val logDir = new File("src/test/resources/spark-events")
- private val expRoot = new File("src/test/resources/HistoryServerExpectations/")
-
- private var provider: FsHistoryProvider = null
- private var server: HistoryServer = null
- private var port: Int = -1
-
- def init(): Unit = {
- val conf = new SparkConf()
- .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
- .set("spark.history.fs.updateInterval", "0")
- .set("spark.testing", "true")
- provider = new FsHistoryProvider(conf)
- provider.checkForLogs()
- val securityManager = new SecurityManager(conf)
-
- server = new HistoryServer(conf, provider, securityManager, 18080)
- server.initialize()
- server.bind()
- port = server.boundPort
- }
-
- def stop(): Unit = {
- server.stop()
- }
-
- before {
- init()
- }
-
- after{
- stop()
- }
-
- val cases = Seq(
- "application list json" -> "applications",
- "completed app list json" -> "applications?status=completed",
- "running app list json" -> "applications?status=running",
- "minDate app list json" -> "applications?minDate=2015-02-10",
- "maxDate app list json" -> "applications?maxDate=2015-02-10",
- "maxDate2 app list json" -> "applications?maxDate=2015-02-03T10:42:40.000CST",
- "one app json" -> "applications/local-1422981780767",
- "one app multi-attempt json" -> "applications/local-1426533911241",
- "job list json" -> "applications/local-1422981780767/jobs",
- "job list from multi-attempt app json(1)" -> "applications/local-1426533911241/1/jobs",
- "job list from multi-attempt app json(2)" -> "applications/local-1426533911241/2/jobs",
- "one job json" -> "applications/local-1422981780767/jobs/0",
- "succeeded job list json" -> "applications/local-1422981780767/jobs?status=succeeded",
- "succeeded&failed job list json" ->
- "applications/local-1422981780767/jobs?status=succeeded&status=failed",
- "executor list json" -> "applications/local-1422981780767/executors",
- "stage list json" -> "applications/local-1422981780767/stages",
- "complete stage list json" -> "applications/local-1422981780767/stages?status=complete",
- "failed stage list json" -> "applications/local-1422981780767/stages?status=failed",
- "one stage json" -> "applications/local-1422981780767/stages/1",
- "one stage attempt json" -> "applications/local-1422981780767/stages/1/0",
-
- "stage task summary" -> "applications/local-1427397477963/stages/20/0/taskSummary",
- "stage task summary w/ custom quantiles" ->
- "applications/local-1427397477963/stages/20/0/taskSummary?quantiles=0.01,0.5,0.99",
-
- "stage task list" -> "applications/local-1427397477963/stages/20/0/taskList",
- "stage task list w/ offset & length" ->
- "applications/local-1427397477963/stages/20/0/taskList?offset=10&length=50",
- "stage task list w/ sortBy" ->
- "applications/local-1427397477963/stages/20/0/taskList?sortBy=DECREASING_RUNTIME",
- "stage task list w/ sortBy short names: -runtime" ->
- "applications/local-1427397477963/stages/20/0/taskList?sortBy=-runtime",
- "stage task list w/ sortBy short names: runtime" ->
- "applications/local-1427397477963/stages/20/0/taskList?sortBy=runtime",
-
- "stage list with accumulable json" -> "applications/local-1426533911241/1/stages",
- "stage with accumulable json" -> "applications/local-1426533911241/1/stages/0/0",
- "stage task list from multi-attempt app json(1)" ->
- "applications/local-1426533911241/1/stages/0/0/taskList",
- "stage task list from multi-attempt app json(2)" ->
- "applications/local-1426533911241/2/stages/0/0/taskList",
-
- "rdd list storage json" -> "applications/local-1422981780767/storage/rdd",
- "one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0"
- )
-
- // run a bunch of characterization tests -- just verify the behavior is the same as what is saved
- // in the test resource folder
- cases.foreach { case (name, path) =>
- test(name) {
- val (code, jsonOpt, errOpt) = getContentAndCode(path)
- code should be (HttpServletResponse.SC_OK)
- jsonOpt should be ('defined)
- errOpt should be (None)
- val json = jsonOpt.get
- val exp = IOUtils.toString(new FileInputStream(
- new File(expRoot, path + "/json_expectation")))
- // compare the ASTs so formatting differences don't cause failures
- import org.json4s._
- import org.json4s.jackson.JsonMethods._
- val jsonAst = parse(json)
- val expAst = parse(exp)
- assertValidDataInJson(jsonAst, expAst)
- }
- }
-
- test("response codes on bad paths") {
- val badAppId = getContentAndCode("applications/foobar")
- badAppId._1 should be (HttpServletResponse.SC_NOT_FOUND)
- badAppId._3 should be (Some("unknown app: foobar"))
-
- val badStageId = getContentAndCode("applications/local-1422981780767/stages/12345")
- badStageId._1 should be (HttpServletResponse.SC_NOT_FOUND)
- badStageId._3 should be (Some("unknown stage: 12345"))
-
- val badStageAttemptId = getContentAndCode("applications/local-1422981780767/stages/1/1")
- badStageAttemptId._1 should be (HttpServletResponse.SC_NOT_FOUND)
- badStageAttemptId._3 should be (Some("unknown attempt for stage 1. Found attempts: [0]"))
-
- val badStageId2 = getContentAndCode("applications/local-1422981780767/stages/flimflam")
- badStageId2._1 should be (HttpServletResponse.SC_NOT_FOUND)
- // will take some mucking w/ jersey to get a better error msg in this case
-
- val badQuantiles = getContentAndCode(
- "applications/local-1427397477963/stages/20/0/taskSummary?quantiles=foo,0.1")
- badQuantiles._1 should be (HttpServletResponse.SC_BAD_REQUEST)
- badQuantiles._3 should be (Some("Bad value for parameter \"quantiles\". Expected a double, " +
- "got \"foo\""))
-
- getContentAndCode("foobar")._1 should be (HttpServletResponse.SC_NOT_FOUND)
- }
+class HistoryServerSuite extends FunSuite with Matchers with MockitoSugar {
test("generate history page with relative links") {
val historyServer = mock[HistoryServer]
@@ -193,70 +54,4 @@ class HistoryServerSuite extends FunSuite with BeforeAndAfter with Matchers with
} yield (attrs.toString)
justHrefs should contain(link)
}
-
- def getContentAndCode(path: String, port: Int = port): (Int, Option[String], Option[String]) = {
- HistoryServerSuite.getContentAndCode(new URL(s"http://localhost:$port/json/v1/$path"))
- }
-
- def getUrl(path: String): String = {
- HistoryServerSuite.getUrl(new URL(s"http://localhost:$port/json/v1/$path"))
- }
-
- def generateExpectation(path: String): Unit = {
- val json = getUrl(path)
- val dir = new File(expRoot, path)
- dir.mkdirs()
- val out = new FileWriter(new File(dir, "json_expectation"))
- out.write(json)
- out.close()
- }
-}
-
-object HistoryServerSuite {
- def main(args: Array[String]): Unit = {
- // generate the "expected" results for the characterization tests. Just blindly assume the
- // current behavior is correct, and write out the returned json to the test/resource files
-
- val suite = new HistoryServerSuite
- FileUtils.deleteDirectory(suite.expRoot)
- suite.expRoot.mkdirs()
- try {
- suite.init()
- suite.cases.foreach { case (_, path) =>
- suite.generateExpectation(path)
- }
- } finally {
- suite.stop()
- }
- }
-
- def getContentAndCode(url: URL): (Int, Option[String], Option[String]) = {
- val connection = url.openConnection().asInstanceOf[HttpURLConnection]
- connection.setRequestMethod("GET")
- connection.connect()
- val code = connection.getResponseCode()
- val inString = try {
- val in = Option(connection.getInputStream())
- in.map{IOUtils.toString}
- } catch {
- case io: IOException => None
- }
- val errString = try {
- val err = Option(connection.getErrorStream())
- err.map{IOUtils.toString}
- } catch {
- case io: IOException => None
- }
- (code, inString, errString)
- }
-
- def getUrl(path: URL): String = {
- val (code, resultOpt, error) = getContentAndCode(path)
- if (code == 200) {
- resultOpt.get
- } else {
- throw new RuntimeException(
- "got code: " + code + " when getting " + path + " w/ error: " + error)
- }
- }
}
diff --git a/core/src/test/scala/org/apache/spark/status/api/v1/SimpleDateParamTest.scala b/core/src/test/scala/org/apache/spark/status/api/v1/SimpleDateParamTest.scala
deleted file mode 100644
index 5274df904d..0000000000
--- a/core/src/test/scala/org/apache/spark/status/api/v1/SimpleDateParamTest.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.status.api.v1
-
-import org.scalatest.{Matchers, FunSuite}
-
-class SimpleDateParamTest extends FunSuite with Matchers {
-
- test("date parsing") {
- new SimpleDateParam("2015-02-20T23:21:17.190GMT").timestamp should be (1424474477190L)
- new SimpleDateParam("2015-02-20T17:21:17.190CST").timestamp should be (1424474477190L)
- new SimpleDateParam("2015-02-20").timestamp should be (1424390400000L) // GMT
- }
-
-}
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 117b2c3960..d53d7f3ba5 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -18,13 +18,11 @@
package org.apache.spark.ui
import java.net.{HttpURLConnection, URL}
-import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
+import javax.servlet.http.HttpServletRequest
import scala.collection.JavaConversions._
import scala.xml.Node
-import org.json4s._
-import org.json4s.jackson.JsonMethods
import org.openqa.selenium.htmlunit.HtmlUnitDriver
import org.openqa.selenium.{By, WebDriver}
import org.scalatest._
@@ -35,9 +33,8 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.LocalSparkContext._
import org.apache.spark._
import org.apache.spark.api.java.StorageLevels
-import org.apache.spark.deploy.history.HistoryServerSuite
import org.apache.spark.shuffle.FetchFailedException
-import org.apache.spark.status.api.v1.{JacksonMessageWriter, StageStatus}
+
/**
* Selenium tests for the Spark Web UI.
@@ -45,8 +42,6 @@ import org.apache.spark.status.api.v1.{JacksonMessageWriter, StageStatus}
class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with BeforeAndAfterAll {
implicit var webDriver: WebDriver = _
- implicit val formats = DefaultFormats
-
override def beforeAll(): Unit = {
webDriver = new HtmlUnitDriver
@@ -81,42 +76,28 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
val rdd = sc.parallelize(Seq(1, 2, 3))
rdd.persist(StorageLevels.DISK_ONLY).count()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
- goToUi(ui, "/storage")
+ go to (ui.appUIAddress.stripSuffix("/") + "/storage")
val tableRowText = findAll(cssSelector("#storage-by-rdd-table td")).map(_.text).toSeq
tableRowText should contain (StorageLevels.DISK_ONLY.description)
}
eventually(timeout(5 seconds), interval(50 milliseconds)) {
- goToUi(ui, "/storage/rdd/?id=0")
+ go to (ui.appUIAddress.stripSuffix("/") + "/storage/rdd/?id=0")
val tableRowText = findAll(cssSelector("#rdd-storage-by-block-table td")).map(_.text).toSeq
tableRowText should contain (StorageLevels.DISK_ONLY.description)
}
- val storageJson = getJson(ui, "storage/rdd")
- storageJson.children.length should be (1)
- (storageJson \ "storageLevel").extract[String] should be (StorageLevels.DISK_ONLY.description)
- val rddJson = getJson(ui, "storage/rdd/0")
- (rddJson \ "storageLevel").extract[String] should be (StorageLevels.DISK_ONLY.description)
-
rdd.unpersist()
rdd.persist(StorageLevels.MEMORY_ONLY).count()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
- goToUi(ui, "/storage")
+ go to (ui.appUIAddress.stripSuffix("/") + "/storage")
val tableRowText = findAll(cssSelector("#storage-by-rdd-table td")).map(_.text).toSeq
tableRowText should contain (StorageLevels.MEMORY_ONLY.description)
}
eventually(timeout(5 seconds), interval(50 milliseconds)) {
- goToUi(ui, "/storage/rdd/?id=0")
+ go to (ui.appUIAddress.stripSuffix("/") + "/storage/rdd/?id=0")
val tableRowText = findAll(cssSelector("#rdd-storage-by-block-table td")).map(_.text).toSeq
tableRowText should contain (StorageLevels.MEMORY_ONLY.description)
}
-
- val updatedStorageJson = getJson(ui, "storage/rdd")
- updatedStorageJson.children.length should be (1)
- (updatedStorageJson \ "storageLevel").extract[String] should be (
- StorageLevels.MEMORY_ONLY.description)
- val updatedRddJson = getJson(ui, "storage/rdd/0")
- (updatedRddJson \ "storageLevel").extract[String] should be (
- StorageLevels.MEMORY_ONLY.description)
}
}
@@ -127,13 +108,10 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
sc.parallelize(1 to 10).map { x => throw new Exception()}.collect()
}
eventually(timeout(5 seconds), interval(50 milliseconds)) {
- goToUi(sc, "/stages")
+ go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages")
find(id("active")) should be(None) // Since we hide empty tables
find(id("failed")).get.text should be("Failed Stages (1)")
}
- val stageJson = getJson(sc.ui.get, "stages")
- stageJson.children.length should be (1)
- (stageJson \ "status").extract[String] should be (StageStatus.FAILED.name())
// Regression test for SPARK-2105
class NotSerializable
@@ -142,15 +120,12 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
sc.parallelize(1 to 10).map { x => unserializableObject}.collect()
}
eventually(timeout(5 seconds), interval(50 milliseconds)) {
- goToUi(sc, "/stages")
+ go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages")
find(id("active")) should be(None) // Since we hide empty tables
// The failure occurs before the stage becomes active, hence we should still show only one
// failed stage, not two:
find(id("failed")).get.text should be("Failed Stages (1)")
}
-
- val updatedStageJson = getJson(sc.ui.get, "stages")
- updatedStageJson should be (stageJson)
}
}
@@ -163,7 +138,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
withSpark(newSparkContext(killEnabled = true)) { sc =>
runSlowJob(sc)
eventually(timeout(5 seconds), interval(50 milliseconds)) {
- goToUi(sc, "/stages")
+ go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages")
assert(hasKillLink)
}
}
@@ -171,7 +146,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
withSpark(newSparkContext(killEnabled = false)) { sc =>
runSlowJob(sc)
eventually(timeout(5 seconds), interval(50 milliseconds)) {
- goToUi(sc, "/stages")
+ go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages")
assert(!hasKillLink)
}
}
@@ -182,7 +157,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
// If no job has been run in a job group, then "(Job Group)" should not appear in the header
sc.parallelize(Seq(1, 2, 3)).count()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
- goToUi(sc, "/jobs")
+ go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs")
val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
tableHeaders should not contain "Job Id (Job Group)"
}
@@ -190,22 +165,10 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
sc.setJobGroup("my-job-group", "my-job-group-description")
sc.parallelize(Seq(1, 2, 3)).count()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
- goToUi(sc, "/jobs")
+ go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs")
val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
tableHeaders should contain ("Job Id (Job Group)")
}
-
- val jobJson = getJson(sc.ui.get, "jobs")
- for {
- job @ JObject(_) <- jobJson
- JInt(jobId) <- job \ "jobId"
- jobGroup = job \ "jobGroup"
- } {
- jobId.toInt match {
- case 0 => jobGroup should be (JNothing)
- case 1 => jobGroup should be (JString("my-job-group"))
- }
- }
}
}
@@ -232,7 +195,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
}
mappedData.count()
eventually(timeout(5 seconds), interval(50 milliseconds)) {
- goToUi(sc, "/jobs")
+ go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs")
find(cssSelector(".stage-progress-cell")).get.text should be ("2/2 (1 failed)")
// Ideally, the following test would pass, but currently we overcount completed tasks
// if task recomputations occur:
@@ -241,32 +204,6 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
// of completed tasks may be higher:
find(cssSelector(".progress-cell .progress")).get.text should be ("3/2 (1 failed)")
}
- val jobJson = getJson(sc.ui.get, "jobs")
- (jobJson \ "numTasks").extract[Int]should be (2)
- (jobJson \ "numCompletedTasks").extract[Int] should be (3)
- (jobJson \ "numFailedTasks").extract[Int] should be (1)
- (jobJson \ "numCompletedStages").extract[Int] should be (2)
- (jobJson \ "numFailedStages").extract[Int] should be (1)
- val stageJson = getJson(sc.ui.get, "stages")
-
- for {
- stage @ JObject(_) <- stageJson
- JString(status) <- stage \ "status"
- JInt(stageId) <- stage \ "stageId"
- JInt(attemptId) <- stage \ "attemptId"
- } {
- val exp = if (attemptId == 0 && stageId == 1) StageStatus.FAILED else StageStatus.COMPLETE
- status should be (exp.name())
- }
-
- for {
- stageId <- 0 to 1
- attemptId <- 0 to 1
- } {
- val exp = if (attemptId == 0 && stageId == 1) StageStatus.FAILED else StageStatus.COMPLETE
- val stageJson = getJson(sc.ui.get, s"stages/$stageId/$attemptId")
- (stageJson \ "status").extract[String] should be (exp.name())
- }
}
}
@@ -281,7 +218,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
// Start the job:
rdd.countAsync()
eventually(timeout(10 seconds), interval(50 milliseconds)) {
- goToUi(sc, "/jobs/job/?id=0")
+ go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/?id=0")
find(id("active")).get.text should be ("Active Stages (1)")
find(id("pending")).get.text should be ("Pending Stages (2)")
// Essentially, we want to check that none of the stage rows show
@@ -307,7 +244,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
rdd.count()
rdd.count()
eventually(timeout(10 seconds), interval(50 milliseconds)) {
- goToUi(sc, "/jobs")
+ go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs")
// The completed jobs table should have two rows. The first row will be the most recent job:
val firstRow = find(cssSelector("tbody tr")).get.underlying
val firstRowColumns = firstRow.findElements(By.tagName("td"))
@@ -334,7 +271,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
rdd.count()
rdd.count()
eventually(timeout(10 seconds), interval(50 milliseconds)) {
- goToUi(sc, "/jobs/job/?id=1")
+ go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/?id=1")
find(id("pending")) should be (None)
find(id("active")) should be (None)
find(id("failed")) should be (None)
@@ -362,7 +299,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
rdd.count()
rdd.count()
eventually(timeout(10 seconds), interval(50 milliseconds)) {
- goToUi(sc, "/jobs")
+ go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs")
findAll(cssSelector("tbody tr a")).foreach { link =>
link.text.toLowerCase should include ("count")
link.text.toLowerCase should not include "unknown"
@@ -384,7 +321,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
}
sparkUI.attachTab(newTab)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
- goToUi(sc, "")
+ go to (sc.ui.get.appUIAddress.stripSuffix("/"))
find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
find(cssSelector("""ul li a[href*="storage"]""")) should not be(None)
@@ -393,12 +330,12 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
}
eventually(timeout(10 seconds), interval(50 milliseconds)) {
// check whether new page exists
- goToUi(sc, "/foo")
+ go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/foo")
find(cssSelector("b")).get.text should include ("html magic")
}
sparkUI.detachTab(newTab)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
- goToUi(sc, "")
+ go to (sc.ui.get.appUIAddress.stripSuffix("/"))
find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
find(cssSelector("""ul li a[href*="storage"]""")) should not be(None)
@@ -407,7 +344,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
}
eventually(timeout(10 seconds), interval(50 milliseconds)) {
// check new page not exist
- goToUi(sc, "/foo")
+ go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/foo")
find(cssSelector("b")) should be(None)
}
}
@@ -434,163 +371,4 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
}
}
}
-
- test("stage & job retention") {
- val conf = new SparkConf()
- .setMaster("local")
- .setAppName("test")
- .set("spark.ui.enabled", "true")
- .set("spark.ui.port", "0")
- .set("spark.ui.retainedStages", "3")
- .set("spark.ui.retainedJobs", "2")
- val sc = new SparkContext(conf)
- assert(sc.ui.isDefined)
-
- withSpark(sc) { sc =>
- // run a few jobs & stages ...
- (0 until 5).foreach { idx =>
- // NOTE: if we reverse the order, things don't really behave nicely
- // we lose the stage for a job we keep, and then the job doesn't know
- // about its last stage
- sc.parallelize(idx to (idx + 3)).map(identity).groupBy(identity).map(identity)
- .groupBy(identity).count()
- sc.parallelize(idx to (idx + 3)).collect()
- }
-
- val expJobInfo = Seq(
- ("9", "collect"),
- ("8", "count")
- )
-
- eventually(timeout(1 second), interval(50 milliseconds)) {
- goToUi(sc, "/jobs")
- // The completed jobs table should have two rows. The first row will be the most recent job:
- find("completed-summary").get.text should be ("Completed Jobs: 10, only showing 2")
- find("completed").get.text should be ("Completed Jobs (10, only showing 2)")
- val rows = findAll(cssSelector("tbody tr")).toIndexedSeq.map{_.underlying}
- rows.size should be (expJobInfo.size)
- for {
- (row, idx) <- rows.zipWithIndex
- columns = row.findElements(By.tagName("td"))
- id = columns(0).getText()
- description = columns(1).getText()
- } {
- id should be (expJobInfo(idx)._1)
- description should include (expJobInfo(idx)._2)
- }
- }
-
- val jobsJson = getJson(sc.ui.get, "jobs")
- jobsJson.children.size should be (expJobInfo.size)
- for {
- (job @ JObject(_),idx) <- jobsJson.children.zipWithIndex
- id = (job \ "jobId").extract[String]
- name = (job \ "name").extract[String]
- } {
- withClue(s"idx = $idx; id = $id; name = ${name.substring(0,20)}") {
- id should be (expJobInfo(idx)._1)
- name should include (expJobInfo(idx)._2)
- }
- }
-
- // what about when we query for a job that did exist, but has been cleared?
- goToUi(sc, "/jobs/job/?id=7")
- find("no-info").get.text should be ("No information to display for job 7")
-
- val badJob = HistoryServerSuite.getContentAndCode(jsonUrl(sc.ui.get, "jobs/7"))
- badJob._1 should be (HttpServletResponse.SC_NOT_FOUND)
- badJob._2 should be (None)
- badJob._3 should be (Some("unknown job: 7"))
-
- val expStageInfo = Seq(
- ("19", "collect"),
- ("18", "count"),
- ("17", "groupBy")
- )
-
- eventually(timeout(1 second), interval(50 milliseconds)) {
- goToUi(sc, "/stages")
- find("completed-summary").get.text should be ("Completed Stages: 20, only showing 3")
- find("completed").get.text should be ("Completed Stages (20, only showing 3)")
- val rows = findAll(cssSelector("tbody tr")).toIndexedSeq.map{_.underlying}
- rows.size should be (3)
- for {
- (row, idx) <- rows.zipWithIndex
- columns = row.findElements(By.tagName("td"))
- id = columns(0).getText()
- description = columns(1).getText()
- } {
- id should be (expStageInfo(idx)._1)
- description should include (expStageInfo(idx)._2)
- }
- }
-
- val stagesJson = getJson(sc.ui.get, "stages")
- stagesJson.children.size should be (3)
- for {
- (stage @ JObject(_), idx) <- stagesJson.children.zipWithIndex
- id = (stage \ "stageId").extract[String]
- name = (stage \ "name").extract[String]
- } {
- id should be (expStageInfo(idx)._1)
- name should include (expStageInfo(idx)._2)
- }
-
- // nonexistent stage
-
- goToUi(sc, "/stages/stage/?id=12&attempt=0")
- find("no-info").get.text should be ("No information to display for Stage 12 (Attempt 0)")
- val badStage = HistoryServerSuite.getContentAndCode(jsonUrl(sc.ui.get,"stages/12/0"))
- badStage._1 should be (HttpServletResponse.SC_NOT_FOUND)
- badStage._2 should be (None)
- badStage._3 should be (Some("unknown stage: 12"))
-
- val badAttempt = HistoryServerSuite.getContentAndCode(jsonUrl(sc.ui.get,"stages/19/15"))
- badAttempt._1 should be (HttpServletResponse.SC_NOT_FOUND)
- badAttempt._2 should be (None)
- badAttempt._3 should be (Some("unknown attempt for stage 19. Found attempts: [0]"))
-
- val badStageAttemptList = HistoryServerSuite.getContentAndCode(
- jsonUrl(sc.ui.get, "stages/12"))
- badStageAttemptList._1 should be (HttpServletResponse.SC_NOT_FOUND)
- badStageAttemptList._2 should be (None)
- badStageAttemptList._3 should be (Some("unknown stage: 12"))
- }
- }
-
- test("live UI json application list") {
- withSpark(newSparkContext()) { sc =>
- val appListRawJson = HistoryServerSuite.getUrl(new URL(
- sc.ui.get.appUIAddress + "/json/v1/applications"))
- val appListJsonAst = JsonMethods.parse(appListRawJson)
- appListJsonAst.children.length should be (1)
- val attempts = (appListJsonAst \ "attempts").children
- attempts.size should be (1)
- (attempts(0) \ "completed").extract[Boolean] should be (false)
- parseDate(attempts(0) \ "startTime") should be (sc.startTime)
- parseDate(attempts(0) \ "endTime") should be (-1)
- val oneAppJsonAst = getJson(sc.ui.get, "")
- oneAppJsonAst should be (appListJsonAst.children(0))
- }
- }
-
- def goToUi(sc: SparkContext, path: String): Unit = {
- goToUi(sc.ui.get, path)
- }
-
- def goToUi(ui: SparkUI, path: String): Unit = {
- go to (ui.appUIAddress.stripSuffix("/") + path)
- }
-
- def parseDate(json: JValue): Long = {
- JacksonMessageWriter.makeISODateFormat.parse(json.extract[String]).getTime
- }
-
- def getJson(ui: SparkUI, path: String): JValue = {
- JsonMethods.parse(HistoryServerSuite.getUrl(jsonUrl(ui, path)))
- }
-
- def jsonUrl(ui: SparkUI, path: String): URL = {
- new URL(ui.appUIAddress + "/json/v1/applications/test/" + path)
- }
}