author     Patrick Wendell <pwendell@gmail.com>  2013-09-08 18:32:16 -0700
committer  Patrick Wendell <pwendell@gmail.com>  2013-09-08 18:32:16 -0700
commit     f68848d95d896b578235c063be51483b4fce518e (patch)
tree       fcbfaecaa189fcc472b6d01c498d941cadce81e4
parent     0b957997ada10fcfa96e42780482d986bf7e4885 (diff)
parent     b4e382c210b4987da78421f5de11199e4d74f0e7 (diff)
Merge pull request #906 from pwendell/ganglia-sink
Clean-up of Metrics Code/Docs and Add Ganglia Sink
 conf/metrics.properties.template                                        | 50
 core/src/main/scala/org/apache/spark/SparkContext.scala                 |  4
 core/src/main/scala/org/apache/spark/executor/Executor.scala            |  2
 core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala      |  5
 core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala        |  7
 core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala     | 82
 core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala  | 11
 core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala |  6
 core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala   |  6
 core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala   | 19
 docs/monitoring.md                                                      |  9
 project/SparkBuild.scala                                                |  1
 12 files changed, 166 insertions(+), 36 deletions(-)
diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index 6c36f3cca4..ae10f615d1 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -31,7 +31,7 @@
# 1. To add a new sink, set the "class" option to a fully qualified class
# name (see examples below).
# 2. Some sinks involve a polling period. The minimum allowed polling period
-# is 1 second.
+# is 1 second.
# 3. Wild card properties can be overridden by more specific properties.
# For example, master.sink.console.period takes precedence over
# *.sink.console.period.
@@ -47,11 +47,45 @@
# instances master and applications. MetricsServlet cannot be configured on its own.
#
+## List of available sinks and their properties.
+
+# org.apache.spark.metrics.sink.ConsoleSink
+# Name: Default: Description:
+# period 10 Poll period
+# unit seconds Units of poll period
+
+# org.apache.spark.metrics.sink.CSVSink
+# Name: Default: Description:
+# period 10 Poll period
+# unit seconds Units of poll period
+# directory /tmp Where to store CSV files
+
+# org.apache.spark.metrics.sink.GangliaSink
+# Name: Default: Description:
+# host NONE Hostname or multicast group of Ganglia server
+# port NONE Port of Ganglia server(s)
+# period 10 Poll period
+# unit seconds Units of poll period
+# ttl 1 TTL of messages sent by Ganglia
+# mode multicast Ganglia network mode ('unicast' or 'multicast')
+
+# org.apache.spark.metrics.sink.JmxSink
+
+# org.apache.spark.metrics.sink.MetricsServlet
+# Name: Default: Description:
+# path VARIES* Path prefix from the web server root
+# sample false Whether to show entire set of samples for histograms ('false' or 'true')
+#
+# * Default path is /metrics/json for all instances except the master. The master has two paths:
+# /metrics/applications/json # App information
+# /metrics/master/json # Master information
+
+## Examples
# Enable JmxSink for all instances by class name
-#*.sink.jmx.class=spark.metrics.sink.JmxSink
+#*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
# Enable ConsoleSink for all instances by class name
-#*.sink.console.class=spark.metrics.sink.ConsoleSink
+#*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
# Polling period for ConsoleSink
#*.sink.console.period=10
@@ -64,7 +98,7 @@
#master.sink.console.unit=seconds
# Enable CsvSink for all instances
-#*.sink.csv.class=spark.metrics.sink.CsvSink
+#*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
# Polling period for CsvSink
#*.sink.csv.period=1
@@ -80,11 +114,11 @@
#worker.sink.csv.unit=minutes
# Enable jvm source for instances master, worker, driver and executor
-#master.source.jvm.class=spark.metrics.source.JvmSource
+#master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
-#worker.source.jvm.class=spark.metrics.source.JvmSource
+#worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
-#driver.source.jvm.class=spark.metrics.source.JvmSource
+#driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
-#executor.source.jvm.class=spark.metrics.source.JvmSource
+#executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
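
For reference, a minimal configuration enabling the new GangliaSink for all instances might look like the sketch below. The host and port values are illustrative, not defaults shipped by this commit; 239.2.11.71:8649 is merely Ganglia's conventional multicast address. (Java properties files don't allow inline comments, so each comment sits on its own line.)

# Sketch: enable GangliaSink everywhere (illustrative values).
*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
# Multicast group (or unicast host) of the Ganglia gmond daemon(s).
*.sink.ganglia.host=239.2.11.71
*.sink.ganglia.port=8649
# Poll every 10 seconds; the 1-second minimum noted above still applies.
*.sink.ganglia.period=10
*.sink.ganglia.unit=seconds
# 'multicast' (the default) or 'unicast'; ttl only matters for multicast.
*.sink.ganglia.mode=multicast
*.sink.ganglia.ttl=1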
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 89318712a5..4f711a5ea6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -282,8 +282,8 @@ class SparkContext(
// Post init
taskScheduler.postStartHook()
- val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
- val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
+ val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
+ val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
def initDriverMetrics() {
SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d365804994..ceae3b8289 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -98,7 +98,7 @@ private[spark] class Executor(
}
)
- val executorSource = new ExecutorSource(this)
+ val executorSource = new ExecutorSource(this, executorId)
// Initialize Spark environment (using system properties read above)
val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index bf8fb4fd21..18c9dc1c0a 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -27,7 +27,7 @@ import scala.collection.JavaConversions._
import org.apache.spark.metrics.source.Source
-class ExecutorSource(val executor: Executor) extends Source {
+class ExecutorSource(val executor: Executor, executorId: String) extends Source {
private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption
@@ -39,7 +39,8 @@ class ExecutorSource(val executor: Executor) extends Source {
}
val metricRegistry = new MetricRegistry()
- val sourceName = "executor"
+ // TODO: It would be nice to pass the application name here
+ val sourceName = "executor.%s".format(executorId)
// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
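
With this change each executor gets its own metrics namespace instead of every executor colliding under a single "executor" source. For illustration (the identifier is hypothetical), an executor with executorId "3" would register its thread-pool gauge under a name along the lines of:

executor.3.threadpool.activeTask.count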
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index 0f9c4e00b1..caab748d60 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -37,10 +37,9 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
private def setDefaultProperties(prop: Properties) {
prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
- prop.setProperty("*.sink.servlet.uri", "/metrics/json")
- prop.setProperty("*.sink.servlet.sample", "false")
- prop.setProperty("master.sink.servlet.uri", "/metrics/master/json")
- prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json")
+ prop.setProperty("*.sink.servlet.path", "/metrics/json")
+ prop.setProperty("master.sink.servlet.path", "/metrics/master/json")
+ prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")
}
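
Dropping the "sample" default here is what lets MetricsServlet fall back to its own hard-coded default (see the MetricsServlet change below). For readers unfamiliar with how java.util.Properties defaulting composes, a minimal standalone sketch (illustrative only, not Spark's actual merge logic):

import java.util.Properties

// A Properties created with a defaults table falls back to it on lookup.
val defaults = new Properties()
defaults.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
defaults.setProperty("*.sink.servlet.path", "/metrics/json")

val merged = new Properties(defaults)
merged.setProperty("master.sink.servlet.path", "/metrics/master/json")

merged.getProperty("*.sink.servlet.path")      // "/metrics/json" (from defaults)
merged.getProperty("master.sink.servlet.path") // "/metrics/master/json" (override)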
def initialize() {
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
new file mode 100644
index 0000000000..b924907070
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import com.codahale.metrics.ganglia.GangliaReporter
+import com.codahale.metrics.MetricRegistry
+import info.ganglia.gmetric4j.gmetric.GMetric
+
+import org.apache.spark.metrics.MetricsSystem
+
+class GangliaSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+ val GANGLIA_KEY_PERIOD = "period"
+ val GANGLIA_DEFAULT_PERIOD = 10
+
+ val GANGLIA_KEY_UNIT = "unit"
+ val GANGLIA_DEFAULT_UNIT = TimeUnit.SECONDS
+
+ val GANGLIA_KEY_MODE = "mode"
+ val GANGLIA_DEFAULT_MODE = GMetric.UDPAddressingMode.MULTICAST
+
+ // TTL for multicast messages. If listeners are X hops away in network, must be at least X.
+ val GANGLIA_KEY_TTL = "ttl"
+ val GANGLIA_DEFAULT_TTL = 1
+
+ val GANGLIA_KEY_HOST = "host"
+ val GANGLIA_KEY_PORT = "port"
+
+ def propertyToOption(prop: String) = Option(property.getProperty(prop))
+
+ if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) {
+ throw new Exception("Ganglia sink requires 'host' property.")
+ }
+
+ if (!propertyToOption(GANGLIA_KEY_PORT).isDefined) {
+ throw new Exception("Ganglia sink requires 'port' property.")
+ }
+
+ val host = propertyToOption(GANGLIA_KEY_HOST).get
+ val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt
+ val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL)
+ val mode = propertyToOption(GANGLIA_KEY_MODE)
+ .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE)
+ val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt)
+ .getOrElse(GANGLIA_DEFAULT_PERIOD)
+ val pollUnit = propertyToOption(GANGLIA_KEY_UNIT).map(u => TimeUnit.valueOf(u.toUpperCase))
+ .getOrElse(GANGLIA_DEFAULT_UNIT)
+
+ MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+ val ganglia = new GMetric(host, port, mode, ttl)
+ val reporter: GangliaReporter = GangliaReporter.forRegistry(registry)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .build(ganglia)
+
+ override def start() {
+ reporter.start(pollPeriod, pollUnit)
+ }
+
+ override def stop() {
+ reporter.stop()
+ }
+}
+
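
GangliaSink follows the same (Properties, MetricRegistry) constructor convention as the existing sinks, which is what lets it be selected purely by class name in metrics.properties. A minimal sketch of reflective instantiation under that assumption (a hypothetical helper, not the actual MetricsSystem code):

import java.util.Properties
import com.codahale.metrics.MetricRegistry
import org.apache.spark.metrics.sink.Sink

// Sketch: build whatever sink class the config names, assuming every sink
// exposes a (Properties, MetricRegistry) constructor as GangliaSink does.
def loadSink(className: String, props: Properties, registry: MetricRegistry): Sink = {
  val ctor = Class.forName(className)
    .getConstructor(classOf[Properties], classOf[MetricRegistry])
  ctor.newInstance(props, registry).asInstanceOf[Sink]
}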
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
index 4e90dd4323..99357fede6 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
@@ -31,18 +31,21 @@ import org.eclipse.jetty.server.Handler
import org.apache.spark.ui.JettyUtils
class MetricsServlet(val property: Properties, val registry: MetricRegistry) extends Sink {
- val SERVLET_KEY_URI = "uri"
+ val SERVLET_KEY_PATH = "path"
val SERVLET_KEY_SAMPLE = "sample"
- val servletURI = property.getProperty(SERVLET_KEY_URI)
+ val SERVLET_DEFAULT_SAMPLE = false
- val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean
+ val servletPath = property.getProperty(SERVLET_KEY_PATH)
+
+ val servletShowSample = Option(property.getProperty(SERVLET_KEY_SAMPLE)).map(_.toBoolean)
+ .getOrElse(SERVLET_DEFAULT_SAMPLE)
val mapper = new ObjectMapper().registerModule(
new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
def getHandlers = Array[(String, Handler)](
- (servletURI, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
+ (servletPath, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
)
def getMetricsSnapshot(request: HttpServletRequest): String = {
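
Because "sample" now has a built-in default, a user's metrics.properties only needs to override what differs. An illustrative per-instance override (the path value is hypothetical):

# Serve worker metrics at a custom path and include full histogram samples.
worker.sink.servlet.path=/custom/metrics/json
worker.sink.servlet.sample=true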
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 22e3723ac8..446d490cc9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -20,10 +20,12 @@ package org.apache.spark.scheduler
import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.metrics.source.Source
+import org.apache.spark.SparkContext
-private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
+private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
+ extends Source {
val metricRegistry = new MetricRegistry()
- val sourceName = "DAGScheduler"
+ val sourceName = "%s.DAGScheduler".format(sc.appName)
metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failed.size
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 3d709cfde4..acc3951088 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -20,11 +20,13 @@ package org.apache.spark.storage
import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.metrics.source.Source
+import org.apache.spark.SparkContext
-private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source {
+private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
+ extends Source {
val metricRegistry = new MetricRegistry()
- val sourceName = "BlockManager"
+ val sourceName = "%s.BlockManager".format(sc.appName)
metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
override def getValue: Long = {
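
Together with the DAGSchedulerSource change above, driver-side metrics are now namespaced by application name, so two applications reporting into the same Ganglia cluster no longer collide. For an application named "PageRank" (the name is hypothetical), the gauges would surface under names along the lines of:

PageRank.DAGScheduler.stage.failedStages.number
PageRank.BlockManager.memory.maxMem.MBytes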
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
index 58c94a162d..1a9ce8c607 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
@@ -30,14 +30,13 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
val conf = new MetricsConfig(Option("dummy-file"))
conf.initialize()
- assert(conf.properties.size() === 5)
+ assert(conf.properties.size() === 4)
assert(conf.properties.getProperty("test-for-dummy") === null)
val property = conf.getInstance("random")
- assert(property.size() === 3)
+ assert(property.size() === 2)
assert(property.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
- assert(property.getProperty("sink.servlet.uri") === "/metrics/json")
- assert(property.getProperty("sink.servlet.sample") === "false")
+ assert(property.getProperty("sink.servlet.path") === "/metrics/json")
}
test("MetricsConfig with properties set") {
@@ -45,22 +44,20 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
conf.initialize()
val masterProp = conf.getInstance("master")
- assert(masterProp.size() === 6)
+ assert(masterProp.size() === 5)
assert(masterProp.getProperty("sink.console.period") === "20")
assert(masterProp.getProperty("sink.console.unit") === "minutes")
assert(masterProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource")
assert(masterProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
- assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json")
- assert(masterProp.getProperty("sink.servlet.sample") === "false")
+ assert(masterProp.getProperty("sink.servlet.path") === "/metrics/master/json")
val workerProp = conf.getInstance("worker")
- assert(workerProp.size() === 6)
+ assert(workerProp.size() === 5)
assert(workerProp.getProperty("sink.console.period") === "10")
assert(workerProp.getProperty("sink.console.unit") === "seconds")
assert(workerProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource")
assert(workerProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
- assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json")
- assert(workerProp.getProperty("sink.servlet.sample") === "false")
+ assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
}
test("MetricsConfig with subProperties") {
@@ -84,6 +81,6 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
assert(consoleProps.size() === 2)
val servletProps = sinkProps("servlet")
- assert(servletProps.size() === 3)
+ assert(servletProps.size() === 2)
}
}
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 0ec987107c..4c4f174503 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -31,6 +31,15 @@ set of sinks to which metrics are reported. The following instances are currently
* `executor`: A Spark executor.
* `driver`: The Spark driver process (the process in which your SparkContext is created).
+Each instance can report to zero or more _sinks_. Sinks are contained in the
+`org.apache.spark.metrics.sink` package:
+
+* `ConsoleSink`: Logs metrics information to the console.
+* `CSVSink`: Exports metrics data to CSV files at regular intervals.
+* `GangliaSink`: Sends metrics to a Ganglia node or multicast group.
+* `JmxSink`: Registers metrics for viewing in a JMX console.
+* `MetricsServlet`: Adds a servlet within the existing Spark UI to serve metrics data as JSON data.
+
The syntax of the metrics configuration file is defined in an example configuration file,
`$SPARK_HOME/conf/metrics.properties.template`.
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 90948c8e2c..a60b553b5a 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -213,6 +213,7 @@ object SparkBuild extends Build {
"com.codahale.metrics" % "metrics-core" % "3.0.0",
"com.codahale.metrics" % "metrics-jvm" % "3.0.0",
"com.codahale.metrics" % "metrics-json" % "3.0.0",
+ "com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
"com.twitter" % "chill_2.9.3" % "0.3.1",
"com.twitter" % "chill-java" % "0.3.1"
)