From 320e87e7ab009b851ab035253c04ad56a7bb5955 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Tue, 6 Aug 2013 16:19:37 +0800
Subject: Add MetricsServlet for Spark metrics system

---
 conf/metrics.properties.template                   | 23 +++++---
 core/pom.xml                                       |  4 ++
 .../main/scala/spark/deploy/master/Master.scala    |  9 ++++
 .../scala/spark/deploy/master/ui/MasterWebUI.scala |  2 +-
 .../main/scala/spark/deploy/worker/Worker.scala    |  6 +++
 .../scala/spark/deploy/worker/ui/WorkerWebUI.scala |  2 +-
 .../main/scala/spark/metrics/MetricsConfig.scala   |  4 +-
 .../main/scala/spark/metrics/MetricsSystem.scala   | 17 ++++--
 .../scala/spark/metrics/sink/MetricsServlet.scala  | 61 ++++++++++++++++++++++
 core/src/main/scala/spark/ui/JettyUtils.scala      |  2 +-
 core/src/main/scala/spark/ui/SparkUI.scala         | 11 +++-
 .../scala/spark/metrics/MetricsConfigSuite.scala   | 44 +++++++++++-----
 .../scala/spark/metrics/MetricsSystemSuite.scala   | 24 +++++++--
 pom.xml                                            |  5 ++
 project/SparkBuild.scala                           |  1 +
 15 files changed, 181 insertions(+), 34 deletions(-)
 create mode 100644 core/src/main/scala/spark/metrics/sink/MetricsServlet.scala

diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index 63a5a2093e..07fd046539 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -3,8 +3,8 @@
 # This file configures Spark's internal metrics system. The metrics system is
 # divided into instances which correspond to internal components.
 # Each instance can be configured to report its metrics to one or more sinks.
-# Accepted values for [instance] are "master", "worker", "executor", "driver",
-# and "applications". A wild card "*" can be used as an instance name, in
+# Accepted values for [instance] are "master", "worker", "executor", "driver",
+# and "applications". A wild card "*" can be used as an instance name, in
 # which case all instances will inherit the supplied property.
 #
 # Within an instance, a "source" specifies a particular set of grouped metrics.
@@ -19,7 +19,7 @@
 # A "sink" specifies where metrics are delivered to. Each instance can be
 # assigned one or more sinks.
 #
-# The sink|source field specifies whether the property relates to a sink or
+# The sink|source field specifies whether the property relates to a sink or
 # source.
 #
 # The [name] field specifies the name of source or sink.
@@ -28,18 +28,29 @@
 # source or sink is responsible for parsing this property.
 #
 # Notes:
-# 1. To add a new sink, set the "class" option to a fully qualified class
+# 1. To add a new sink, set the "class" option to a fully qualified class
 #    name (see examples below).
 # 2. Some sinks involve a polling period. The minimum allowed polling period
 #    is 1 second.
-# 3. Wild card properties can be overridden by more specific properties.
-#    For example, master.sink.console.period takes precedence over
+# 3. Wild card properties can be overridden by more specific properties.
+#    For example, master.sink.console.period takes precedence over
 #    *.sink.console.period.
 # 4. A metrics specific configuration
 #    "spark.metrics.conf=${SPARK_HOME}/conf/metrics.properties" should be
 #    added to Java properties using -Dspark.metrics.conf=xxx if you want to
 #    customize metrics system. You can also put the file in ${SPARK_HOME}/conf
 #    and it will be loaded automatically.
+# 5. MetricsServlet is added by default as a sink in master, worker and client
+#    driver. You can send an HTTP request to "/metrics" to get a snapshot of
+#    all the registered metrics in JSON format.
+#    For the master, requests to "/metrics/master" and "/metrics/applications"
+#    can be sent separately to get metrics snapshots of those two instances.
+#
+
+# Change MetricsServlet's properties
+#*.sink.servlet.uri=/metrics
+#
+#*.sink.servlet.sample=false
 
 # Enable JmxSink for all instances by class name
 #*.sink.jmx.class=spark.metrics.sink.JmxSink
diff --git a/core/pom.xml b/core/pom.xml
index 485aa29f83..dfadd22d42 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -126,6 +126,10 @@
       <groupId>com.codahale.metrics</groupId>
       <artifactId>metrics-jvm</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.codahale.metrics</groupId>
+      <artifactId>metrics-json</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.derby</groupId>
       <artifactId>derby</artifactId>
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 4a4d9908a0..f4a74830c6 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -65,6 +65,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
   val masterSource = new MasterSource(this)
 
+  // Add the default MetricsServlet handlers to the web UI
+  masterMetricsSystem.metricsServlet foreach { m =>
+    webUi.handlers = m.getHandlers ++ webUi.handlers
+  }
+
+  applicationMetricsSystem.metricsServlet foreach { m =>
+    webUi.handlers = m.getHandlers ++ webUi.handlers
+  }
+
   val masterPublicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
index 31bdb7854e..d9503663f4 100644
--- a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
@@ -57,7 +57,7 @@ class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
     }
   }
 
-  val handlers = Array[(String, Handler)](
+  var handlers = Array[(String, Handler)](
     ("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
     ("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
     ("/app", (request: HttpServletRequest) => applicationPage.render(request)),
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 0e46fa281e..92f8cbc610 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -101,6 +101,12 @@ private[spark] class Worker(
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
+
+    // Add the default MetricsServlet handlers to the web UI
+    metricsSystem.metricsServlet foreach { m =>
+      webUi.handlers = m.getHandlers ++ webUi.handlers
+    }
+
     webUi.start()
     connectToMaster()
 
diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
index 742e0a5fb6..d345cbecac 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -49,7 +49,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
 
   val indexPage = new IndexPage(this)
 
-  val handlers = Array[(String, Handler)](
+  var handlers = Array[(String, Handler)](
     ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
    ("/log", (request: HttpServletRequest) => log(request)),
     ("/logPage", (request: HttpServletRequest) => logPage(request)),
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
index 3e32e9c82f..d10dc45395 100644
--- a/core/src/main/scala/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -36,7 +36,9 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
   var propertyCategories: mutable.HashMap[String, Properties] = null
 
   private def setDefaultProperties(prop: Properties) {
-    // empty function, any default property can be set here
+    prop.setProperty("*.sink.servlet.class", "spark.metrics.sink.MetricsServlet")
+    prop.setProperty("master.sink.servlet.uri", "/metrics/master")
+    prop.setProperty("applications.sink.servlet.uri", "/metrics/applications")
   }
 
   def initialize() {
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
index 1dacafa135..ae1f853691 100644
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit
 import scala.collection.mutable
 
 import spark.Logging
-import spark.metrics.sink.Sink
+import spark.metrics.sink.{MetricsServlet, Sink}
 import spark.metrics.source.Source
 
 /**
@@ -35,7 +35,7 @@ import spark.metrics.source.Source
  * "instance" specify "who" (the role) use metrics system. In spark there are several roles
  * like master, worker, executor, client driver, these roles will create metrics system
  * for monitoring. So instance represents these roles. Currently in Spark, several instances
- * have already implemented: master, worker, executor, driver.
+ * have already been implemented: master, worker, executor, driver, applications.
  *
  * "source" specify "where" (source) to collect metrics data. In metrics system, there exists
 * two kinds of source:
@@ -51,8 +51,8 @@
  * Metrics configuration format is like below:
  * [instance].[sink|source].[name].[options] = xxxx
 *
- * [instance] can be "master", "worker", "executor", "driver", which means only the specified
- * instance has this property.
+ * [instance] can be "master", "worker", "executor", "driver", "applications", which means only
+ * the specified instance has this property.
  * wild card "*" can be used to replace instance name, which means all the instances will have
  * this property.
 *
@@ -72,6 +72,9 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
   val sources = new mutable.ArrayBuffer[Source]
   val registry = new MetricRegistry()
 
+  // Treat MetricsServlet as a special sink, since it is used to expose handlers to the web UI
+  var metricsServlet: Option[MetricsServlet] = None
+
   metricsConfig.initialize()
   registerSources()
   registerSinks()
@@ -126,7 +129,11 @@
         val sink = Class.forName(classPath)
           .getConstructor(classOf[Properties], classOf[MetricRegistry])
           .newInstance(kv._2, registry)
-        sinks += sink.asInstanceOf[Sink]
+        if (kv._1 == "servlet") {
+          metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
+        } else {
+          sinks += sink.asInstanceOf[Sink]
+        }
       } catch {
         case e: Exception => logError("Sink class " + classPath + " cannot be instantiated", e)
       }
diff --git a/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala
new file mode 100644
index 0000000000..39ede9b2df
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics.sink
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.json.MetricsModule
+
+import com.fasterxml.jackson.databind.ObjectMapper
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.Handler
+
+import spark.ui.JettyUtils
+
+class MetricsServlet(val property: Properties, val registry: MetricRegistry) extends Sink {
+  val SERVLET_KEY_URI = "uri"
+  val SERVLET_KEY_SAMPLE = "sample"
+
+  val SERVLET_DEFAULT_URI = "/metrics"
+  val SERVLET_DEFAULT_SAMPLE = false
+
+  val servletURI = property.getProperty(SERVLET_KEY_URI, SERVLET_DEFAULT_URI)
+
+  val servletShowSample = Option(property.getProperty(SERVLET_KEY_SAMPLE)) match {
+    case Some(s) => s.toBoolean
+    case None => SERVLET_DEFAULT_SAMPLE
+  }
+
+  val mapper = new ObjectMapper().registerModule(
+    new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
+
+  def getHandlers = Array[(String, Handler)](
+    (servletURI, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
+  )
+
+  def getMetricsSnapshot(request: HttpServletRequest): String = {
+    mapper.writeValueAsString(registry)
+  }
+
+  override def start() { }
+
+  override def stop() { }
+}
diff --git a/core/src/main/scala/spark/ui/JettyUtils.scala b/core/src/main/scala/spark/ui/JettyUtils.scala
index ca6088ad93..1cc85124d3 100644
--- a/core/src/main/scala/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/spark/ui/JettyUtils.scala
@@ -48,7 +48,7 @@ private[spark] object JettyUtils extends Logging {
   implicit def textResponderToHandler(responder: Responder[String]): Handler =
     createHandler(responder, "text/plain")
 
-  private def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
+  def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
                              extractFn: T => String = (in: Any) => in.toString): Handler = {
     new AbstractHandler {
       def handle(target: String,
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
index 7599f82a94..7e8a41c72e 100644
--- a/core/src/main/scala/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest
 
 import org.eclipse.jetty.server.{Handler, Server}
 
-import spark.{Logging, SparkContext, Utils}
+import spark.{Logging, SparkContext, SparkEnv, Utils}
 import spark.ui.env.EnvironmentUI
 import spark.ui.exec.ExecutorsUI
 import spark.ui.storage.BlockManagerUI
@@ -43,8 +43,15 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
   val jobs = new JobProgressUI(sc)
   val env = new EnvironmentUI(sc)
   val exec = new ExecutorsUI(sc)
+
+  // Add MetricsServlet handlers by default
+  val metricsServletHandlers = SparkEnv.get.metricsSystem.metricsServlet match {
+    case Some(s) => s.getHandlers
+    case None => Array[(String, Handler)]()
+  }
+
   val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++
-    exec.getHandlers ++ handlers
+    exec.getHandlers ++ metricsServletHandlers ++ handlers
 
   /** Bind the HTTP server which backs this web interface */
   def bind() {
diff --git a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
index 87cd2ffad2..df999cd532 100644
--- a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
@@ -1,12 +1,24 @@
-package spark.metrics
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import java.util.Properties
-import java.io.{File, FileOutputStream}
+package spark.metrics
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
-import spark.metrics._
-
 class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
   var filePath: String = _
 
@@ -18,11 +30,12 @@
     val conf = new MetricsConfig(Option("dummy-file"))
     conf.initialize()
 
-    assert(conf.properties.size() === 0)
+    assert(conf.properties.size() === 3)
     assert(conf.properties.getProperty("test-for-dummy") === null)
 
     val property = conf.getInstance("random")
-    assert(property.size() === 0)
+    assert(property.size() === 1)
+    assert(property.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
   }
 
   test("MetricsConfig with properties set") {
@@ -30,16 +43,19 @@
     val conf = new MetricsConfig(Option(filePath))
     conf.initialize()
     val masterProp = conf.getInstance("master")
-    assert(masterProp.size() === 3)
+    assert(masterProp.size() === 5)
     assert(masterProp.getProperty("sink.console.period") === "20")
     assert(masterProp.getProperty("sink.console.unit") === "minutes")
     assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+    assert(masterProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
+    assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master")
 
     val workerProp = conf.getInstance("worker")
-    assert(workerProp.size() === 3)
+    assert(workerProp.size() === 4)
     assert(workerProp.getProperty("sink.console.period") === "10")
     assert(workerProp.getProperty("sink.console.unit") === "seconds")
-    assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+    assert(workerProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+    assert(workerProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
   }
 
   test("MetricsConfig with subProperties") {
@@ -47,7 +63,7 @@
     conf.initialize()
 
     val propCategories = conf.propertyCategories
-    assert(propCategories.size === 2)
+    assert(propCategories.size === 3)
 
     val masterProp = conf.getInstance("master")
     val sourceProps = conf.subProperties(masterProp, MetricsSystem.SOURCE_REGEX)
@@ -55,10 +71,14 @@
     assert(sourceProps.size === 1)
     assert(sourceProps("jvm").getProperty("class") === "spark.metrics.source.JvmSource")
     val sinkProps = conf.subProperties(masterProp, MetricsSystem.SINK_REGEX)
-    assert(sinkProps.size === 1)
+    assert(sinkProps.size === 2)
     assert(sinkProps.contains("console"))
+    assert(sinkProps.contains("servlet"))
 
     val consoleProps = sinkProps("console")
     assert(consoleProps.size() === 2)
+
+    val servletProps = sinkProps("servlet")
+    assert(servletProps.size() === 2)
   }
 }
diff --git a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
index c189996417..35c2ae41e9 100644
--- a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
@@ -1,12 +1,24 @@
-package spark.metrics
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import java.util.Properties
-import java.io.{File, FileOutputStream}
+package spark.metrics
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
-import spark.metrics._
-
 class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
   var filePath: String = _
 
@@ -22,6 +34,7 @@
 
     assert(sources.length === 0)
     assert(sinks.length === 0)
+    assert(metricsSystem.metricsServlet != None)
   }
 
   test("MetricsSystem with sources add") {
@@ -31,6 +44,7 @@
 
     assert(sources.length === 0)
     assert(sinks.length === 1)
+    assert(metricsSystem.metricsServlet != None)
 
     val source = new spark.deploy.master.MasterSource(null)
     metricsSystem.registerSource(source)
diff --git a/pom.xml b/pom.xml
index 7d96185775..abcdc05590 100644
--- a/pom.xml
+++ b/pom.xml
@@ -269,6 +269,11 @@
         <artifactId>metrics-jvm</artifactId>
         <version>3.0.0</version>
       </dependency>
+      <dependency>
+        <groupId>com.codahale.metrics</groupId>
+        <artifactId>metrics-json</artifactId>
+        <version>3.0.0</version>
+      </dependency>
       <dependency>
         <groupId>org.scala-lang</groupId>
         <artifactId>scala-compiler</artifactId>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f860925650..e5c8e8d230 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -186,6 +186,7 @@ object SparkBuild extends Build {
         "org.apache.derby" % "derby" % "10.4.2.0" % "test",
         "com.codahale.metrics" % "metrics-core" % "3.0.0",
         "com.codahale.metrics" % "metrics-jvm" % "3.0.0",
+        "com.codahale.metrics" % "metrics-json" % "3.0.0",
        "com.twitter" % "chill_2.9.3" % "0.3.1",
         "com.twitter" % "chill-java" % "0.3.1"
       ) ++ (
-- 
cgit v1.2.3
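
Example configuration (for reference; not part of the patch): the defaults that
setDefaultProperties() installs above are equivalent to putting the following in
conf/metrics.properties. The /metrics URI and sample=false lines spell out the
fallbacks hard-coded in MetricsServlet itself, so listing them is optional.

    # Every instance gets a MetricsServlet sink through the wildcard...
    *.sink.servlet.class=spark.metrics.sink.MetricsServlet
    *.sink.servlet.uri=/metrics
    *.sink.servlet.sample=false

    # ...and more specific per-instance keys override the wildcard URI, so the
    # master serves its snapshots at /metrics/master and /metrics/applications.
    master.sink.servlet.uri=/metrics/master
    applications.sink.servlet.uri=/metrics/applications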
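
For readers who want to see what the servlet returns without running a cluster,
here is a minimal standalone sketch of the metrics-json serialization path used
by getMetricsSnapshot(). It assumes metrics-core, metrics-json 3.0.0 and Jackson
on the classpath; the object and metric names are invented for illustration.

    import java.util.concurrent.TimeUnit

    import com.codahale.metrics.MetricRegistry
    import com.codahale.metrics.json.MetricsModule
    import com.fasterxml.jackson.databind.ObjectMapper

    object MetricsJsonDemo {
      def main(args: Array[String]) {
        // Register a couple of metrics, as a Spark source would.
        val registry = new MetricRegistry()
        registry.counter("demo.requests").inc(3)
        registry.histogram("demo.latency").update(42)

        // Same mapper setup as MetricsServlet: rates in seconds, durations in
        // milliseconds, raw histogram samples omitted (showSamples = false).
        val mapper = new ObjectMapper().registerModule(
          new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, false))

        // This is the JSON body an HTTP GET on the servlet URI would return.
        println(mapper.writeValueAsString(registry))
      }
    }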