From 002ced85cf207b2aefbcbc496cc6787ef7b844cd Mon Sep 17 00:00:00 2001 From: Diego Date: Fri, 9 May 2014 20:36:45 -0300 Subject: + datadog: added counter instument and test --- .../scala/kamon/datadog/DatadogMetricsSender.scala | 9 +- .../kamon/datadog/DatadogMetricSenderSpec.scala | 139 ++++++++++++++++++++- project/Projects.scala | 2 +- 3 files changed, 144 insertions(+), 6 deletions(-) diff --git a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala index 9d7b34e6..50911093 100644 --- a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala +++ b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala @@ -62,14 +62,15 @@ class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long dataBuilder.flush() } + def encodeMeasurement(measurement: Measurement, instrumentType: InstrumentType): String = { - def statsDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): String = + def dataDogDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): String = value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "") instrumentType match { - case Histogram ⇒ statsDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count)) - case Gauge ⇒ statsDMetricFormat(measurement.value.toString, "g") - case Counter ⇒ "" // TODO: Need to decide how to report counters, when we have them! + case Histogram ⇒ dataDogDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count)) + case Gauge ⇒ dataDogDMetricFormat(measurement.value.toString, "g") + case Counter ⇒ dataDogDMetricFormat(measurement.count.toString, "c") } } } diff --git a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala index cd41a112..781a73c5 100644 --- a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala +++ b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala @@ -17,7 +17,8 @@ package kamon.datadog import akka.testkit.{ TestKitBase, TestProbe } -import akka.actor.{ ActorRef, Props, ActorSystem } +import akka.actor.{Props, ActorRef, ActorSystem} +import kamon.metrics.instruments.CounterRecorder import org.scalatest.{ Matchers, WordSpecLike } import kamon.metrics._ import akka.io.Udp @@ -28,4 +29,140 @@ import java.net.InetSocketAddress import com.typesafe.config.ConfigFactory class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers { + implicit lazy val system = ActorSystem("datadog-metric-sender-spec", + ConfigFactory.parseString("kamon.datadog.max-packet-size = 256 bytes")) + + "the DataDogMetricSender" should { + "flush the metrics data after processing the tick, even if the max-packet-size is not reached" in new UdpListenerFixture { + val testMetricName = "test-metric" + val testMetricKey = buildMetricKey(testMetricName) + val testRecorder = HdrRecorder(1000L, 2, Scale.Unit) + testRecorder.record(10L) + + val udp = setup(Map(testMetricName -> testRecorder.collect())) + val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] + + data.utf8String should be(s"$testMetricKey:10|ms") + } + + "render several measurements of the same key under a single (key + multiple measurements) packet" in new UdpListenerFixture { + val testMetricName = "test-metric" + val testMetricKey = buildMetricKey(testMetricName) + val testRecorder = HdrRecorder(1000L, 2, Scale.Unit) + testRecorder.record(10L) + testRecorder.record(11L) + testRecorder.record(12L) + + val udp = setup(Map(testMetricName -> testRecorder.collect())) + val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] + + data.utf8String should be(s"$testMetricKey:10|ms:11|ms:12|ms") + } + + "include the correspondent sampling rate when rendering multiple occurrences of the same value" in new UdpListenerFixture { + val testMetricName = "test-metric" + val testMetricKey = buildMetricKey(testMetricName) + val testRecorder = HdrRecorder(1000L, 2, Scale.Unit) + testRecorder.record(10L) + testRecorder.record(10L) + + val udp = setup(Map(testMetricName -> testRecorder.collect())) + val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] + + data.utf8String should be(s"$testMetricKey:10|ms|@0.5") + } + + "flush the packet when the max-packet-size is reached" in new UdpListenerFixture { + val testMetricName = "test-metric" + val testMetricKey = buildMetricKey(testMetricName) + val testRecorder = HdrRecorder(testMaxPacketSize, 3, Scale.Unit) + + var bytes = testMetricKey.length + var level = 0 + while (bytes <= testMaxPacketSize) { + level += 1 + testRecorder.record(level) + bytes += s":$level|ms".length + } + + val udp = setup(Map(testMetricName -> testRecorder.collect())) + udp.expectMsgType[Udp.Send] // let the first flush pass + val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] + + data.utf8String should be(s"$testMetricKey:$level|ms") + } + + "render multiple keys in the same packet using newline as separator" in new UdpListenerFixture { + val firstTestMetricName = "first-test-metric" + val firstTestMetricKey = buildMetricKey(firstTestMetricName) + val secondTestMetricName = "second-test-metric" + val secondTestMetricKey = buildMetricKey(secondTestMetricName) + val thirdTestMetricName = "third-test-metric" + val thirdTestMetricKey = buildMetricKey(thirdTestMetricName) + + val firstTestRecorder = HdrRecorder(1000L, 2, Scale.Unit) + val secondTestRecorder = HdrRecorder(1000L, 2, Scale.Unit) + val thirdTestRecorder = CounterRecorder() + + firstTestRecorder.record(10L) + firstTestRecorder.record(10L) + firstTestRecorder.record(11L) + + secondTestRecorder.record(20L) + secondTestRecorder.record(21L) + + thirdTestRecorder.record(1L) + thirdTestRecorder.record(1L) + thirdTestRecorder.record(1L) + thirdTestRecorder.record(1L) + + val udp = setup(Map( + firstTestMetricName -> firstTestRecorder.collect(), + secondTestMetricName -> secondTestRecorder.collect(), + thirdTestMetricName -> thirdTestRecorder.collect())) + val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] + + data.utf8String should be(s"$firstTestMetricKey:10|ms|@0.5:11|ms\n$secondTestMetricKey:20|ms:21|ms\n$thirdTestMetricKey:4|c") + } + } + + trait UdpListenerFixture { + val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1) + val testMaxPacketSize = system.settings.config.getBytes("kamon.datadog.max-packet-size") + + def buildMetricKey(metricName: String): String = s"kamon.$localhostName.test-metric-category.test-group.$metricName" + + def setup(metrics: Map[String, MetricSnapshotLike]): TestProbe = { + val udp = TestProbe() + val metricsSender = system.actorOf(Props(new DatadogMetricsSender(new InetSocketAddress(localhostName, 0), testMaxPacketSize) { + override def udpExtension(implicit system: ActorSystem): ActorRef = udp.ref + })) + + // Setup the SimpleSender + udp.expectMsgType[Udp.SimpleSender] + udp.reply(Udp.SimpleSenderReady) + + val testGroupIdentity = new MetricGroupIdentity { + val name: String = "test-group" + val category: MetricGroupCategory = new MetricGroupCategory { + val name: String = "test-metric-category" + } + } + + val testMetrics = for ((metricName, snapshot) ← metrics) yield { + val testMetricIdentity = new MetricIdentity { + val name: String = metricName + val tag: String = "" + } + + (testMetricIdentity, snapshot) + } + + metricsSender ! TickMetricSnapshot(0, 0, Map(testGroupIdentity -> new MetricGroupSnapshot { + val metrics: Map[MetricIdentity, MetricSnapshotLike] = testMetrics.toMap + })) + + udp + } + } } diff --git a/project/Projects.scala b/project/Projects.scala index 2f96fa47..f51cada6 100644 --- a/project/Projects.scala +++ b/project/Projects.scala @@ -54,7 +54,7 @@ object Projects extends Build { .settings( libraryDependencies ++= compile(akkaActor, akkaSlf4j, sprayCan, sprayClient, sprayRouting, logback)) - .dependsOn(kamonSpray, kamonNewrelic, kamonStatsd) + .dependsOn(kamonSpray, kamonNewrelic, kamonStatsd, kamonDatadog) lazy val kamonDashboard = Project("kamon-dashboard", file("kamon-dashboard")) -- cgit v1.2.3