diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-08-13 09:28:18 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-08-13 09:28:18 -0700 |
commit | a0133bfbad22b057713d633f11ecc98d55fce129 (patch) | |
tree | f9aaa1016ae9bc66d30cd19fa0baf1b1235aa0f0 /core | |
parent | e2fdac60da8cb9b0ff0191631bf7e37ad3a47c76 (diff) | |
parent | 09c7179e812a06cb43a4975bca15d1b9963da975 (diff) | |
download | spark-a0133bfbad22b057713d633f11ecc98d55fce129.tar.gz spark-a0133bfbad22b057713d633f11ecc98d55fce129.tar.bz2 spark-a0133bfbad22b057713d633f11ecc98d55fce129.zip |
Merge pull request #784 from jerryshao/dev-metrics-servlet
Add MetricsServlet for Spark metrics system
Diffstat (limited to 'core')
14 files changed, 157 insertions, 35 deletions
diff --git a/core/pom.xml b/core/pom.xml index 485aa29f83..dfadd22d42 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -126,6 +126,10 @@ <groupId>com.codahale.metrics</groupId> <artifactId>metrics-jvm</artifactId> </dependency> + <dependency> + <groupId>com.codahale.metrics</groupId> + <artifactId>metrics-json</artifactId> + </dependency> <dependency> <groupId>org.apache.derby</groupId> diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index 4a4d9908a0..152cb2887a 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -57,14 +57,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act var firstApp: Option[ApplicationInfo] = None - val webUi = new MasterWebUI(self, webUiPort) - Utils.checkHost(host, "Expected hostname") val masterMetricsSystem = MetricsSystem.createMetricsSystem("master") val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications") val masterSource = new MasterSource(this) + val webUi = new MasterWebUI(this, webUiPort) + val masterPublicAddress = { val envVar = System.getenv("SPARK_PUBLIC_DNS") if (envVar != null) envVar else host diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala index 36a1e91b24..405a1ec3a6 100644 --- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala @@ -33,7 +33,7 @@ import spark.deploy.master.ExecutorInfo import spark.ui.UIUtils private[spark] class ApplicationPage(parent: MasterWebUI) { - val master = parent.master + val master = parent.masterActorRef implicit val timeout = parent.timeout /** Executor details for a particular application */ diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala index d3b10f197b..4443d88056 100644 --- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala @@ -35,7 +35,7 @@ import spark.deploy.master.{ApplicationInfo, WorkerInfo} import spark.ui.UIUtils private[spark] class IndexPage(parent: MasterWebUI) { - val master = parent.master + val master = parent.masterActorRef implicit val timeout = parent.timeout def renderJson(request: HttpServletRequest): JValue = { diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala index 31bdb7854e..f0a6ffe047 100644 --- a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala @@ -17,7 +17,6 @@ package spark.deploy.master.ui -import akka.actor.ActorRef import akka.util.Duration import javax.servlet.http.HttpServletRequest @@ -25,6 +24,7 @@ import javax.servlet.http.HttpServletRequest import org.eclipse.jetty.server.{Handler, Server} import spark.{Logging, Utils} +import spark.deploy.master.Master import spark.ui.JettyUtils import spark.ui.JettyUtils._ @@ -32,12 +32,14 @@ import spark.ui.JettyUtils._ * Web UI server for the standalone master. */ private[spark] -class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging { +class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { implicit val timeout = Duration.create( System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") val host = Utils.localHostName() val port = requestedPort + val masterActorRef = master.self + var server: Option[Server] = None var boundPort: Option[Int] = None @@ -57,7 +59,11 @@ class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging { } } - val handlers = Array[(String, Handler)]( + val metricsHandlers = master.masterMetricsSystem.metricsServlet.map(_.getHandlers) + .getOrElse(Array()) ++ master.applicationMetricsSystem.metricsServlet.map(_.getHandlers) + .getOrElse(Array()) + + val handlers = metricsHandlers ++ Array[(String, Handler)]( ("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)), ("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)), ("/app", (request: HttpServletRequest) => applicationPage.render(request)), diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 0e46fa281e..0b5013b864 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -101,6 +101,7 @@ private[spark] class Worker( logInfo("Spark home: " + sparkHome) createWorkDir() webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) + webUi.start() connectToMaster() diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala index 742e0a5fb6..b408c63a02 100644 --- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala @@ -17,7 +17,6 @@ package spark.deploy.worker.ui -import akka.actor.ActorRef import akka.util.{Duration, Timeout} import java.io.{FileInputStream, File} @@ -49,7 +48,9 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I val indexPage = new IndexPage(this) - val handlers = Array[(String, Handler)]( + val metricsHandlers = worker.metricsSystem.metricsServlet.map(_.getHandlers).getOrElse(Array()) + + val handlers = metricsHandlers ++ Array[(String, Handler)]( ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)), ("/log", (request: HttpServletRequest) => log(request)), ("/logPage", (request: HttpServletRequest) => logPage(request)), diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala index 3e32e9c82f..d7fb5378a4 100644 --- a/core/src/main/scala/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala @@ -36,7 +36,11 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi var propertyCategories: mutable.HashMap[String, Properties] = null private def setDefaultProperties(prop: Properties) { - // empty function, any default property can be set here + prop.setProperty("*.sink.servlet.class", "spark.metrics.sink.MetricsServlet") + prop.setProperty("*.sink.servlet.uri", "/metrics/json") + prop.setProperty("*.sink.servlet.sample", "false") + prop.setProperty("master.sink.servlet.uri", "/metrics/master/json") + prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json") } def initialize() { diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala index 1dacafa135..04c750b17e 100644 --- a/core/src/main/scala/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala @@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable import spark.Logging -import spark.metrics.sink.Sink +import spark.metrics.sink.{MetricsServlet, Sink} import spark.metrics.source.Source /** @@ -35,7 +35,7 @@ import spark.metrics.source.Source * "instance" specify "who" (the role) use metrics system. In spark there are several roles * like master, worker, executor, client driver, these roles will create metrics system * for monitoring. So instance represents these roles. Currently in Spark, several instances - * have already implemented: master, worker, executor, driver. + * have already implemented: master, worker, executor, driver, applications. * * "source" specify "where" (source) to collect metrics data. In metrics system, there exists * two kinds of source: @@ -51,8 +51,8 @@ import spark.metrics.source.Source * Metrics configuration format is like below: * [instance].[sink|source].[name].[options] = xxxx * - * [instance] can be "master", "worker", "executor", "driver", which means only the specified - * instance has this property. + * [instance] can be "master", "worker", "executor", "driver", "applications" which means only + * the specified instance has this property. * wild card "*" can be used to replace instance name, which means all the instances will have * this property. * @@ -72,6 +72,9 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin val sources = new mutable.ArrayBuffer[Source] val registry = new MetricRegistry() + // Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui + var metricsServlet: Option[MetricsServlet] = None + metricsConfig.initialize() registerSources() registerSinks() @@ -126,7 +129,11 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin val sink = Class.forName(classPath) .getConstructor(classOf[Properties], classOf[MetricRegistry]) .newInstance(kv._2, registry) - sinks += sink.asInstanceOf[Sink] + if (kv._1 == "servlet") { + metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) + } else { + sinks += sink.asInstanceOf[Sink] + } } catch { case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e) } diff --git a/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala new file mode 100644 index 0000000000..17432b1ed1 --- /dev/null +++ b/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.metrics.sink + +import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.json.MetricsModule + +import com.fasterxml.jackson.databind.ObjectMapper + +import java.util.Properties +import java.util.concurrent.TimeUnit +import javax.servlet.http.HttpServletRequest + +import org.eclipse.jetty.server.Handler + +import spark.ui.JettyUtils + +class MetricsServlet(val property: Properties, val registry: MetricRegistry) extends Sink { + val SERVLET_KEY_URI = "uri" + val SERVLET_KEY_SAMPLE = "sample" + + val servletURI = property.getProperty(SERVLET_KEY_URI) + + val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean + + val mapper = new ObjectMapper().registerModule( + new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample)) + + def getHandlers = Array[(String, Handler)]( + (servletURI, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json")) + ) + + def getMetricsSnapshot(request: HttpServletRequest): String = { + mapper.writeValueAsString(registry) + } + + override def start() { } + + override def stop() { } +} diff --git a/core/src/main/scala/spark/ui/JettyUtils.scala b/core/src/main/scala/spark/ui/JettyUtils.scala index ca6088ad93..1cc85124d3 100644 --- a/core/src/main/scala/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/spark/ui/JettyUtils.scala @@ -48,7 +48,7 @@ private[spark] object JettyUtils extends Logging { implicit def textResponderToHandler(responder: Responder[String]): Handler = createHandler(responder, "text/plain") - private def createHandler[T <% AnyRef](responder: Responder[T], contentType: String, + def createHandler[T <% AnyRef](responder: Responder[T], contentType: String, extractFn: T => String = (in: Any) => in.toString): Handler = { new AbstractHandler { def handle(target: String, diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala index 7599f82a94..4bcfdeb62b 100644 --- a/core/src/main/scala/spark/ui/SparkUI.scala +++ b/core/src/main/scala/spark/ui/SparkUI.scala @@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest import org.eclipse.jetty.server.{Handler, Server} -import spark.{Logging, SparkContext, Utils} +import spark.{Logging, SparkContext, SparkEnv, Utils} import spark.ui.env.EnvironmentUI import spark.ui.exec.ExecutorsUI import spark.ui.storage.BlockManagerUI @@ -43,8 +43,13 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging { val jobs = new JobProgressUI(sc) val env = new EnvironmentUI(sc) val exec = new ExecutorsUI(sc) + + // Add MetricsServlet handlers by default + val metricsServletHandlers = SparkEnv.get.metricsSystem.metricsServlet.map(_.getHandlers) + .getOrElse(Array()) + val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++ - exec.getHandlers ++ handlers + exec.getHandlers ++ metricsServletHandlers ++ handlers /** Bind the HTTP server which backs this web interface */ def bind() { diff --git a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala index 87cd2ffad2..b0213b62d9 100644 --- a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala +++ b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala @@ -1,12 +1,24 @@ -package spark.metrics +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -import java.util.Properties -import java.io.{File, FileOutputStream} +package spark.metrics import org.scalatest.{BeforeAndAfter, FunSuite} -import spark.metrics._ - class MetricsConfigSuite extends FunSuite with BeforeAndAfter { var filePath: String = _ @@ -18,11 +30,14 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter { val conf = new MetricsConfig(Option("dummy-file")) conf.initialize() - assert(conf.properties.size() === 0) + assert(conf.properties.size() === 5) assert(conf.properties.getProperty("test-for-dummy") === null) val property = conf.getInstance("random") - assert(property.size() === 0) + assert(property.size() === 3) + assert(property.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet") + assert(property.getProperty("sink.servlet.uri") === "/metrics/json") + assert(property.getProperty("sink.servlet.sample") === "false") } test("MetricsConfig with properties set") { @@ -30,16 +45,22 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter { conf.initialize() val masterProp = conf.getInstance("master") - assert(masterProp.size() === 3) + assert(masterProp.size() === 6) assert(masterProp.getProperty("sink.console.period") === "20") assert(masterProp.getProperty("sink.console.unit") === "minutes") assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource") + assert(masterProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet") + assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json") + assert(masterProp.getProperty("sink.servlet.sample") === "false") val workerProp = conf.getInstance("worker") - assert(workerProp.size() === 3) + assert(workerProp.size() === 6) assert(workerProp.getProperty("sink.console.period") === "10") assert(workerProp.getProperty("sink.console.unit") === "seconds") - assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource") + assert(workerProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource") + assert(workerProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet") + assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json") + assert(workerProp.getProperty("sink.servlet.sample") === "false") } test("MetricsConfig with subProperties") { @@ -47,7 +68,7 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter { conf.initialize() val propCategories = conf.propertyCategories - assert(propCategories.size === 2) + assert(propCategories.size === 3) val masterProp = conf.getInstance("master") val sourceProps = conf.subProperties(masterProp, MetricsSystem.SOURCE_REGEX) @@ -55,10 +76,14 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter { assert(sourceProps("jvm").getProperty("class") === "spark.metrics.source.JvmSource") val sinkProps = conf.subProperties(masterProp, MetricsSystem.SINK_REGEX) - assert(sinkProps.size === 1) + assert(sinkProps.size === 2) assert(sinkProps.contains("console")) + assert(sinkProps.contains("servlet")) val consoleProps = sinkProps("console") assert(consoleProps.size() === 2) + + val servletProps = sinkProps("servlet") + assert(servletProps.size() === 3) } } diff --git a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala index c189996417..35c2ae41e9 100644 --- a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala @@ -1,12 +1,24 @@ -package spark.metrics +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -import java.util.Properties -import java.io.{File, FileOutputStream} +package spark.metrics import org.scalatest.{BeforeAndAfter, FunSuite} -import spark.metrics._ - class MetricsSystemSuite extends FunSuite with BeforeAndAfter { var filePath: String = _ @@ -22,6 +34,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter { assert(sources.length === 0) assert(sinks.length === 0) + assert(metricsSystem.metricsServlet != None) } test("MetricsSystem with sources add") { @@ -31,6 +44,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter { assert(sources.length === 0) assert(sinks.length === 1) + assert(metricsSystem.metricsServlet != None) val source = new spark.deploy.master.MasterSource(null) metricsSystem.registerSource(source) |