From 16788a654246067fd966033b5dc9bc0d4c759b70 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Tue, 11 Mar 2014 11:16:59 -0700 Subject: SPARK-1167: Remove metrics-ganglia from default build due to LGPL issues... This patch removes Ganglia integration from the default build. It allows users willing to link against LGPL code to use Ganglia by adding build flags or linking against a new Spark artifact called spark-ganglia-lgpl. This brings Spark in line with the Apache policy on LGPL code enumerated here: https://www.apache.org/legal/3party.html#options-optional Author: Patrick Wendell Closes #108 from pwendell/ganglia and squashes the following commits: 326712a [Patrick Wendell] Responding to review feedback 5f28ee4 [Patrick Wendell] SPARK-1167: Remove metrics-ganglia from default build due to LGPL issues. --- extras/spark-ganglia-lgpl/pom.xml | 45 ++++++++++++ .../apache/spark/metrics/sink/GangliaSink.scala | 84 ++++++++++++++++++++++ 2 files changed, 129 insertions(+) create mode 100644 extras/spark-ganglia-lgpl/pom.xml create mode 100644 extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala (limited to 'extras') diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml new file mode 100644 index 0000000000..11ac827ed5 --- /dev/null +++ b/extras/spark-ganglia-lgpl/pom.xml @@ -0,0 +1,45 @@ + + + + 4.0.0 + + org.apache.spark + spark-parent + 1.0.0-SNAPSHOT + ../../pom.xml + + + + org.apache.spark + spark-ganglia-lgpl_2.10 + jar + Spark Ganglia Integration + + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + + + + com.codahale.metrics + metrics-ganglia + + + diff --git a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala new file mode 100644 index 0000000000..cd37317da7 --- /dev/null +++ b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.sink + +import java.util.Properties +import java.util.concurrent.TimeUnit + +import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.ganglia.GangliaReporter +import info.ganglia.gmetric4j.gmetric.GMetric + +import org.apache.spark.SecurityManager +import org.apache.spark.metrics.MetricsSystem + +class GangliaSink(val property: Properties, val registry: MetricRegistry, + securityMgr: SecurityManager) extends Sink { + val GANGLIA_KEY_PERIOD = "period" + val GANGLIA_DEFAULT_PERIOD = 10 + + val GANGLIA_KEY_UNIT = "unit" + val GANGLIA_DEFAULT_UNIT = TimeUnit.SECONDS + + val GANGLIA_KEY_MODE = "mode" + val GANGLIA_DEFAULT_MODE = GMetric.UDPAddressingMode.MULTICAST + + // TTL for multicast messages. If listeners are X hops away in network, must be at least X. + val GANGLIA_KEY_TTL = "ttl" + val GANGLIA_DEFAULT_TTL = 1 + + val GANGLIA_KEY_HOST = "host" + val GANGLIA_KEY_PORT = "port" + + def propertyToOption(prop: String) = Option(property.getProperty(prop)) + + if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) { + throw new Exception("Ganglia sink requires 'host' property.") + } + + if (!propertyToOption(GANGLIA_KEY_PORT).isDefined) { + throw new Exception("Ganglia sink requires 'port' property.") + } + + val host = propertyToOption(GANGLIA_KEY_HOST).get + val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt + val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL) + val mode = propertyToOption(GANGLIA_KEY_MODE) + .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE) + val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt) + .getOrElse(GANGLIA_DEFAULT_PERIOD) + val pollUnit = propertyToOption(GANGLIA_KEY_UNIT).map(u => TimeUnit.valueOf(u.toUpperCase)) + .getOrElse(GANGLIA_DEFAULT_UNIT) + + MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) + + val ganglia = new GMetric(host, port, mode, ttl) + val reporter: GangliaReporter = GangliaReporter.forRegistry(registry) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .build(ganglia) + + override def start() { + reporter.start(pollPeriod, pollUnit) + } + + override def stop() { + reporter.stop() + } +} + -- cgit v1.2.3