From ded044c87eeb5313ec4067dc660ea92cccb4b098 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 7 Apr 2014 23:57:57 -0300 Subject: ! statsd: take advantange of the multi-measurement format --- .../scala/kamon/statsd/StatsDMetricsSender.scala | 82 +++++++++++++++++----- .../src/main/scala/kamon/statsd/Statsd.scala | 38 +--------- .../kamon/statsd/StatsdMetricTranslator.scala | 60 ---------------- 3 files changed, 68 insertions(+), 112 deletions(-) delete mode 100644 kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala (limited to 'kamon-statsd/src/main/scala/kamon/statsd') diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala index 63b1a53a..42eb57d0 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala @@ -16,20 +16,23 @@ package kamon.statsd -import akka.actor.{ Props, ActorRef, Actor } +import akka.actor.{ActorSystem, Props, ActorRef, Actor} import akka.io.{ Udp, IO } import java.net.InetSocketAddress import akka.util.ByteString import kamon.Kamon -import scala.annotation.tailrec +import kamon.metrics.Subscriptions.TickMetricSnapshot +import kamon.metrics.MetricSnapshot.Measurement +import kamon.metrics.InstrumentTypes.{Counter, Gauge, Histogram, InstrumentType} -class StatsDMetricsSender extends Actor { +class StatsDMetricsSender extends Actor with UdpExtensionProvider { import context.system val statsDExtension = Kamon(StatsD) val remote = new InetSocketAddress(statsDExtension.hostname, statsDExtension.port) + val metricKeyGenerator = new SimpleMetricKeyGenerator(context.system.settings.config) - IO(Udp) ! Udp.SimpleSender + udpExtension ! Udp.SimpleSender def receive = { case Udp.SimpleSenderReady ⇒ @@ -37,27 +40,74 @@ class StatsDMetricsSender extends Actor { } def ready(udpSender: ActorRef): Receive = { - case StatsD.MetricBatch(metrics) ⇒ sendMetricsToRemote(metrics, ByteString.empty, udpSender) + case tick: TickMetricSnapshot => writeMetricsToRemote(tick, udpSender) } - @tailrec final def sendMetricsToRemote(metrics: Iterable[StatsD.Metric], buffer: ByteString, udpSender: ActorRef): Unit = { - def flushToRemote(data: ByteString, udpSender: ActorRef): Unit = udpSender ! Udp.Send(data, remote) + def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = { + val dataBuilder = new MetricDataPacketBuilder(statsDExtension.maxPacketSize, udpSender, remote) - if (metrics.isEmpty) - flushToRemote(buffer, udpSender) - else { - val headMetricData = metrics.head.toByteString(includeTrailingNewline = true) + for((groupIdentity, groupSnapshot) <- tick.metrics; + (metricIdentity, metricSnapshot) <- groupSnapshot.metrics) { - if (buffer.size + headMetricData.size > statsDExtension.maxPacketSize) { - flushToRemote(buffer, udpSender) - sendMetricsToRemote(metrics.tail, headMetricData, udpSender) - } else { - sendMetricsToRemote(metrics.tail, buffer ++ headMetricData, udpSender) + val key = ByteString(metricKeyGenerator.generateKey(groupIdentity, metricIdentity)) + + for(measurement <- metricSnapshot.measurements) { + val measurementData = encodeMeasurement(measurement, metricSnapshot.instrumentType) + dataBuilder.appendMeasurement(key, measurementData) } } + + dataBuilder.flush() + } + + def encodeMeasurement(measurement: Measurement, instrumentType: InstrumentType): ByteString = { + def statsDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): ByteString = + ByteString(value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRate else "")) + + instrumentType match { + case Histogram => statsDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count)) + case Gauge => statsDMetricFormat(measurement.value.toString, "g") + case Counter => ByteString.empty // TODO: Need to decide how to report counters, when we have them! + } } } object StatsDMetricsSender { def props: Props = Props[StatsDMetricsSender] +} + +trait UdpExtensionProvider { + def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp) +} + +class MetricDataPacketBuilder(maxPacketSize: Int, udpSender: ActorRef, remote: InetSocketAddress) { + val metricSeparator = ByteString("\n") + val measurementSeparator = ByteString(":") + + var lastKey= ByteString.empty + var buffer = ByteString.empty + + def appendMeasurement(key: ByteString, measurementData: ByteString): Unit = { + val appendData = + if(key == lastKey) + measurementSeparator ++ measurementData + else { + lastKey = key + val keySeparator = if(buffer.length == 0) ByteString.empty else metricSeparator + keySeparator ++ key ++ measurementSeparator ++ measurementData + } + + if(buffer.length + appendData.length >= maxPacketSize) { + flushToUDP(buffer) + buffer = appendData + } else + buffer = buffer ++ appendData + } + + private def flushToUDP(bytes: ByteString): Unit = udpSender ! Udp.Send(bytes, remote) + + def flush(): Unit = { + flushToUDP(buffer) + buffer = ByteString.empty + } } \ No newline at end of file diff --git a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala index 167e993e..472824e9 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala @@ -21,7 +21,6 @@ import kamon.Kamon import kamon.metrics._ import scala.concurrent.duration._ import scala.collection.JavaConverters._ -import akka.util.ByteString import com.typesafe.config.Config import java.lang.management.ManagementFactory @@ -32,39 +31,6 @@ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { trait MetricKeyGenerator { def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String } - - sealed trait Metric { - def key: String - def value: Double - def suffix: String - def samplingRate: Double - - /* - * Creates the stats string to send to StatsD. - * For counters, it provides something like {@code key:value|c}. - * For timing, it provides something like {@code key:millis|ms}. - * If sampling rate is less than 1, it provides something like {@code key:value|type|@rate} - */ - def toByteString(includeTrailingNewline: Boolean = true): ByteString = - if (samplingRate >= 1D) - ByteString(s"$key:$value|$suffix") - else - ByteString(s"$key:$value|$suffix|@$samplingRate") - } - - case class Counter(key: String, value: Double = 1D, samplingRate: Double = 1.0) extends Metric { - val suffix: String = "c" - } - - case class Timing(key: String, value: Double, samplingRate: Double = 1.0) extends Metric { - val suffix: String = "ms" - } - - case class Gauge(key: String, value: Double, samplingRate: Double = 1.0) extends Metric { - val suffix: String = "g" - } - - case class MetricBatch(metrics: Iterable[Metric]) } class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { @@ -86,9 +52,9 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = { assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval") - val metricsTranslator = system.actorOf(StatsDMetricTranslator.props, "statsd-metrics-translator") + val metricsTranslator = system.actorOf(StatsDMetricsSender.props, "statsd-metrics-sender") if (flushInterval == tickInterval) { - // No need to buffer the metrics, let's go straight to the metrics translator. + // No need to buffer the metrics, let's go straight to the metrics sender. metricsTranslator } else { system.actorOf(TickMetricSnapshotBuffer.props(flushInterval.toInt.millis, metricsTranslator), "statsd-metrics-buffer") diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala deleted file mode 100644 index 2ef41c6d..00000000 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsdMetricTranslator.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ -package kamon.statsd - -import akka.actor.{ Props, Actor, ActorRef } -import kamon.metrics._ -import kamon.metrics.Subscriptions.TickMetricSnapshot -import kamon.metrics.ActorMetrics.ActorMetricSnapshot - -class StatsDMetricTranslator extends Actor { - val config = context.system.settings.config - - val metricKeyGenerator = new SimpleMetricKeyGenerator(config) - val metricSender = context.actorOf(StatsDMetricsSender.props, "metrics-sender") - - def receive = { - case TickMetricSnapshot(from, to, metrics) ⇒ - val translatedMetrics = metrics.collect { - case (am @ ActorMetrics(_), snapshot: ActorMetricSnapshot) ⇒ transformActorMetric(am, snapshot) - } - - metricSender ! StatsD.MetricBatch(translatedMetrics.flatten) - } - - def transformActorMetric(actorIdentity: ActorMetrics, snapshot: ActorMetricSnapshot): Vector[StatsD.Metric] = { - val timeInMailboxKey = metricKeyGenerator.generateKey(actorIdentity, ActorMetrics.TimeInMailbox) - val processingTimeKey = metricKeyGenerator.generateKey(actorIdentity, ActorMetrics.ProcessingTime) - - roll(timeInMailboxKey, snapshot.timeInMailbox, StatsD.Timing) ++ roll(processingTimeKey, snapshot.processingTime, StatsD.Timing) - } - - def roll(key: String, snapshot: MetricSnapshotLike, metricBuilder: (String, Double, Double) ⇒ StatsD.Metric): Vector[StatsD.Metric] = { - val builder = Vector.newBuilder[StatsD.Metric] - for (measurement ← snapshot.measurements) { - val samplingRate = 1D / measurement.count - val scaledValue = Scale.convert(snapshot.scale, Scale.Milli, measurement.value) - builder += metricBuilder.apply(key, scaledValue, samplingRate) - } - - builder.result() - } - -} - -object StatsDMetricTranslator { - def props: Props = Props[StatsDMetricTranslator] -} -- cgit v1.2.3 From d23360425cf41fd5d6a6ab6d6507d3e97bb536e1 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Tue, 8 Apr 2014 02:25:47 -0300 Subject: = statsd: correctly send multiple packets for a single metric --- .../scala/kamon/statsd/StatsDMetricsSender.scala | 53 +++++++++++++--------- .../kamon/statsd/StatsDMetricSenderSpec.scala | 21 +++++++++ 2 files changed, 52 insertions(+), 22 deletions(-) (limited to 'kamon-statsd/src/main/scala/kamon/statsd') diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala index 42eb57d0..60404d7e 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala @@ -16,14 +16,14 @@ package kamon.statsd -import akka.actor.{ActorSystem, Props, ActorRef, Actor} +import akka.actor.{ ActorSystem, Props, ActorRef, Actor } import akka.io.{ Udp, IO } import java.net.InetSocketAddress import akka.util.ByteString import kamon.Kamon import kamon.metrics.Subscriptions.TickMetricSnapshot import kamon.metrics.MetricSnapshot.Measurement -import kamon.metrics.InstrumentTypes.{Counter, Gauge, Histogram, InstrumentType} +import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType } class StatsDMetricsSender extends Actor with UdpExtensionProvider { import context.system @@ -40,18 +40,20 @@ class StatsDMetricsSender extends Actor with UdpExtensionProvider { } def ready(udpSender: ActorRef): Receive = { - case tick: TickMetricSnapshot => writeMetricsToRemote(tick, udpSender) + case tick: TickMetricSnapshot ⇒ writeMetricsToRemote(tick, udpSender) } def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = { val dataBuilder = new MetricDataPacketBuilder(statsDExtension.maxPacketSize, udpSender, remote) - for((groupIdentity, groupSnapshot) <- tick.metrics; - (metricIdentity, metricSnapshot) <- groupSnapshot.metrics) { + for ( + (groupIdentity, groupSnapshot) ← tick.metrics; + (metricIdentity, metricSnapshot) ← groupSnapshot.metrics + ) { val key = ByteString(metricKeyGenerator.generateKey(groupIdentity, metricIdentity)) - for(measurement <- metricSnapshot.measurements) { + for (measurement ← metricSnapshot.measurements) { val measurementData = encodeMeasurement(measurement, metricSnapshot.instrumentType) dataBuilder.appendMeasurement(key, measurementData) } @@ -65,9 +67,9 @@ class StatsDMetricsSender extends Actor with UdpExtensionProvider { ByteString(value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRate else "")) instrumentType match { - case Histogram => statsDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count)) - case Gauge => statsDMetricFormat(measurement.value.toString, "g") - case Counter => ByteString.empty // TODO: Need to decide how to report counters, when we have them! + case Histogram ⇒ statsDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count)) + case Gauge ⇒ statsDMetricFormat(measurement.value.toString, "g") + case Counter ⇒ ByteString.empty // TODO: Need to decide how to report counters, when we have them! } } } @@ -84,26 +86,33 @@ class MetricDataPacketBuilder(maxPacketSize: Int, udpSender: ActorRef, remote: I val metricSeparator = ByteString("\n") val measurementSeparator = ByteString(":") - var lastKey= ByteString.empty + var lastKey = ByteString.empty var buffer = ByteString.empty def appendMeasurement(key: ByteString, measurementData: ByteString): Unit = { - val appendData = - if(key == lastKey) - measurementSeparator ++ measurementData + if (key == lastKey) { + val dataWithoutKey = measurementSeparator ++ measurementData + if (fitsOnBuffer(dataWithoutKey)) + buffer = buffer ++ dataWithoutKey else { - lastKey = key - val keySeparator = if(buffer.length == 0) ByteString.empty else metricSeparator - keySeparator ++ key ++ measurementSeparator ++ measurementData + flushToUDP(buffer) + buffer = key ++ dataWithoutKey } - - if(buffer.length + appendData.length >= maxPacketSize) { - flushToUDP(buffer) - buffer = appendData - } else - buffer = buffer ++ appendData + } else { + lastKey = key + val dataWithoutSeparator = key ++ measurementSeparator ++ measurementData + if (fitsOnBuffer(metricSeparator ++ dataWithoutSeparator)) { + val mSeparator = if (buffer.length > 0) metricSeparator else ByteString.empty + buffer = buffer ++ mSeparator ++ dataWithoutSeparator + } else { + flushToUDP(buffer) + buffer = dataWithoutSeparator + } + } } + def fitsOnBuffer(bs: ByteString): Boolean = (buffer.length + bs.length) <= maxPacketSize + private def flushToUDP(bytes: ByteString): Unit = udpSender ! Udp.Send(bytes, remote) def flush(): Unit = { diff --git a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala index caeaee28..94af4645 100644 --- a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala +++ b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala @@ -25,6 +25,7 @@ import org.HdrHistogram.HdrRecorder import kamon.metrics.Subscriptions.TickMetricSnapshot import java.lang.management.ManagementFactory import com.typesafe.config.ConfigFactory +import kamon.Kamon class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers { @@ -76,6 +77,26 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers data.utf8String should be(s"$testMetricKey:10|ms|@0.5") } + "flush the packet when the max-packet-size is reached" in new UdpListenerFixture { + val testMetricName = "test-metric" + val testMetricKey = buildMetricKey(testMetricName) + val testRecorder = HdrRecorder(1000L, 3, Scale.Unit) + + var bytes = testMetricKey.length + var level = 0 + while(bytes <= Kamon(StatsD).maxPacketSize) { + level += 1 + testRecorder.record(level) + bytes += s":$level|ms".length + } + + val udp = setup(Map(testMetricName -> testRecorder.collect())) + udp.expectMsgType[Udp.Send] // let the first flush pass + val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] + + data.utf8String should be(s"$testMetricKey:$level|ms") + } + "render multiple keys in the same packet using newline as separator" in new UdpListenerFixture { val firstTestMetricName = "first-test-metric" -- cgit v1.2.3 From a830cb632e2a1b427aeef9699feb9d045419a165 Mon Sep 17 00:00:00 2001 From: Ivan Topolnak Date: Tue, 8 Apr 2014 10:52:17 -0300 Subject: = statsd: rename Statsd.scala -> StatsD.scala --- .../src/main/scala/kamon/statsd/StatsD.scala | 72 ++++++++++++++++++++++ .../src/main/scala/kamon/statsd/Statsd.scala | 72 ---------------------- 2 files changed, 72 insertions(+), 72 deletions(-) create mode 100644 kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala delete mode 100644 kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala (limited to 'kamon-statsd/src/main/scala/kamon/statsd') diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala new file mode 100644 index 00000000..472824e9 --- /dev/null +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala @@ -0,0 +1,72 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.statsd + +import akka.actor._ +import kamon.Kamon +import kamon.metrics._ +import scala.concurrent.duration._ +import scala.collection.JavaConverters._ +import com.typesafe.config.Config +import java.lang.management.ManagementFactory + +object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { + override def lookup(): ExtensionId[_ <: Extension] = StatsD + override def createExtension(system: ExtendedActorSystem): StatsDExtension = new StatsDExtension(system) + + trait MetricKeyGenerator { + def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String + } +} + +class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { + private val statsDConfig = system.settings.config.getConfig("kamon.statsd") + + val hostname = statsDConfig.getString("hostname") + val port = statsDConfig.getInt("port") + val flushInterval = statsDConfig.getMilliseconds("flush-interval") + val maxPacketSize = statsDConfig.getInt("max-packet-size") + val tickInterval = system.settings.config.getMilliseconds("kamon.metrics.tick-interval") + + val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval) + + val includedActors = statsDConfig.getStringList("includes.actor").asScala + for (actorPathPattern ← includedActors) { + Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, statsDMetricsListener, permanently = true) + } + + def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = { + assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval") + + val metricsTranslator = system.actorOf(StatsDMetricsSender.props, "statsd-metrics-sender") + if (flushInterval == tickInterval) { + // No need to buffer the metrics, let's go straight to the metrics sender. + metricsTranslator + } else { + system.actorOf(TickMetricSnapshotBuffer.props(flushInterval.toInt.millis, metricsTranslator), "statsd-metrics-buffer") + } + } +} + +class SimpleMetricKeyGenerator(config: Config) extends StatsD.MetricKeyGenerator { + val application = config.getString("kamon.statsd.simple-metric-key-generator.application") + val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1) + + def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = + application + "." + localhostName + "." + groupIdentity.category.name + "." + groupIdentity.name + "." + metricIdentity.name +} + diff --git a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala b/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala deleted file mode 100644 index 472824e9..00000000 --- a/kamon-statsd/src/main/scala/kamon/statsd/Statsd.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2014 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.statsd - -import akka.actor._ -import kamon.Kamon -import kamon.metrics._ -import scala.concurrent.duration._ -import scala.collection.JavaConverters._ -import com.typesafe.config.Config -import java.lang.management.ManagementFactory - -object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { - override def lookup(): ExtensionId[_ <: Extension] = StatsD - override def createExtension(system: ExtendedActorSystem): StatsDExtension = new StatsDExtension(system) - - trait MetricKeyGenerator { - def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String - } -} - -class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { - private val statsDConfig = system.settings.config.getConfig("kamon.statsd") - - val hostname = statsDConfig.getString("hostname") - val port = statsDConfig.getInt("port") - val flushInterval = statsDConfig.getMilliseconds("flush-interval") - val maxPacketSize = statsDConfig.getInt("max-packet-size") - val tickInterval = system.settings.config.getMilliseconds("kamon.metrics.tick-interval") - - val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval) - - val includedActors = statsDConfig.getStringList("includes.actor").asScala - for (actorPathPattern ← includedActors) { - Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, statsDMetricsListener, permanently = true) - } - - def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = { - assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval") - - val metricsTranslator = system.actorOf(StatsDMetricsSender.props, "statsd-metrics-sender") - if (flushInterval == tickInterval) { - // No need to buffer the metrics, let's go straight to the metrics sender. - metricsTranslator - } else { - system.actorOf(TickMetricSnapshotBuffer.props(flushInterval.toInt.millis, metricsTranslator), "statsd-metrics-buffer") - } - } -} - -class SimpleMetricKeyGenerator(config: Config) extends StatsD.MetricKeyGenerator { - val application = config.getString("kamon.statsd.simple-metric-key-generator.application") - val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1) - - def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = - application + "." + localhostName + "." + groupIdentity.category.name + "." + groupIdentity.name + "." + metricIdentity.name -} - -- cgit v1.2.3