 conf/metrics.properties.template                                 | 13
 core/src/main/scala/spark/deploy/master/Master.scala             | 11
 core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala |  2
 core/src/main/scala/spark/deploy/master/ui/IndexPage.scala       |  2
 core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala     | 12
 core/src/main/scala/spark/deploy/worker/Worker.scala             |  5
 core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala     |  5
 core/src/main/scala/spark/metrics/MetricsConfig.scala            |  6
 core/src/main/scala/spark/metrics/MetricsSystem.scala            |  2
 core/src/main/scala/spark/metrics/sink/MetricsServlet.scala      | 10
 core/src/main/scala/spark/ui/SparkUI.scala                       |  6
 core/src/test/scala/spark/metrics/MetricsConfigSuite.scala       | 17
 12 files changed, 39 insertions(+), 52 deletions(-)
diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index 07fd046539..6c36f3cca4 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -41,17 +41,12 @@
# customize metrics system. You can also put the file in ${SPARK_HOME}/conf
# and it will be loaded automatically.
# 5. MetricsServlet is added by default as a sink in master, worker and client
-# driver, you can send http request "/metrics" to get a snapshot of all the
-# registered metrics in json format. For master, requests "/metrics/master" and
-# "/metrics/applications" can be sent seperately to get metrics snapshot of
-# instance master and applications.
+# driver; you can send an HTTP request to "/metrics/json" to get a snapshot of all the
+# registered metrics in JSON format. For the master, requests to "/metrics/master/json" and
+# "/metrics/applications/json" can be sent separately to get metrics snapshots of the
+# master instance and of applications. MetricsServlet cannot be configured on its own.
#
-# Change MetricsServlet's property
-#*.sink.servlet.uri=/metrics
-#
-#*.sink.servlet.sample=false
-
# Enable JmxSink for all instances by class name
#*.sink.jmx.class=spark.metrics.sink.JmxSink
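
Note on the new defaults: with the endpoints above, a metrics snapshot can be fetched over plain HTTP. A minimal sketch, assuming a standalone master whose web UI listens on localhost:8080 (host and port are illustrative, not fixed by this patch):

    import scala.io.Source

    object MetricsSnapshot {
      def main(args: Array[String]) {
        val base = "http://localhost:8080"
        // Snapshot of the master instance's registered metrics, in JSON.
        println(Source.fromURL(base + "/metrics/master/json").mkString)
        // Application metrics live in a separate registry with its own endpoint.
        println(Source.fromURL(base + "/metrics/applications/json").mkString)
      }
    }

On a worker or a driver, the default endpoint is the plain "/metrics/json" supplied by the wildcard property set in MetricsConfig below.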
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index f4a74830c6..152cb2887a 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -57,22 +57,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
var firstApp: Option[ApplicationInfo] = None
- val webUi = new MasterWebUI(self, webUiPort)
-
Utils.checkHost(host, "Expected hostname")
val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
val masterSource = new MasterSource(this)
- // Add default MetricsServlet handler to web ui
- masterMetricsSystem.metricsServlet foreach { m =>
- webUi.handlers = m.getHandlers ++ webUi.handlers
- }
-
- applicationMetricsSystem.metricsServlet foreach { m =>
- webUi.handlers = m.getHandlers ++ webUi.handlers
- }
+ val webUi = new MasterWebUI(this, webUiPort)
val masterPublicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
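
The reordering above is the substance of this change: Master previously pushed servlet handlers into a web UI created before the metrics systems existed; now MasterWebUI pulls them from the Master it is handed, so both metrics systems must be constructed first. A toy sketch of that push-to-pull inversion (every name here is a hypothetical stand-in, not a Spark class):

    object PushToPull extends App {
      class MetricsSys { val servletHandlers = Array("metrics-handler") }

      // After the patch: the UI receives its owner and pulls the handlers
      // itself, which forces the metrics systems to exist before the UI does.
      class PullUI(metrics: Seq[MetricsSys]) {
        val handlers = metrics.flatMap(_.servletHandlers) ++ Seq("static", "index")
      }

      val masterMetrics = new MetricsSys
      val appMetrics    = new MetricsSys
      val ui = new PullUI(Seq(masterMetrics, appMetrics))  // metrics first, then UI
      println(ui.handlers.mkString(", "))
    }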
diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
index 36a1e91b24..405a1ec3a6 100644
--- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
@@ -33,7 +33,7 @@ import spark.deploy.master.ExecutorInfo
import spark.ui.UIUtils
private[spark] class ApplicationPage(parent: MasterWebUI) {
- val master = parent.master
+ val master = parent.masterActorRef
implicit val timeout = parent.timeout
/** Executor details for a particular application */
diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
index d3b10f197b..4443d88056 100644
--- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
@@ -35,7 +35,7 @@ import spark.deploy.master.{ApplicationInfo, WorkerInfo}
import spark.ui.UIUtils
private[spark] class IndexPage(parent: MasterWebUI) {
- val master = parent.master
+ val master = parent.masterActorRef
implicit val timeout = parent.timeout
def renderJson(request: HttpServletRequest): JValue = {
diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
index d9503663f4..f0a6ffe047 100644
--- a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,7 +17,6 @@
package spark.deploy.master.ui
-import akka.actor.ActorRef
import akka.util.Duration
import javax.servlet.http.HttpServletRequest
@@ -25,6 +24,7 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
import spark.{Logging, Utils}
+import spark.deploy.master.Master
import spark.ui.JettyUtils
import spark.ui.JettyUtils._
@@ -32,12 +32,14 @@ import spark.ui.JettyUtils._
* Web UI server for the standalone master.
*/
private[spark]
-class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
+class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
implicit val timeout = Duration.create(
System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
val host = Utils.localHostName()
val port = requestedPort
+ val masterActorRef = master.self
+
var server: Option[Server] = None
var boundPort: Option[Int] = None
@@ -57,7 +59,11 @@ class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
}
}
- var handlers = Array[(String, Handler)](
+ val metricsHandlers = master.masterMetricsSystem.metricsServlet.map(_.getHandlers)
+ .getOrElse(Array()) ++ master.applicationMetricsSystem.metricsServlet.map(_.getHandlers)
+ .getOrElse(Array())
+
+ val handlers = metricsHandlers ++ Array[(String, Handler)](
("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
("/app", (request: HttpServletRequest) => applicationPage.render(request)),
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 92f8cbc610..0b5013b864 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -102,11 +102,6 @@ private[spark] class Worker(
createWorkDir()
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
- // Add default MetricsServlet handlers to webUi
- metricsSystem.metricsServlet foreach { m =>
- webUi.handlers = m.getHandlers ++ webUi.handlers
- }
-
webUi.start()
connectToMaster()
diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
index d345cbecac..b408c63a02 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -17,7 +17,6 @@
package spark.deploy.worker.ui
-import akka.actor.ActorRef
import akka.util.{Duration, Timeout}
import java.io.{FileInputStream, File}
@@ -49,7 +48,9 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
val indexPage = new IndexPage(this)
- var handlers = Array[(String, Handler)](
+ val metricsHandlers = worker.metricsSystem.metricsServlet.map(_.getHandlers).getOrElse(Array())
+
+ val handlers = metricsHandlers ++ Array[(String, Handler)](
("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
("/log", (request: HttpServletRequest) => log(request)),
("/logPage", (request: HttpServletRequest) => logPage(request)),
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
index d10dc45395..d7fb5378a4 100644
--- a/core/src/main/scala/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -37,8 +37,10 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
private def setDefaultProperties(prop: Properties) {
prop.setProperty("*.sink.servlet.class", "spark.metrics.sink.MetricsServlet")
- prop.setProperty("master.sink.servlet.uri", "/metrics/master")
- prop.setProperty("applications.sink.servlet.uri", "/metrics/applications")
+ prop.setProperty("*.sink.servlet.uri", "/metrics/json")
+ prop.setProperty("*.sink.servlet.sample", "false")
+ prop.setProperty("master.sink.servlet.uri", "/metrics/master/json")
+ prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json")
}
def initialize() {
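
setDefaultProperties now guarantees every instance a complete servlet configuration: the wildcard keys supply class, uri, and sample for all instances, while the master and applications entries override only the uri. A rough sketch of the resolution order, with a hypothetical resolve helper approximating what MetricsConfig.getInstance does:

    object DefaultResolution extends App {
      import java.util.Properties

      val prop = new Properties()
      prop.setProperty("*.sink.servlet.class", "spark.metrics.sink.MetricsServlet")
      prop.setProperty("*.sink.servlet.uri", "/metrics/json")
      prop.setProperty("*.sink.servlet.sample", "false")
      prop.setProperty("master.sink.servlet.uri", "/metrics/master/json")

      // Instance-specific keys win; wildcard keys fill in the rest.
      def resolve(inst: String, key: String): String =
        Option(prop.getProperty(inst + "." + key))
          .getOrElse(prop.getProperty("*." + key))

      println(resolve("master", "sink.servlet.uri"))    // /metrics/master/json (override)
      println(resolve("worker", "sink.servlet.uri"))    // /metrics/json (wildcard)
      println(resolve("master", "sink.servlet.sample")) // false (wildcard)
    }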
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
index ae1f853691..04c750b17e 100644
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -129,7 +129,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
val sink = Class.forName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry])
.newInstance(kv._2, registry)
- if (kv._1 =="servlet") {
+ if (kv._1 == "servlet") {
metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
} else {
sinks += sink.asInstanceOf[Sink]
diff --git a/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala
index 39ede9b2df..17432b1ed1 100644
--- a/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala
+++ b/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala
@@ -34,15 +34,9 @@ class MetricsServlet(val property: Properties, val registry: MetricRegistry) ext
val SERVLET_KEY_URI = "uri"
val SERVLET_KEY_SAMPLE = "sample"
- val SERVLET_DEFAULT_URI = "/metrics"
- val SERVLET_DEFAULT_SAMPLE = false
+ val servletURI = property.getProperty(SERVLET_KEY_URI)
- val servletURI = property.getProperty(SERVLET_KEY_URI, SERVLET_DEFAULT_URI)
-
- val servletShowSample = Option(property.getProperty(SERVLET_KEY_SAMPLE)) match {
- case Some(s) => s.toBoolean
- case None => SERVLET_DEFAULT_SAMPLE
- }
+ val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean
val mapper = new ObjectMapper().registerModule(
new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
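
Removing the in-class defaults makes MetricsServlet depend on MetricsConfig: getProperty(SERVLET_KEY_SAMPLE).toBoolean throws a NullPointerException when the key is absent, which is only safe because the wildcard defaults shown earlier always inject both keys. A minimal sketch of that contract (the two property keys are the real ones; everything around them is illustrative):

    object ServletContract extends App {
      import java.util.Properties

      val props = new Properties()
      props.setProperty("uri", "/metrics/json")
      props.setProperty("sample", "false")

      // Mirrors the new MetricsServlet fields: no fallbacks, so a missing
      // key fails fast instead of being silently defaulted.
      val servletURI        = props.getProperty("uri")
      val servletShowSample = props.getProperty("sample").toBoolean  // NPE if absent
      println(servletURI + " sample=" + servletShowSample)
    }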
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
index 7e8a41c72e..4bcfdeb62b 100644
--- a/core/src/main/scala/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -45,10 +45,8 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
val exec = new ExecutorsUI(sc)
// Add MetricsServlet handlers by default
- val metricsServletHandlers = SparkEnv.get.metricsSystem.metricsServlet match {
- case Some(s) => s.getHandlers
- case None => Array[(String, Handler)]()
- }
+ val metricsServletHandlers = SparkEnv.get.metricsSystem.metricsServlet.map(_.getHandlers)
+ .getOrElse(Array())
val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++
exec.getHandlers ++ metricsServletHandlers ++ handlers
diff --git a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
index df999cd532..b0213b62d9 100644
--- a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
@@ -30,12 +30,14 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
val conf = new MetricsConfig(Option("dummy-file"))
conf.initialize()
- assert(conf.properties.size() === 3)
+ assert(conf.properties.size() === 5)
assert(conf.properties.getProperty("test-for-dummy") === null)
val property = conf.getInstance("random")
- assert(property.size() === 1)
+ assert(property.size() === 3)
assert(property.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
+ assert(property.getProperty("sink.servlet.uri") === "/metrics/json")
+ assert(property.getProperty("sink.servlet.sample") === "false")
}
test("MetricsConfig with properties set") {
@@ -43,19 +45,22 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
conf.initialize()
val masterProp = conf.getInstance("master")
- assert(masterProp.size() === 5)
+ assert(masterProp.size() === 6)
assert(masterProp.getProperty("sink.console.period") === "20")
assert(masterProp.getProperty("sink.console.unit") === "minutes")
assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
assert(masterProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
- assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master")
+ assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json")
+ assert(masterProp.getProperty("sink.servlet.sample") === "false")
val workerProp = conf.getInstance("worker")
- assert(workerProp.size() === 4)
+ assert(workerProp.size() === 6)
assert(workerProp.getProperty("sink.console.period") === "10")
assert(workerProp.getProperty("sink.console.unit") === "seconds")
assert(workerProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
assert(workerProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
+ assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json")
+ assert(workerProp.getProperty("sink.servlet.sample") === "false")
}
test("MetricsConfig with subProperties") {
@@ -79,6 +84,6 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
assert(consoleProps.size() === 2)
val servletProps = sinkProps("servlet")
- assert(servletProps.size() === 2)
+ assert(servletProps.size() === 3)
}
}