From ef85a51f85c9720bc091367a0d4f80e7ed6b9778 Mon Sep 17 00:00:00 2001
From: Russell Cardullo
Date: Fri, 8 Nov 2013 16:36:03 -0800
Subject: Add graphite sink for metrics

This adds a metrics sink for graphite. The sink must be configured with the
host and port of a graphite node and optionally may be configured with a
prefix that will be prepended to all metrics that are sent to graphite.
---
 core/pom.xml                                          |  4 ++
 .../apache/spark/metrics/sink/GraphiteSink.scala      | 82 ++++++++++++++++++++++
 2 files changed, 86 insertions(+)
 create mode 100644 core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
(limited to 'core')

diff --git a/core/pom.xml b/core/pom.xml
index 8621d257e5..6af229c71d 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -158,6 +158,10 @@
       <groupId>com.codahale.metrics</groupId>
       <artifactId>metrics-ganglia</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.codahale.metrics</groupId>
+      <artifactId>metrics-graphite</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.derby</groupId>
       <artifactId>derby</artifactId>
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
new file mode 100644
index 0000000000..eb1315e6de
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.graphite.{GraphiteReporter, Graphite}
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+import java.net.InetSocketAddress
+
+import org.apache.spark.metrics.MetricsSystem
+
+class GraphiteSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+  val GRAPHITE_DEFAULT_PERIOD = 10
+  val GRAPHITE_DEFAULT_UNIT = "SECONDS"
+  val GRAPHITE_DEFAULT_PREFIX = ""
+
+  val GRAPHITE_KEY_HOST = "host"
+  val GRAPHITE_KEY_PORT = "port"
+  val GRAPHITE_KEY_PERIOD = "period"
+  val GRAPHITE_KEY_UNIT = "unit"
+  val GRAPHITE_KEY_PREFIX = "prefix"
+
+  def propertyToOption(prop: String) = Option(property.getProperty(prop))
+
+  if (!propertyToOption(GRAPHITE_KEY_HOST).isDefined) {
+    throw new Exception("Graphite sink requires 'host' property.")
+  }
+
+  if (!propertyToOption(GRAPHITE_KEY_PORT).isDefined) {
+    throw new Exception("Graphite sink requires 'port' property.")
+  }
+
+  val host = propertyToOption(GRAPHITE_KEY_HOST).get
+  val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt
+
+  val pollPeriod = Option(property.getProperty(GRAPHITE_KEY_PERIOD)) match {
+    case Some(s) => s.toInt
+    case None => GRAPHITE_DEFAULT_PERIOD
+  }
+
+  val pollUnit = Option(property.getProperty(GRAPHITE_KEY_UNIT)) match {
+    case Some(s) => TimeUnit.valueOf(s.toUpperCase())
+    case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT)
+  }
+
+  val prefix = propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX)
+
+  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+  val graphite: Graphite = new Graphite(new InetSocketAddress(host, port))
+
+  val reporter: GraphiteReporter = GraphiteReporter.forRegistry(registry)
+      .convertDurationsTo(TimeUnit.MILLISECONDS)
+      .convertRatesTo(TimeUnit.SECONDS)
+      .prefixedWith(prefix)
+      .build(graphite)
+
+  override def start() {
+    reporter.start(pollPeriod, pollUnit)
+  }
+
+  override def stop() {
+    reporter.stop()
+  }
+}
--
cgit v1.2.3
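
Configuration note: a sink like this is normally enabled through Spark's
conf/metrics.properties file, which strips the "*.sink.graphite." qualifier
and hands the remaining options to the sink as plain properties. The snippet
below is a minimal sketch, assuming that standard metrics.properties
mechanism; the host, port, and prefix values are placeholders, while the keys
mirror the "host", "port", "period", "unit", and "prefix" properties read by
GraphiteSink above.

    # Enable the Graphite sink for all instances (placeholder values).
    *.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
    *.sink.graphite.host=graphite.example.com
    *.sink.graphite.port=2003
    # Optional: reporting interval and metric name prefix
    # (defaults: 10 SECONDS and an empty prefix).
    *.sink.graphite.period=10
    *.sink.graphite.unit=seconds
    *.sink.graphite.prefix=myapp
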
From 1360f62d15170bd295ceaba85f39401fd8109e51 Mon Sep 17 00:00:00 2001
From: Russell Cardullo
Date: Mon, 18 Nov 2013 08:37:09 -0800
Subject: Cleanup GraphiteSink.scala based on feedback

* Reorder imports according to the style guide
* Consistently use propertyToOption in all places
---
 .../scala/org/apache/spark/metrics/sink/GraphiteSink.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
(limited to 'core')

diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
index eb1315e6de..cdcfec8ca7 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -17,13 +17,13 @@
 
 package org.apache.spark.metrics.sink
 
-import com.codahale.metrics.MetricRegistry
-import com.codahale.metrics.graphite.{GraphiteReporter, Graphite}
-
 import java.util.Properties
 import java.util.concurrent.TimeUnit
 import java.net.InetSocketAddress
 
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.graphite.{GraphiteReporter, Graphite}
+
 import org.apache.spark.metrics.MetricsSystem
 
 class GraphiteSink(val property: Properties, val registry: MetricRegistry) extends Sink {
@@ -50,12 +50,12 @@ class GraphiteSink(val property: Properties, val registry: MetricRegistry) exten
   val host = propertyToOption(GRAPHITE_KEY_HOST).get
   val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt
 
-  val pollPeriod = Option(property.getProperty(GRAPHITE_KEY_PERIOD)) match {
+  val pollPeriod = propertyToOption(GRAPHITE_KEY_PERIOD) match {
     case Some(s) => s.toInt
     case None => GRAPHITE_DEFAULT_PERIOD
   }
 
-  val pollUnit = Option(property.getProperty(GRAPHITE_KEY_UNIT)) match {
+  val pollUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
     case Some(s) => TimeUnit.valueOf(s.toUpperCase())
     case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT)
   }
--
cgit v1.2.3
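
Usage sketch: inside Spark, the MetricsSystem instantiates and starts each
configured sink via reflection, but the class can also be exercised by hand,
which makes the property handling in the patches easier to see. The snippet
below is illustrative only; the host, port, prefix, and metric name are made
up, and it assumes spark-core and metrics-graphite are on the classpath.

    import java.util.Properties

    import com.codahale.metrics.MetricRegistry

    import org.apache.spark.metrics.sink.GraphiteSink

    object GraphiteSinkDemo {
      def main(args: Array[String]) {
        // Keys match the GRAPHITE_KEY_* constants in GraphiteSink;
        // the values here are placeholders.
        val props = new Properties()
        props.setProperty("host", "graphite.example.com")  // required
        props.setProperty("port", "2003")                  // required
        props.setProperty("prefix", "demo")                // optional, default ""

        // A registry with a single counter so there is something to report.
        val registry = new MetricRegistry()
        registry.counter("demo.counter").inc()

        val sink = new GraphiteSink(props, registry)
        sink.start()          // reports every 10 seconds by default
        Thread.sleep(30000)   // let a few reports go out
        sink.stop()
      }
    }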