aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorRyan Williams <ryan.blake.williams@gmail.com>2015-01-31 23:41:05 -0800
committerReynold Xin <rxin@databricks.com>2015-01-31 23:41:05 -0800
commit80bd715a3e2c39449ed5e4d4e7058d75281ef3cb (patch)
treec0140ef751014912156812a119bb24fe99438dd7 /core
parentc84d5a10e8dbdeeeb54bc0d3f3dfb62ff0ca4fc1 (diff)
downloadspark-80bd715a3e2c39449ed5e4d4e7058d75281ef3cb.tar.gz
spark-80bd715a3e2c39449ed5e4d4e7058d75281ef3cb.tar.bz2
spark-80bd715a3e2c39449ed5e4d4e7058d75281ef3cb.zip
[SPARK-5422] Add support for sending Graphite metrics via UDP
Depends on [SPARK-5413](https://issues.apache.org/jira/browse/SPARK-5413) / #4209, included here, will rebase once the latter's merged. Author: Ryan Williams <ryan.blake.williams@gmail.com> Closes #4218 from ryan-williams/udp and squashes the following commits: ebae393 [Ryan Williams] Add support for sending Graphite metrics via UDP cb58262 [Ryan Williams] bump metrics dependency to v3.1.0
Diffstat (limited to 'core')
-rw-r--r--core/pom.xml8
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala9
2 files changed, 11 insertions, 6 deletions
diff --git a/core/pom.xml b/core/pom.xml
index 31e919a1c8..6fce10a0ae 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -198,19 +198,19 @@
<artifactId>stream</artifactId>
</dependency>
<dependency>
- <groupId>com.codahale.metrics</groupId>
+ <groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
- <groupId>com.codahale.metrics</groupId>
+ <groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jvm</artifactId>
</dependency>
<dependency>
- <groupId>com.codahale.metrics</groupId>
+ <groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-json</artifactId>
</dependency>
<dependency>
- <groupId>com.codahale.metrics</groupId>
+ <groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
</dependency>
<dependency>
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
index d7b5f5c40e..2d25ebd661 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -22,7 +22,7 @@ import java.util.Properties
import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry
-import com.codahale.metrics.graphite.{Graphite, GraphiteReporter}
+import com.codahale.metrics.graphite.{GraphiteUDP, Graphite, GraphiteReporter}
import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem
@@ -38,6 +38,7 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric
val GRAPHITE_KEY_PERIOD = "period"
val GRAPHITE_KEY_UNIT = "unit"
val GRAPHITE_KEY_PREFIX = "prefix"
+ val GRAPHITE_KEY_PROTOCOL = "protocol"
def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))
@@ -66,7 +67,11 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
- val graphite: Graphite = new Graphite(new InetSocketAddress(host, port))
+ val graphite = propertyToOption(GRAPHITE_KEY_PROTOCOL).map(_.toLowerCase) match {
+ case Some("udp") => new GraphiteUDP(new InetSocketAddress(host, port))
+ case Some("tcp") | None => new Graphite(new InetSocketAddress(host, port))
+ case Some(p) => throw new Exception(s"Invalid Graphite protocol: $p")
+ }
val reporter: GraphiteReporter = GraphiteReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)