author     Patrick Wendell <pwendell@gmail.com>    2013-08-13 09:28:18 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2013-08-13 09:28:18 -0700
commit     a0133bfbad22b057713d633f11ecc98d55fce129 (patch)
tree       f9aaa1016ae9bc66d30cd19fa0baf1b1235aa0f0 /core
parent     e2fdac60da8cb9b0ff0191631bf7e37ad3a47c76 (diff)
parent     09c7179e812a06cb43a4975bca15d1b9963da975 (diff)
Merge pull request #784 from jerryshao/dev-metrics-servlet
Add MetricsServlet for Spark metrics system
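
This merge exposes each metrics system's registry as JSON through the existing web UIs, rather than only pushing metrics to sinks such as console or JMX. As a rough sketch of what the new endpoints enable (host and ports are assumptions based on Spark's default UI ports, not part of this patch):

    // Scala sketch: poll the servlet URIs this patch registers by default.
    // Assumes a driver UI on port 4040 and a standalone master UI on port 8080.
    import scala.io.Source
    val driverJson = Source.fromURL("http://localhost:4040/metrics/json", "UTF-8").mkString
    val masterJson = Source.fromURL("http://localhost:8080/metrics/master/json", "UTF-8").mkString
    val appsJson   = Source.fromURL("http://localhost:8080/metrics/applications/json", "UTF-8").mkString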
Diffstat (limited to 'core')
 core/pom.xml                                                     |  4
 core/src/main/scala/spark/deploy/master/Master.scala             |  4
 core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala |  2
 core/src/main/scala/spark/deploy/master/ui/IndexPage.scala       |  2
 core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala     | 12
 core/src/main/scala/spark/deploy/worker/Worker.scala             |  1
 core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala     |  5
 core/src/main/scala/spark/metrics/MetricsConfig.scala            |  6
 core/src/main/scala/spark/metrics/MetricsSystem.scala            | 17
 core/src/main/scala/spark/metrics/sink/MetricsServlet.scala      | 55
 core/src/main/scala/spark/ui/JettyUtils.scala                    |  2
 core/src/main/scala/spark/ui/SparkUI.scala                       |  9
 core/src/test/scala/spark/metrics/MetricsConfigSuite.scala       | 49
 core/src/test/scala/spark/metrics/MetricsSystemSuite.scala       | 24
 14 files changed, 157 insertions(+), 35 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 485aa29f83..dfadd22d42 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -126,6 +126,10 @@
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-jvm</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-json</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.derby</groupId>
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 4a4d9908a0..152cb2887a 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -57,14 +57,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
var firstApp: Option[ApplicationInfo] = None
- val webUi = new MasterWebUI(self, webUiPort)
-
Utils.checkHost(host, "Expected hostname")
val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
val masterSource = new MasterSource(this)
+ val webUi = new MasterWebUI(this, webUiPort)
+
val masterPublicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host
diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
index 36a1e91b24..405a1ec3a6 100644
--- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
@@ -33,7 +33,7 @@ import spark.deploy.master.ExecutorInfo
import spark.ui.UIUtils
private[spark] class ApplicationPage(parent: MasterWebUI) {
- val master = parent.master
+ val master = parent.masterActorRef
implicit val timeout = parent.timeout
/** Executor details for a particular application */
diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
index d3b10f197b..4443d88056 100644
--- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
@@ -35,7 +35,7 @@ import spark.deploy.master.{ApplicationInfo, WorkerInfo}
import spark.ui.UIUtils
private[spark] class IndexPage(parent: MasterWebUI) {
- val master = parent.master
+ val master = parent.masterActorRef
implicit val timeout = parent.timeout
def renderJson(request: HttpServletRequest): JValue = {
diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
index 31bdb7854e..f0a6ffe047 100644
--- a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,7 +17,6 @@
package spark.deploy.master.ui
-import akka.actor.ActorRef
import akka.util.Duration
import javax.servlet.http.HttpServletRequest
@@ -25,6 +24,7 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
import spark.{Logging, Utils}
+import spark.deploy.master.Master
import spark.ui.JettyUtils
import spark.ui.JettyUtils._
@@ -32,12 +32,14 @@ import spark.ui.JettyUtils._
* Web UI server for the standalone master.
*/
private[spark]
-class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
+class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
implicit val timeout = Duration.create(
System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
val host = Utils.localHostName()
val port = requestedPort
+ val masterActorRef = master.self
+
var server: Option[Server] = None
var boundPort: Option[Int] = None
@@ -57,7 +59,11 @@ class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
}
}
- val handlers = Array[(String, Handler)](
+ val metricsHandlers = master.masterMetricsSystem.metricsServlet.map(_.getHandlers)
+ .getOrElse(Array()) ++ master.applicationMetricsSystem.metricsServlet.map(_.getHandlers)
+ .getOrElse(Array())
+
+ val handlers = metricsHandlers ++ Array[(String, Handler)](
("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
("/app", (request: HttpServletRequest) => applicationPage.render(request)),
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 0e46fa281e..0b5013b864 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -101,6 +101,7 @@ private[spark] class Worker(
logInfo("Spark home: " + sparkHome)
createWorkDir()
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
+
webUi.start()
connectToMaster()
diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
index 742e0a5fb6..b408c63a02 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -17,7 +17,6 @@
package spark.deploy.worker.ui
-import akka.actor.ActorRef
import akka.util.{Duration, Timeout}
import java.io.{FileInputStream, File}
@@ -49,7 +48,9 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
val indexPage = new IndexPage(this)
- val handlers = Array[(String, Handler)](
+ val metricsHandlers = worker.metricsSystem.metricsServlet.map(_.getHandlers).getOrElse(Array())
+
+ val handlers = metricsHandlers ++ Array[(String, Handler)](
("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
("/log", (request: HttpServletRequest) => log(request)),
("/logPage", (request: HttpServletRequest) => logPage(request)),
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
index 3e32e9c82f..d7fb5378a4 100644
--- a/core/src/main/scala/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -36,7 +36,11 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
var propertyCategories: mutable.HashMap[String, Properties] = null
private def setDefaultProperties(prop: Properties) {
- // empty function, any default property can be set here
+ prop.setProperty("*.sink.servlet.class", "spark.metrics.sink.MetricsServlet")
+ prop.setProperty("*.sink.servlet.uri", "/metrics/json")
+ prop.setProperty("*.sink.servlet.sample", "false")
+ prop.setProperty("master.sink.servlet.uri", "/metrics/master/json")
+ prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json")
}
def initialize() {
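
For reference, each default set above follows the [instance].[sink|source].[name].[options] key format, so a deployment can override them in its metrics configuration file. A hypothetical override (illustrative values, not part of this patch):

    # move the wildcard servlet URI and include sampled values in the JSON
    *.sink.servlet.uri=/admin/metrics/json
    *.sink.servlet.sample=true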
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
index 1dacafa135..04c750b17e 100644
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable
import spark.Logging
-import spark.metrics.sink.Sink
+import spark.metrics.sink.{MetricsServlet, Sink}
import spark.metrics.source.Source
/**
@@ -35,7 +35,7 @@ import spark.metrics.source.Source
* "instance" specify "who" (the role) use metrics system. In spark there are several roles
* like master, worker, executor, client driver, these roles will create metrics system
* for monitoring. So instance represents these roles. Currently in Spark, several instances
- * have already implemented: master, worker, executor, driver.
+ * have already implemented: master, worker, executor, driver, applications.
*
* "source" specify "where" (source) to collect metrics data. In metrics system, there exists
* two kinds of source:
@@ -51,8 +51,8 @@ import spark.metrics.source.Source
* Metrics configuration format is like below:
* [instance].[sink|source].[name].[options] = xxxx
*
- * [instance] can be "master", "worker", "executor", "driver", which means only the specified
- * instance has this property.
+ * [instance] can be "master", "worker", "executor", "driver", "applications" which means only
+ * the specified instance has this property.
* wild card "*" can be used to replace instance name, which means all the instances will have
* this property.
*
@@ -72,6 +72,9 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
val sources = new mutable.ArrayBuffer[Source]
val registry = new MetricRegistry()
+ // Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
+ var metricsServlet: Option[MetricsServlet] = None
+
metricsConfig.initialize()
registerSources()
registerSinks()
@@ -126,7 +129,11 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
val sink = Class.forName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry])
.newInstance(kv._2, registry)
- sinks += sink.asInstanceOf[Sink]
+ if (kv._1 == "servlet") {
+ metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
+ } else {
+ sinks += sink.asInstanceOf[Sink]
+ }
} catch {
case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
}
diff --git a/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala
new file mode 100644
index 0000000000..17432b1ed1
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/sink/MetricsServlet.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.metrics.sink
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.json.MetricsModule
+
+import com.fasterxml.jackson.databind.ObjectMapper
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.Handler
+
+import spark.ui.JettyUtils
+
+class MetricsServlet(val property: Properties, val registry: MetricRegistry) extends Sink {
+ val SERVLET_KEY_URI = "uri"
+ val SERVLET_KEY_SAMPLE = "sample"
+
+ val servletURI = property.getProperty(SERVLET_KEY_URI)
+
+ val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean
+
+ val mapper = new ObjectMapper().registerModule(
+ new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
+
+ def getHandlers = Array[(String, Handler)](
+ (servletURI, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
+ )
+
+ def getMetricsSnapshot(request: HttpServletRequest): String = {
+ mapper.writeValueAsString(registry)
+ }
+
+ override def start() { }
+
+ override def stop() { }
+}
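
To illustrate the new class in isolation, a minimal sketch that constructs it directly and takes a snapshot (the counter name is made up; in Spark the Properties come from MetricsConfig and the registry from MetricsSystem):

    import java.util.Properties
    import com.codahale.metrics.MetricRegistry
    import spark.metrics.sink.MetricsServlet

    val props = new Properties()
    props.setProperty("uri", "/metrics/json")
    props.setProperty("sample", "false")

    val registry = new MetricRegistry()
    registry.counter("example.requests").inc()  // hypothetical metric

    val servlet = new MetricsServlet(props, registry)
    // getHandlers yields (uri, Jetty Handler) pairs for a web UI to mount;
    // getMetricsSnapshot ignores its request argument, so null suffices here
    println(servlet.getMetricsSnapshot(null))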
diff --git a/core/src/main/scala/spark/ui/JettyUtils.scala b/core/src/main/scala/spark/ui/JettyUtils.scala
index ca6088ad93..1cc85124d3 100644
--- a/core/src/main/scala/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/spark/ui/JettyUtils.scala
@@ -48,7 +48,7 @@ private[spark] object JettyUtils extends Logging {
implicit def textResponderToHandler(responder: Responder[String]): Handler =
createHandler(responder, "text/plain")
- private def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
+ def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
extractFn: T => String = (in: Any) => in.toString): Handler = {
new AbstractHandler {
def handle(target: String,
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
index 7599f82a94..4bcfdeb62b 100644
--- a/core/src/main/scala/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
-import spark.{Logging, SparkContext, Utils}
+import spark.{Logging, SparkContext, SparkEnv, Utils}
import spark.ui.env.EnvironmentUI
import spark.ui.exec.ExecutorsUI
import spark.ui.storage.BlockManagerUI
@@ -43,8 +43,13 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
val jobs = new JobProgressUI(sc)
val env = new EnvironmentUI(sc)
val exec = new ExecutorsUI(sc)
+
+ // Add MetricsServlet handlers by default
+ val metricsServletHandlers = SparkEnv.get.metricsSystem.metricsServlet.map(_.getHandlers)
+ .getOrElse(Array())
+
val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++
- exec.getHandlers ++ handlers
+ exec.getHandlers ++ metricsServletHandlers ++ handlers
/** Bind the HTTP server which backs this web interface */
def bind() {
diff --git a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
index 87cd2ffad2..b0213b62d9 100644
--- a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
@@ -1,12 +1,24 @@
-package spark.metrics
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
-import java.util.Properties
-import java.io.{File, FileOutputStream}
+package spark.metrics
import org.scalatest.{BeforeAndAfter, FunSuite}
-import spark.metrics._
-
class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
var filePath: String = _
@@ -18,11 +30,14 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
val conf = new MetricsConfig(Option("dummy-file"))
conf.initialize()
- assert(conf.properties.size() === 0)
+ assert(conf.properties.size() === 5)
assert(conf.properties.getProperty("test-for-dummy") === null)
val property = conf.getInstance("random")
- assert(property.size() === 0)
+ assert(property.size() === 3)
+ assert(property.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
+ assert(property.getProperty("sink.servlet.uri") === "/metrics/json")
+ assert(property.getProperty("sink.servlet.sample") === "false")
}
test("MetricsConfig with properties set") {
@@ -30,16 +45,22 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
conf.initialize()
val masterProp = conf.getInstance("master")
- assert(masterProp.size() === 3)
+ assert(masterProp.size() === 6)
assert(masterProp.getProperty("sink.console.period") === "20")
assert(masterProp.getProperty("sink.console.unit") === "minutes")
assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+ assert(masterProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
+ assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json")
+ assert(masterProp.getProperty("sink.servlet.sample") === "false")
val workerProp = conf.getInstance("worker")
- assert(workerProp.size() === 3)
+ assert(workerProp.size() === 6)
assert(workerProp.getProperty("sink.console.period") === "10")
assert(workerProp.getProperty("sink.console.unit") === "seconds")
- assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+ assert(workerProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+ assert(workerProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
+ assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json")
+ assert(workerProp.getProperty("sink.servlet.sample") === "false")
}
test("MetricsConfig with subProperties") {
@@ -47,7 +68,7 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
conf.initialize()
val propCategories = conf.propertyCategories
- assert(propCategories.size === 2)
+ assert(propCategories.size === 3)
val masterProp = conf.getInstance("master")
val sourceProps = conf.subProperties(masterProp, MetricsSystem.SOURCE_REGEX)
@@ -55,10 +76,14 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
assert(sourceProps("jvm").getProperty("class") === "spark.metrics.source.JvmSource")
val sinkProps = conf.subProperties(masterProp, MetricsSystem.SINK_REGEX)
- assert(sinkProps.size === 1)
+ assert(sinkProps.size === 2)
assert(sinkProps.contains("console"))
+ assert(sinkProps.contains("servlet"))
val consoleProps = sinkProps("console")
assert(consoleProps.size() === 2)
+
+ val servletProps = sinkProps("servlet")
+ assert(servletProps.size() === 3)
}
}
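
The test resource these assertions read is not part of this diff; working backwards from the expected values, it plausibly contains something like the following reconstruction (not the actual file):

    *.sink.console.period=10
    *.sink.console.unit=seconds
    *.source.jvm.class=spark.metrics.source.JvmSource
    master.sink.console.period=20
    master.sink.console.unit=minutes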
diff --git a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
index c189996417..35c2ae41e9 100644
--- a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
@@ -1,12 +1,24 @@
-package spark.metrics
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
-import java.util.Properties
-import java.io.{File, FileOutputStream}
+package spark.metrics
import org.scalatest.{BeforeAndAfter, FunSuite}
-import spark.metrics._
-
class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
var filePath: String = _
@@ -22,6 +34,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
assert(sources.length === 0)
assert(sinks.length === 0)
+ assert(metricsSystem.metricsServlet != None)
}
test("MetricsSystem with sources add") {
@@ -31,6 +44,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
assert(sources.length === 0)
assert(sinks.length === 1)
+ assert(metricsSystem.metricsServlet != None)
val source = new spark.deploy.master.MasterSource(null)
metricsSystem.registerSource(source)