diff options
Diffstat (limited to 'kamon-statsd/src/main')
3 files changed, 56 insertions, 27 deletions
diff --git a/kamon-statsd/src/main/resources/reference.conf b/kamon-statsd/src/main/resources/reference.conf index fd6293d9..eac5eade 100644 --- a/kamon-statsd/src/main/resources/reference.conf +++ b/kamon-statsd/src/main/resources/reference.conf @@ -24,6 +24,10 @@ kamon { dispatcher = [ "*" ] } + # Enable system metrics + # In order to not get a ClassNotFoundException, we must register the kamon-sytem-metrics module + report-system-metrics = false + simple-metric-key-generator { # Application prefix for all metrics pushed to StatsD. The default namespacing scheme for metrics follows # this pattern: diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala index f10406ed..386b8f92 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala @@ -18,6 +18,7 @@ package kamon.statsd import akka.actor._ import kamon.Kamon +import kamon.metric._ import kamon.metrics._ import scala.concurrent.duration._ import scala.collection.JavaConverters._ @@ -31,6 +32,8 @@ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { override def createExtension(system: ExtendedActorSystem): StatsDExtension = new StatsDExtension(system) trait MetricKeyGenerator { + def localhostName: String + def normalizedLocalhostName: String def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String } } @@ -43,7 +46,7 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { val statsDHost = new InetSocketAddress(statsDConfig.getString("hostname"), statsDConfig.getInt("port")) val flushInterval = statsDConfig.getMilliseconds("flush-interval") - val maxPacketSize = statsDConfig.getInt("max-packet-size") + val maxPacketSizeInBytes = statsDConfig.getBytes("max-packet-size") val tickInterval = system.settings.config.getMilliseconds("kamon.metrics.tick-interval") val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval) @@ -66,24 +69,44 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { Kamon(Metrics)(system).subscribe(DispatcherMetrics, dispatcherPathPattern, statsDMetricsListener, permanently = true) } + // Subscribe to SystemMetrics + val includeSystemMetrics = statsDConfig.getBoolean("report-system-metrics") + if (includeSystemMetrics) { + List(CPUMetrics, ProcessCPUMetrics, MemoryMetrics, NetworkMetrics, GCMetrics, HeapMetrics) foreach { metric ⇒ + Kamon(Metrics)(system).subscribe(metric, "*", statsDMetricsListener, permanently = true) + } + } + def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = { assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval") + val defaultMetricKeyGenerator = new SimpleMetricKeyGenerator(system.settings.config) + + val metricsSender = system.actorOf(StatsDMetricsSender.props( + statsDHost, + maxPacketSizeInBytes, + defaultMetricKeyGenerator), "statsd-metrics-sender") - val metricsTranslator = system.actorOf(StatsDMetricsSender.props(statsDHost, maxPacketSize), "statsd-metrics-sender") if (flushInterval == tickInterval) { // No need to buffer the metrics, let's go straight to the metrics sender. - metricsTranslator + metricsSender } else { - system.actorOf(TickMetricSnapshotBuffer.props(flushInterval.toInt.millis, metricsTranslator), "statsd-metrics-buffer") + system.actorOf(TickMetricSnapshotBuffer.props(flushInterval.toInt.millis, metricsSender), "statsd-metrics-buffer") } } } class SimpleMetricKeyGenerator(config: Config) extends StatsD.MetricKeyGenerator { val application = config.getString("kamon.statsd.simple-metric-key-generator.application") - val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1) + val _localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1) + val _normalizedLocalhostName = _localhostName.replace('.', '_') - def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = - s"${application}.${localhostName}.${groupIdentity.category.name}.${groupIdentity.name}.${metricIdentity.name}" + def localhostName: String = _localhostName + + def normalizedLocalhostName: String = _normalizedLocalhostName + + def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = { + val normalizedGroupName = groupIdentity.name.replace(": ", "-").replace(" ", "_").replace("/", "_") + s"${application}.${normalizedLocalhostName}.${groupIdentity.category.name}.${normalizedGroupName}.${metricIdentity.name}" + } } diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala index adda18cc..8fbf4fee 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala @@ -20,16 +20,16 @@ import akka.actor.{ ActorSystem, Props, ActorRef, Actor } import akka.io.{ Udp, IO } import java.net.InetSocketAddress import akka.util.ByteString -import kamon.metrics.Subscriptions.TickMetricSnapshot -import kamon.metrics.MetricSnapshot.Measurement -import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType } +import kamon.metric.Subscriptions.TickMetricSnapshot import java.text.{ DecimalFormatSymbols, DecimalFormat } import java.util.Locale -class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider { +import kamon.metric.instrument.{ Counter, Histogram } + +class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long, metricKeyGenerator: StatsD.MetricKeyGenerator) + extends Actor with UdpExtensionProvider { import context.system - val metricKeyGenerator = new SimpleMetricKeyGenerator(context.system.settings.config) val symbols = DecimalFormatSymbols.getInstance(Locale.US) symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of. @@ -48,7 +48,7 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) } def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = { - val dataBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote) + val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote) for ( (groupIdentity, groupSnapshot) ← tick.metrics; @@ -57,29 +57,31 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) val key = metricKeyGenerator.generateKey(groupIdentity, metricIdentity) - for (measurement ← metricSnapshot.measurements) { - val measurementData = encodeMeasurement(measurement, metricSnapshot.instrumentType) - dataBuilder.appendMeasurement(key, measurementData) + metricSnapshot match { + case hs: Histogram.Snapshot ⇒ + hs.recordsIterator.foreach { record ⇒ + packetBuilder.appendMeasurement(key, encodeStatsDTimer(record.level, record.count)) + } + + case cs: Counter.Snapshot ⇒ + packetBuilder.appendMeasurement(key, encodeStatsDCounter(cs.count)) } } - dataBuilder.flush() + packetBuilder.flush() } - def encodeMeasurement(measurement: Measurement, instrumentType: InstrumentType): String = { - def statsDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): String = - value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") - - instrumentType match { - case Histogram ⇒ statsDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count)) - case Gauge ⇒ statsDMetricFormat(measurement.value.toString, "g") - case Counter ⇒ statsDMetricFormat(measurement.count.toString, "c") - } + def encodeStatsDTimer(level: Long, count: Long): String = { + val samplingRate: Double = 1D / count + level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") } + + def encodeStatsDCounter(count: Long): String = count.toString + "|c" } object StatsDMetricsSender { - def props(remote: InetSocketAddress, maxPacketSize: Long): Props = Props(new StatsDMetricsSender(remote, maxPacketSize)) + def props(remote: InetSocketAddress, maxPacketSize: Long, metricKeyGenerator: StatsD.MetricKeyGenerator): Props = + Props(new StatsDMetricsSender(remote, maxPacketSize, metricKeyGenerator)) } trait UdpExtensionProvider { |