diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-09-08 10:07:29 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-09-08 10:08:18 -0700 |
commit | 8de8ee5d3cf01fd225336064d9586380a4fb6ad4 (patch) | |
tree | caecb11246afe497593827850a645b29f875635a /core | |
parent | 6d2198643ccb3f3c1a7a3cf07b5b96a98ae7bb1c (diff) | |
download | spark-8de8ee5d3cf01fd225336064d9586380a4fb6ad4.tar.gz spark-8de8ee5d3cf01fd225336064d9586380a4fb6ad4.tar.bz2 spark-8de8ee5d3cf01fd225336064d9586380a4fb6ad4.zip |
Ganglia sink
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala | 82 |
1 files changed, 82 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala new file mode 100644 index 0000000000..b924907070 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.sink + +import java.util.Properties +import java.util.concurrent.TimeUnit + +import com.codahale.metrics.ganglia.GangliaReporter +import com.codahale.metrics.MetricRegistry +import info.ganglia.gmetric4j.gmetric.GMetric + +import org.apache.spark.metrics.MetricsSystem + +class GangliaSink(val property: Properties, val registry: MetricRegistry) extends Sink { + val GANGLIA_KEY_PERIOD = "period" + val GANGLIA_DEFAULT_PERIOD = 10 + + val GANGLIA_KEY_UNIT = "unit" + val GANGLIA_DEFAULT_UNIT = TimeUnit.SECONDS + + val GANGLIA_KEY_MODE = "mode" + val GANGLIA_DEFAULT_MODE = GMetric.UDPAddressingMode.MULTICAST + + // TTL for multicast messages. If listeners are X hops away in network, must be at least X. + val GANGLIA_KEY_TTL = "ttl" + val GANGLIA_DEFAULT_TTL = 1 + + val GANGLIA_KEY_HOST = "host" + val GANGLIA_KEY_PORT = "port" + + def propertyToOption(prop: String) = Option(property.getProperty(prop)) + + if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) { + throw new Exception("Ganglia sink requires 'host' property.") + } + + if (!propertyToOption(GANGLIA_KEY_PORT).isDefined) { + throw new Exception("Ganglia sink requires 'port' property.") + } + + val host = propertyToOption(GANGLIA_KEY_HOST).get + val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt + val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL) + val mode = propertyToOption(GANGLIA_KEY_MODE) + .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE) + val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt) + .getOrElse(GANGLIA_DEFAULT_PERIOD) + val pollUnit = propertyToOption(GANGLIA_KEY_UNIT).map(u => TimeUnit.valueOf(u.toUpperCase)) + .getOrElse(GANGLIA_DEFAULT_UNIT) + + MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) + + val ganglia = new GMetric(host, port, mode, ttl) + val reporter: GangliaReporter = GangliaReporter.forRegistry(registry) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .build(ganglia) + + override def start() { + reporter.start(pollPeriod, pollUnit) + } + + override def stop() { + reporter.stop() + } +} + |