aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--conf/metrics.properties.template1
-rw-r--r--external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala5
2 files changed, 6 insertions, 0 deletions
diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index 8a4f4e4833..aeb76c9b2f 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -93,6 +93,7 @@
# period 10 Poll period
# unit seconds Unit of the poll period
# ttl 1 TTL of messages sent by Ganglia
+# dmax 0 Lifetime in seconds of metrics (0 never expired)
# mode multicast Ganglia network mode ('unicast' or 'multicast')
# org.apache.spark.metrics.sink.JmxSink
diff --git a/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
index 3b1880e143..0cd795f638 100644
--- a/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
+++ b/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
@@ -46,6 +46,9 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry,
val GANGLIA_KEY_HOST = "host"
val GANGLIA_KEY_PORT = "port"
+ val GANGLIA_KEY_DMAX = "dmax"
+ val GANGLIA_DEFAULT_DMAX = 0
+
def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))
if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) {
@@ -59,6 +62,7 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry,
val host = propertyToOption(GANGLIA_KEY_HOST).get
val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt
val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL)
+ val dmax = propertyToOption(GANGLIA_KEY_DMAX).map(_.toInt).getOrElse(GANGLIA_DEFAULT_DMAX)
val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE)
.map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE)
val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt)
@@ -73,6 +77,7 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry,
val reporter: GangliaReporter = GangliaReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
+ .withDMax(dmax)
.build(ganglia)
override def start() {