From 09c7179e812a06cb43a4975bca15d1b9963da975 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 12 Aug 2013 11:40:40 +0800 Subject: MetricsServlet code refactor according to comments --- conf/metrics.properties.template | 13 ++++--------- core/src/main/scala/spark/deploy/master/Master.scala | 11 +---------- .../scala/spark/deploy/master/ui/ApplicationPage.scala | 2 +- .../main/scala/spark/deploy/master/ui/IndexPage.scala | 2 +- .../main/scala/spark/deploy/master/ui/MasterWebUI.scala | 12 +++++++++--- core/src/main/scala/spark/deploy/worker/Worker.scala | 5 ----- .../main/scala/spark/deploy/worker/ui/WorkerWebUI.scala | 5 +++-- core/src/main/scala/spark/metrics/MetricsConfig.scala | 6 ++++-- core/src/main/scala/spark/metrics/MetricsSystem.scala | 2 +- .../main/scala/spark/metrics/sink/MetricsServlet.scala | 10 ++-------- core/src/main/scala/spark/ui/SparkUI.scala | 6 ++---- .../test/scala/spark/metrics/MetricsConfigSuite.scala | 17 +++++++++++------ 12 files changed, 39 insertions(+), 52 deletions(-) diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template index 07fd046539..6c36f3cca4 100644 --- a/conf/metrics.properties.template +++ b/conf/metrics.properties.template @@ -41,17 +41,12 @@ # customize metrics system. You can also put the file in ${SPARK_HOME}/conf # and it will be loaded automatically. # 5. MetricsServlet is added by default as a sink in master, worker and client -# driver, you can send http request "/metrics" to get a snapshot of all the -# registered metrics in json format. For master, requests "/metrics/master" and -# "/metrics/applications" can be sent seperately to get metrics snapshot of -# instance master and applications. +# driver, you can send http request "/metrics/json" to get a snapshot of all the +# registered metrics in json format. For master, requests "/metrics/master/json" and +# "/metrics/applications/json" can be sent seperately to get metrics snapshot of +# instance master and applications. MetricsServlet may not be configured by self. # -# Change MetricsServlet's property -#*.sink.servlet.uri=/metrics -# -#*.sink.servlet.sample=false - # Enable JmxSink for all instances by class name #*.sink.jmx.class=spark.metrics.sink.JmxSink diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index f4a74830c6..152cb2887a 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -57,22 +57,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act var firstApp: Option[ApplicationInfo] = None - val webUi = new MasterWebUI(self, webUiPort) - Utils.checkHost(host, "Expected hostname") val masterMetricsSystem = MetricsSystem.createMetricsSystem("master") val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications") val masterSource = new MasterSource(this) - // Add default MetricsServlet handler to web ui - masterMetricsSystem.metricsServlet foreach { m => - webUi.handlers = m.getHandlers ++ webUi.handlers - } - - applicationMetricsSystem.metricsServlet foreach { m => - webUi.handlers = m.getHandlers ++ webUi.handlers - } + val webUi = new MasterWebUI(this, webUiPort) val masterPublicAddress = { val envVar = System.getenv("SPARK_PUBLIC_DNS") diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala index 36a1e91b24..405a1ec3a6 100644 --- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala @@ -33,7 +33,7 @@ import spark.deploy.master.ExecutorInfo import spark.ui.UIUtils private[spark] class ApplicationPage(parent: MasterWebUI) { - val master = parent.master + val master = parent.masterActorRef implicit val timeout = parent.timeout /** Executor details for a particular application */ diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala index d3b10f197b..4443d88056 100644 --- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala @@ -35,7 +35,7 @@ import spark.deploy.master.{ApplicationInfo, WorkerInfo} import spark.ui.UIUtils private[spark] class IndexPage(parent: MasterWebUI) { - val master = parent.master + val master = parent.masterActorRef implicit val timeout = parent.timeout def renderJson(request: HttpServletRequest): JValue = { diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala index d9503663f4..f0a6ffe047 100644 --- a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala @@ -17,7 +17,6 @@ package spark.deploy.master.ui -import akka.actor.ActorRef import akka.util.Duration import javax.servlet.http.HttpServletRequest @@ -25,6 +24,7 @@ import javax.servlet.http.HttpServletRequest import org.eclipse.jetty.server.{Handler, Server} import spark.{Logging, Utils} +import spark.deploy.master.Master import spark.ui.JettyUtils import spark.ui.JettyUtils._ @@ -32,12 +32,14 @@ import spark.ui.JettyUtils._ * Web UI server for the standalone master. */ private[spark] -class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging { +class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { implicit val timeout = Duration.create( System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") val host = Utils.localHostName() val port = requestedPort + val masterActorRef = master.self + var server: Option[Server] = None var boundPort: Option[Int] = None @@ -57,7 +59,11 @@ class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging { } } - var handlers = Array[(String, Handler)]( + val metricsHandlers = master.masterMetricsSystem.metricsServlet.map(_.getHandlers) + .getOrElse(Array()) ++ master.applicationMetricsSystem.metricsServlet.map(_.getHandlers) + .getOrElse(Array()) + + val handlers = metricsHandlers ++ Array[(String, Handler)]( ("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)), ("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)), ("/app", (request: HttpServletRequest) => applicationPage.render(request)), diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 92f8cbc610..0b5013b864 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -102,11 +102,6 @@ private[spark] class Worker( createWorkDir() webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) - // Add default MetricsServlet handlers to webUi - metricsSystem.metricsServlet foreach { m => - webUi.handlers = m.getHandlers ++ webUi.handlers - } - webUi.start() connectToMaster() diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala index d345cbecac..b408c63a02 100644 --- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala @@ -17,7 +17,6 @@ package spark.deploy.worker.ui -import akka.actor.ActorRef import akka.util.{Duration, Timeout} import java.io.{FileInputStream, File} @@ -49,7 +48,9 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I val indexPage = new IndexPage(this) - var handlers = Array[(String, Handler)]( + val metricsHandlers = worker.metricsSystem.metricsServlet.map(_.getHandlers).getOrElse(Array()) + + val handlers = metricsHandlers ++ Array[(String, Handler)]( ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)), ("/log", (request: HttpServletRequest) => log(request)), ("/logPage", (request: HttpServletRequest) => logPage(request)), diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala index d10dc45395..d7fb5378a4 100644 --- a/core/src/main/scala/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala @@ -37,8 +37,10 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi private def setDefaultProperties(prop: Properties) { prop.setProperty("*.sink.servlet.class", "spark.metrics.sink.MetricsServlet") - prop.setProperty("master.sink.servlet.uri", "/metrics/master") - prop.setProperty("applications.sink.servlet.uri", "/metrics/applications") + prop.setProperty("*.sink.servlet.uri", "/metrics/json") + prop.setProperty("*.sink.servlet.sample", "false") + prop.setProperty("master.sink.servlet.uri", "/metrics/master/json") + prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json") } def initialize() { diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala index ae1f853691..04c750b17e 100644 --- a/core/src/main/scala/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala @@ -129,7 +129,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin val sink = Class.forName(classPath) .getConstructor(classOf[Properties], classOf[MetricRegistry]) .newInstance(kv._2, registry) - if (kv._1 =="servlet") { + if (kv._1 == "servlet") { metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) } else { sinks += sink.asInstanceOf[Sink] diff --git a/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala index 39ede9b2df..17432b1ed1 100644 --- a/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala +++ b/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala @@ -34,15 +34,9 @@ class MetricsServlet(val property: Properties, val registry: MetricRegistry) ext val SERVLET_KEY_URI = "uri" val SERVLET_KEY_SAMPLE = "sample" - val SERVLET_DEFAULT_URI = "/metrics" - val SERVLET_DEFAULT_SAMPLE = false + val servletURI = property.getProperty(SERVLET_KEY_URI) - val servletURI = property.getProperty(SERVLET_KEY_URI, SERVLET_DEFAULT_URI) - - val servletShowSample = Option(property.getProperty(SERVLET_KEY_SAMPLE)) match { - case Some(s) => s.toBoolean - case None => SERVLET_DEFAULT_SAMPLE - } + val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean val mapper = new ObjectMapper().registerModule( new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample)) diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala index 7e8a41c72e..4bcfdeb62b 100644 --- a/core/src/main/scala/spark/ui/SparkUI.scala +++ b/core/src/main/scala/spark/ui/SparkUI.scala @@ -45,10 +45,8 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging { val exec = new ExecutorsUI(sc) // Add MetricsServlet handlers by default - val metricsServletHandlers = SparkEnv.get.metricsSystem.metricsServlet match { - case Some(s) => s.getHandlers - case None => Array[(String, Handler)]() - } + val metricsServletHandlers = SparkEnv.get.metricsSystem.metricsServlet.map(_.getHandlers) + .getOrElse(Array()) val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++ exec.getHandlers ++ metricsServletHandlers ++ handlers diff --git a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala index df999cd532..b0213b62d9 100644 --- a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala +++ b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala @@ -30,12 +30,14 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter { val conf = new MetricsConfig(Option("dummy-file")) conf.initialize() - assert(conf.properties.size() === 3) + assert(conf.properties.size() === 5) assert(conf.properties.getProperty("test-for-dummy") === null) val property = conf.getInstance("random") - assert(property.size() === 1) + assert(property.size() === 3) assert(property.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet") + assert(property.getProperty("sink.servlet.uri") === "/metrics/json") + assert(property.getProperty("sink.servlet.sample") === "false") } test("MetricsConfig with properties set") { @@ -43,19 +45,22 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter { conf.initialize() val masterProp = conf.getInstance("master") - assert(masterProp.size() === 5) + assert(masterProp.size() === 6) assert(masterProp.getProperty("sink.console.period") === "20") assert(masterProp.getProperty("sink.console.unit") === "minutes") assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource") assert(masterProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet") - assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master") + assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json") + assert(masterProp.getProperty("sink.servlet.sample") === "false") val workerProp = conf.getInstance("worker") - assert(workerProp.size() === 4) + assert(workerProp.size() === 6) assert(workerProp.getProperty("sink.console.period") === "10") assert(workerProp.getProperty("sink.console.unit") === "seconds") assert(workerProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource") assert(workerProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet") + assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json") + assert(workerProp.getProperty("sink.servlet.sample") === "false") } test("MetricsConfig with subProperties") { @@ -79,6 +84,6 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter { assert(consoleProps.size() === 2) val servletProps = sinkProps("servlet") - assert(servletProps.size() === 2) + assert(servletProps.size() === 3) } } -- cgit v1.2.3