diff options
author | Diego <diegolparra@gmail.com> | 2014-06-05 19:25:58 -0300 |
---|---|---|
committer | Diego <diegolparra@gmail.com> | 2014-06-05 19:25:58 -0300 |
commit | d211435ab2602e2fda0be4f5ea85614feec02b85 (patch) | |
tree | f07b78c30714604489e9f63cd19dc9bed8c5cfc2 /kamon-datadog/src | |
parent | 86aff31594c6e3e98f71903d1397b6c2709cf6a0 (diff) | |
download | Kamon-d211435ab2602e2fda0be4f5ea85614feec02b85.tar.gz Kamon-d211435ab2602e2fda0be4f5ea85614feec02b85.tar.bz2 Kamon-d211435ab2602e2fda0be4f5ea85614feec02b85.zip |
+ datadog: more tests and support for multiple metrics in the same package
Diffstat (limited to 'kamon-datadog/src')
4 files changed, 100 insertions, 34 deletions
diff --git a/kamon-datadog/src/main/resources/reference.conf b/kamon-datadog/src/main/resources/reference.conf index c7400f53..231eaf7d 100644 --- a/kamon-datadog/src/main/resources/reference.conf +++ b/kamon-datadog/src/main/resources/reference.conf @@ -13,6 +13,9 @@ kamon { # kamon.metrics.tick-interval setting. flush-interval = 1 second + # Max packet size for UDP metrics data sent to Datadog. + max-packet-size = 1024 bytes + # Subscription patterns used to select which metrics will be pushed to Datadog. Note that first, metrics # collection for your desired entities must be activated under the kamon.metrics.filters settings. includes { diff --git a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala index a323af78..15d5d3fe 100644 --- a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala +++ b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala @@ -44,6 +44,7 @@ class DatadogExtension(system: ExtendedActorSystem) extends Kamon.Extension { val datadogHost = new InetSocketAddress(datadogConfig.getString("hostname"), datadogConfig.getInt("port")) val flushInterval = datadogConfig.getDuration("flush-interval", MILLISECONDS) + val maxPacketSizeInBytes = datadogConfig.getBytes("max-packet-size") val tickInterval = system.settings.config.getDuration("kamon.metrics.tick-interval", MILLISECONDS) val datadogMetricsListener = buildMetricsListener(tickInterval, flushInterval) @@ -69,7 +70,7 @@ class DatadogExtension(system: ExtendedActorSystem) extends Kamon.Extension { def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = { assert(flushInterval >= tickInterval, "Datadog flush-interval needs to be equal or greater to the tick-interval") - val metricsTranslator = system.actorOf(DatadogMetricsSender.props(datadogHost), "datadog-metrics-sender") + val metricsTranslator = system.actorOf(DatadogMetricsSender.props(datadogHost, maxPacketSizeInBytes), "datadog-metrics-sender") if (flushInterval == tickInterval) { // No need to buffer the metrics, let's go straight to the metrics sender. metricsTranslator diff --git a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala index ec603b39..de4dc140 100644 --- a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala +++ b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala @@ -16,17 +16,18 @@ package kamon.datadog -import akka.actor.{ ActorSystem, Props, ActorRef, Actor } -import akka.io.{ Udp, IO } +import akka.actor.{ActorSystem, Props, ActorRef, Actor} +import akka.io.{Udp, IO} import java.net.InetSocketAddress import akka.util.ByteString import kamon.metrics.Subscriptions.TickMetricSnapshot import kamon.metrics.MetricSnapshot.Measurement -import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType } +import kamon.metrics.InstrumentTypes.{Counter, Gauge, Histogram, InstrumentType} import java.text.DecimalFormat -import kamon.metrics.{ MetricIdentity, MetricGroupIdentity } +import kamon.metrics.{MetricIdentity, MetricGroupIdentity} + +class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider { -class DatadogMetricsSender(remote: InetSocketAddress) extends Actor with UdpExtensionProvider { import context.system val appName = context.system.settings.config.getString("kamon.datadog.application-name") @@ -45,53 +46,82 @@ class DatadogMetricsSender(remote: InetSocketAddress) extends Actor with UdpExte } def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = { - def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote) + val dataBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote) - for ( + for { (groupIdentity, groupSnapshot) ← tick.metrics; (metricIdentity, metricSnapshot) ← groupSnapshot.metrics - ) { + } { + + val key = buildMetricName(groupIdentity, metricIdentity) for (measurement ← metricSnapshot.measurements) { val measurementData = formatMeasurement(groupIdentity, metricIdentity, measurement, metricSnapshot.instrumentType) - flushToUDP(measurementData) + dataBuilder.appendMeasurement(key, measurementData) } } + dataBuilder.flush() } def formatMeasurement(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity, measurement: Measurement, instrumentType: InstrumentType): String = { - StringBuilder.newBuilder - .append(buildMetricName(groupIdentity, metricIdentity)) - .append(":") - .append(buildMeasurementData(measurement, instrumentType)) + StringBuilder.newBuilder.append(buildMeasurementData(measurement, instrumentType)) .append(buildIdentificationTag(groupIdentity, metricIdentity)) .result() } def buildMeasurementData(measurement: Measurement, instrumentType: InstrumentType): String = { def dataDogDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): String = - value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") + s"$value|$metricType${(if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")}" instrumentType match { case Histogram ⇒ dataDogDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count)) - case Gauge ⇒ dataDogDMetricFormat(measurement.value.toString, "g") - case Counter ⇒ dataDogDMetricFormat(measurement.count.toString, "c") + case Gauge ⇒ dataDogDMetricFormat(measurement.value.toString, "g") + case Counter ⇒ dataDogDMetricFormat(measurement.count.toString, "c") } } def buildMetricName(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = - appName + "." + groupIdentity.category.name + "." + metricIdentity.name + s"$appName.${groupIdentity.category.name}.${metricIdentity.name}" def buildIdentificationTag(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = - "|#" + groupIdentity.category.name + ":" + groupIdentity.name + s"|#${groupIdentity.category.name}:${groupIdentity.name}" } object DatadogMetricsSender { - def props(remote: InetSocketAddress): Props = Props(new DatadogMetricsSender(remote)) + def props(remote: InetSocketAddress, maxPacketSize: Long): Props = Props(new DatadogMetricsSender(remote, maxPacketSize)) } trait UdpExtensionProvider { def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp) } + +class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, remote: InetSocketAddress) { + val metricSeparator = "\n" + val measurementSeparator = ":" + var lastKey = "" + var buffer = new StringBuilder() + + def appendMeasurement(key: String, measurementData: String): Unit = { + val data = key + measurementSeparator + measurementData + + if (fitsOnBuffer(metricSeparator + data)) { + val mSeparator = if (buffer.length > 0) metricSeparator else "" + buffer.append(mSeparator).append(data) + } else { + flushToUDP(buffer.toString()) + buffer.clear() + buffer.append(data) + } + } + + def fitsOnBuffer(data: String): Boolean = (buffer.length + data.length) <= maxPacketSizeInBytes + + private def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote) + + def flush(): Unit = { + flushToUDP(buffer.toString) + buffer.clear() + } +} diff --git a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala index c649a044..6a7191a1 100644 --- a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala +++ b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala @@ -16,10 +16,10 @@ package kamon.datadog -import akka.testkit.{ TestKitBase, TestProbe } -import akka.actor.{ Props, ActorRef, ActorSystem } +import akka.testkit.{TestKitBase, TestProbe} +import akka.actor.{Props, ActorRef, ActorSystem} import kamon.metrics.instruments.CounterRecorder -import org.scalatest.{ Matchers, WordSpecLike } +import org.scalatest.{Matchers, WordSpecLike} import kamon.metrics._ import akka.io.Udp import org.HdrHistogram.HdrRecorder @@ -56,20 +56,52 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher data.utf8String should be(s"kamon.actor.processing-time:10|ms|@0.5|#actor:user/kamon") } - "send only one packet per measurement" in new UdpListenerFixture { + "flush the packet when the max-packet-size is reached" in new UdpListenerFixture { val testMetricName = "processing-time" - val testRecorder = HdrRecorder(1000L, 2, Scale.Unit) - testRecorder.record(10L) - testRecorder.record(10L) - testRecorder.record(20L) + val testRecorder = HdrRecorder(testMaxPacketSize, 3, Scale.Unit) + + var bytes = 0 + var level = 0 + + while (bytes <= testMaxPacketSize) { + level += 1 + testRecorder.record(level) + bytes += s"kamon.actor.$testMetricName:$level|ms|#actor:user/kamon".length + } val udp = setup(Map(testMetricName -> testRecorder.collect())) + udp.expectMsgType[Udp.Send]// let the first flush pass + + val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] + data.utf8String should be(s"kamon.actor.$testMetricName:$level|ms|#actor:user/kamon") + } - val Udp.Send(data1, _, _) = udp.expectMsgType[Udp.Send] - data1.utf8String should be(s"kamon.actor.processing-time:10|ms|@0.5|#actor:user/kamon") + "render multiple keys in the same packet using newline as separator" in new UdpListenerFixture { + val firstTestMetricName = "processing-time-1" + val secondTestMetricName = "processing-time-2" + val thirdTestMetricName = "counter" - val Udp.Send(data2, _, _) = udp.expectMsgType[Udp.Send] - data2.utf8String should be(s"kamon.actor.processing-time:20|ms|#actor:user/kamon") + val firstTestRecorder = HdrRecorder(1000L, 2, Scale.Unit) + val secondTestRecorder = HdrRecorder(1000L, 2, Scale.Unit) + val thirdTestRecorder = CounterRecorder() + + firstTestRecorder.record(10L) + firstTestRecorder.record(10L) + + secondTestRecorder.record(21L) + + thirdTestRecorder.record(1L) + thirdTestRecorder.record(1L) + thirdTestRecorder.record(1L) + thirdTestRecorder.record(1L) + + val udp = setup(Map( + firstTestMetricName -> firstTestRecorder.collect(), + secondTestMetricName -> secondTestRecorder.collect(), + thirdTestMetricName -> thirdTestRecorder.collect())) + val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] + + data.utf8String should be("kamon.actor.processing-time-1:10|ms|@0.5|#actor:user/kamon\nkamon.actor.processing-time-2:21|ms|#actor:user/kamon\nkamon.actor.counter:4|c|#actor:user/kamon") } } @@ -79,7 +111,7 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher def setup(metrics: Map[String, MetricSnapshotLike]): TestProbe = { val udp = TestProbe() - val metricsSender = system.actorOf(Props(new DatadogMetricsSender(new InetSocketAddress(localhostName, 0)) { + val metricsSender = system.actorOf(Props(new DatadogMetricsSender(new InetSocketAddress(localhostName, 0), testMaxPacketSize) { override def udpExtension(implicit system: ActorSystem): ActorRef = udp.ref })) @@ -107,8 +139,8 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher metricsSender ! TickMetricSnapshot(0, 0, Map(testGroupIdentity -> new MetricGroupSnapshot { val metrics: Map[MetricIdentity, MetricSnapshotLike] = testMetrics.toMap })) - udp } } + } |