From e201e684fdc7fcf7aef090757e2690c538e75e07 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Wed, 9 Apr 2014 17:30:17 -0300 Subject: + statsd: report trace metrics to StatsD --- kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala | 13 ++++++++++--- .../src/main/scala/kamon/statsd/StatsDMetricsSender.scala | 8 +++----- 2 files changed, 13 insertions(+), 8 deletions(-) (limited to 'kamon-statsd/src/main/scala/kamon/statsd') diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala index 8b7b86db..0f1d2562 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ import com.typesafe.config.Config import java.lang.management.ManagementFactory import akka.event.Logging +import java.net.InetSocketAddress object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = StatsD @@ -40,23 +41,29 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { private val statsDConfig = system.settings.config.getConfig("kamon.statsd") - val hostname = statsDConfig.getString("hostname") - val port = statsDConfig.getInt("port") + val statsDHost = new InetSocketAddress(statsDConfig.getString("hostname"), statsDConfig.getInt("port")) val flushInterval = statsDConfig.getMilliseconds("flush-interval") val maxPacketSize = statsDConfig.getInt("max-packet-size") val tickInterval = system.settings.config.getMilliseconds("kamon.metrics.tick-interval") val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval) + // Subscribe to Actors val includedActors = statsDConfig.getStringList("includes.actor").asScala for (actorPathPattern ← includedActors) { Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, statsDMetricsListener, permanently = true) } + // Subscribe to Traces + val includedTraces = statsDConfig.getStringList("includes.trace").asScala + for (tracePathPattern ← includedTraces) { + Kamon(Metrics)(system).subscribe(TraceMetrics, tracePathPattern, statsDMetricsListener, permanently = true) + } + def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = { assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval") - val metricsTranslator = system.actorOf(StatsDMetricsSender.props, "statsd-metrics-sender") + val metricsTranslator = system.actorOf(StatsDMetricsSender.props(statsDHost, maxPacketSize), "statsd-metrics-sender") if (flushInterval == tickInterval) { // No need to buffer the metrics, let's go straight to the metrics sender. metricsTranslator diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala index 0520b621..cff970b4 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala @@ -26,11 +26,9 @@ import kamon.metrics.MetricSnapshot.Measurement import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType } import java.text.DecimalFormat -class StatsDMetricsSender extends Actor with UdpExtensionProvider { +class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSize: Int) extends Actor with UdpExtensionProvider { import context.system - val statsDExtension = Kamon(StatsD) - val remote = new InetSocketAddress(statsDExtension.hostname, statsDExtension.port) val metricKeyGenerator = new SimpleMetricKeyGenerator(context.system.settings.config) val samplingRateFormat = new DecimalFormat() samplingRateFormat.setMaximumFractionDigits(128) // Absurdly high, let the other end loss precision if it needs to. @@ -47,7 +45,7 @@ class StatsDMetricsSender extends Actor with UdpExtensionProvider { } def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = { - val dataBuilder = new MetricDataPacketBuilder(statsDExtension.maxPacketSize, udpSender, remote) + val dataBuilder = new MetricDataPacketBuilder(maxPacketSize, udpSender, remote) for ( (groupIdentity, groupSnapshot) ← tick.metrics; @@ -78,7 +76,7 @@ class StatsDMetricsSender extends Actor with UdpExtensionProvider { } object StatsDMetricsSender { - def props: Props = Props[StatsDMetricsSender] + def props(remote: InetSocketAddress, maxPacketSize: Int): Props = Props(new StatsDMetricsSender(remote, maxPacketSize)) } trait UdpExtensionProvider { -- cgit v1.2.3