aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorImran Rashid <irashid@cloudera.com>2015-05-22 16:05:07 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-05-22 16:05:07 -0700
commit821254fb945c3e19540eb57fff1f656737ef484b (patch)
tree33f17f1bcfabf6af646b656fcaf32e8fd0a1b502
parent126d7235de649ea5619dee6ad3a70970ee90df93 (diff)
downloadspark-821254fb945c3e19540eb57fff1f656737ef484b.tar.gz
spark-821254fb945c3e19540eb57fff1f656737ef484b.tar.bz2
spark-821254fb945c3e19540eb57fff1f656737ef484b.zip
[SPARK-7760] add /json back into master & worker pages; add test
Author: Imran Rashid <irashid@cloudera.com> Closes #6284 from squito/SPARK-7760 and squashes the following commits: 5e02d8a [Imran Rashid] style; increase timeout 9987399 [Imran Rashid] comment 8c7ed63 [Imran Rashid] add /json back into master & worker pages; add test
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/ui/WebUI.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala31
3 files changed, 37 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 860e1a2490..0550f00a17 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -43,6 +43,8 @@ class LocalSparkCluster(
private val localHostname = Utils.localHostName()
private val masterActorSystems = ArrayBuffer[ActorSystem]()
private val workerActorSystems = ArrayBuffer[ActorSystem]()
+ // exposed for testing
+ var masterWebUIPort = -1
def start(): Array[String] = {
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
@@ -53,7 +55,9 @@ class LocalSparkCluster(
.set("spark.shuffle.service.enabled", "false")
/* Start the Master */
- val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf)
+ val (masterSystem, masterPort, webUiPort, _) =
+ Master.startSystemAndActor(localHostname, 0, 0, _conf)
+ masterWebUIPort = webUiPort
masterActorSystems += masterSystem
val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + masterPort
val masters = Array(masterUrl)
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 1df9cd0fa1..594df15e9c 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -77,7 +77,10 @@ private[spark] abstract class WebUI(
val pagePath = "/" + page.prefix
val renderHandler = createServletHandler(pagePath,
(request: HttpServletRequest) => page.render(request), securityManager, basePath)
+ val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
+ (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath)
attachHandler(renderHandler)
+ attachHandler(renderJsonHandler)
pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
.append(renderHandler)
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 0faa8f650e..f97e5ff6db 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -21,16 +21,20 @@ import java.util.Date
import scala.concurrent.Await
import scala.concurrent.duration._
+import scala.io.Source
import scala.language.postfixOps
import akka.actor.Address
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
import org.scalatest.{FunSuite, Matchers}
+import org.scalatest.concurrent.Eventually
import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
-import org.apache.spark.deploy._
import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy._
-class MasterSuite extends FunSuite with Matchers {
+class MasterSuite extends FunSuite with Matchers with Eventually {
test("toAkkaUrl") {
val conf = new SparkConf(loadDefaults = false)
@@ -157,4 +161,27 @@ class MasterSuite extends FunSuite with Matchers {
CustomRecoveryModeFactory.instantiationAttempts should be > instantiationAttempts
}
+ test("Master & worker web ui available") {
+ implicit val formats = org.json4s.DefaultFormats
+ val conf = new SparkConf()
+ val localCluster = new LocalSparkCluster(2, 2, 512, conf)
+ localCluster.start()
+ try {
+ eventually(timeout(5 seconds), interval(100 milliseconds)) {
+ val json = Source.fromURL(s"http://localhost:${localCluster.masterWebUIPort}/json")
+ .getLines().mkString("\n")
+ val JArray(workers) = (parse(json) \ "workers")
+ workers.size should be (2)
+ workers.foreach { workerSummaryJson =>
+ val JString(workerWebUi) = workerSummaryJson \ "webuiaddress"
+ val workerResponse = parse(Source.fromURL(s"${workerWebUi}/json")
+ .getLines().mkString("\n"))
+ (workerResponse \ "cores").extract[Int] should be (2)
+ }
+ }
+ } finally {
+ localCluster.stop()
+ }
+ }
+
}