diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-11-24 16:19:51 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-11-24 16:19:51 -0800 |
commit | 859d62dc2a37f56f8c85138df069a359e7fabb45 (patch) | |
tree | 5a30d32d04131183f6bff3b2df5f376187494d46 /core | |
parent | 65de73c7f8529be363e9ae814d0d4eab0da112aa (diff) | |
parent | 1360f62d15170bd295ceaba85f39401fd8109e51 (diff) | |
download | spark-859d62dc2a37f56f8c85138df069a359e7fabb45.tar.gz spark-859d62dc2a37f56f8c85138df069a359e7fabb45.tar.bz2 spark-859d62dc2a37f56f8c85138df069a359e7fabb45.zip |
Merge pull request #151 from russellcardullo/add-graphite-sink
Add graphite sink for metrics
This adds a metrics sink for graphite. The sink must
be configured with the host and port of a graphite node
and optionally may be configured with a prefix that will
be prepended to all metrics that are sent to graphite.
Diffstat (limited to 'core')
-rw-r--r-- | core/pom.xml | 4 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala | 82 |
2 files changed, 86 insertions, 0 deletions
diff --git a/core/pom.xml b/core/pom.xml index 8621d257e5..6af229c71d 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -159,6 +159,10 @@ <artifactId>metrics-ganglia</artifactId> </dependency> <dependency> + <groupId>com.codahale.metrics</groupId> + <artifactId>metrics-graphite</artifactId> + </dependency> + <dependency> <groupId>org.apache.derby</groupId> <artifactId>derby</artifactId> <scope>test</scope> diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala new file mode 100644 index 0000000000..cdcfec8ca7 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.sink + +import java.util.Properties +import java.util.concurrent.TimeUnit +import java.net.InetSocketAddress + +import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.graphite.{GraphiteReporter, Graphite} + +import org.apache.spark.metrics.MetricsSystem + +class GraphiteSink(val property: Properties, val registry: MetricRegistry) extends Sink { + val GRAPHITE_DEFAULT_PERIOD = 10 + val GRAPHITE_DEFAULT_UNIT = "SECONDS" + val GRAPHITE_DEFAULT_PREFIX = "" + + val GRAPHITE_KEY_HOST = "host" + val GRAPHITE_KEY_PORT = "port" + val GRAPHITE_KEY_PERIOD = "period" + val GRAPHITE_KEY_UNIT = "unit" + val GRAPHITE_KEY_PREFIX = "prefix" + + def propertyToOption(prop: String) = Option(property.getProperty(prop)) + + if (!propertyToOption(GRAPHITE_KEY_HOST).isDefined) { + throw new Exception("Graphite sink requires 'host' property.") + } + + if (!propertyToOption(GRAPHITE_KEY_PORT).isDefined) { + throw new Exception("Graphite sink requires 'port' property.") + } + + val host = propertyToOption(GRAPHITE_KEY_HOST).get + val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt + + val pollPeriod = propertyToOption(GRAPHITE_KEY_PERIOD) match { + case Some(s) => s.toInt + case None => GRAPHITE_DEFAULT_PERIOD + } + + val pollUnit = propertyToOption(GRAPHITE_KEY_UNIT) match { + case Some(s) => TimeUnit.valueOf(s.toUpperCase()) + case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT) + } + + val prefix = propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX) + + MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) + + val graphite: Graphite = new Graphite(new InetSocketAddress(host, port)) + + val reporter: GraphiteReporter = GraphiteReporter.forRegistry(registry) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .prefixedWith(prefix) + .build(graphite) + + override def start() { + reporter.start(pollPeriod, pollUnit) + } + + override def stop() { + reporter.stop() + } +} |