diff options
author | Ryan Williams <ryan.blake.williams@gmail.com> | 2015-01-31 23:41:05 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-01-31 23:41:05 -0800 |
commit | 80bd715a3e2c39449ed5e4d4e7058d75281ef3cb (patch) | |
tree | c0140ef751014912156812a119bb24fe99438dd7 /core | |
parent | c84d5a10e8dbdeeeb54bc0d3f3dfb62ff0ca4fc1 (diff) | |
download | spark-80bd715a3e2c39449ed5e4d4e7058d75281ef3cb.tar.gz spark-80bd715a3e2c39449ed5e4d4e7058d75281ef3cb.tar.bz2 spark-80bd715a3e2c39449ed5e4d4e7058d75281ef3cb.zip |
[SPARK-5422] Add support for sending Graphite metrics via UDP
Depends on [SPARK-5413](https://issues.apache.org/jira/browse/SPARK-5413) / #4209, included here, will rebase once the latter's merged.
Author: Ryan Williams <ryan.blake.williams@gmail.com>
Closes #4218 from ryan-williams/udp and squashes the following commits:
ebae393 [Ryan Williams] Add support for sending Graphite metrics via UDP
cb58262 [Ryan Williams] bump metrics dependency to v3.1.0
Diffstat (limited to 'core')
-rw-r--r-- | core/pom.xml | 8 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala | 9 |
2 files changed, 11 insertions, 6 deletions
diff --git a/core/pom.xml b/core/pom.xml index 31e919a1c8..6fce10a0ae 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -198,19 +198,19 @@ <artifactId>stream</artifactId> </dependency> <dependency> - <groupId>com.codahale.metrics</groupId> + <groupId>io.dropwizard.metrics</groupId> <artifactId>metrics-core</artifactId> </dependency> <dependency> - <groupId>com.codahale.metrics</groupId> + <groupId>io.dropwizard.metrics</groupId> <artifactId>metrics-jvm</artifactId> </dependency> <dependency> - <groupId>com.codahale.metrics</groupId> + <groupId>io.dropwizard.metrics</groupId> <artifactId>metrics-json</artifactId> </dependency> <dependency> - <groupId>com.codahale.metrics</groupId> + <groupId>io.dropwizard.metrics</groupId> <artifactId>metrics-graphite</artifactId> </dependency> <dependency> diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala index d7b5f5c40e..2d25ebd661 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -22,7 +22,7 @@ import java.util.Properties import java.util.concurrent.TimeUnit import com.codahale.metrics.MetricRegistry -import com.codahale.metrics.graphite.{Graphite, GraphiteReporter} +import com.codahale.metrics.graphite.{GraphiteUDP, Graphite, GraphiteReporter} import org.apache.spark.SecurityManager import org.apache.spark.metrics.MetricsSystem @@ -38,6 +38,7 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric val GRAPHITE_KEY_PERIOD = "period" val GRAPHITE_KEY_UNIT = "unit" val GRAPHITE_KEY_PREFIX = "prefix" + val GRAPHITE_KEY_PROTOCOL = "protocol" def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop)) @@ -66,7 +67,11 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) - val graphite: Graphite = new Graphite(new InetSocketAddress(host, port)) + val graphite = propertyToOption(GRAPHITE_KEY_PROTOCOL).map(_.toLowerCase) match { + case Some("udp") => new GraphiteUDP(new InetSocketAddress(host, port)) + case Some("tcp") | None => new Graphite(new InetSocketAddress(host, port)) + case Some(p) => throw new Exception(s"Invalid Graphite protocol: $p") + } val reporter: GraphiteReporter = GraphiteReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) |