aboutsummaryrefslogtreecommitdiff
path: root/kamon-datadog
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2014-06-05 19:25:58 -0300
committerDiego <diegolparra@gmail.com>2014-06-05 19:25:58 -0300
commitd211435ab2602e2fda0be4f5ea85614feec02b85 (patch)
treef07b78c30714604489e9f63cd19dc9bed8c5cfc2 /kamon-datadog
parent86aff31594c6e3e98f71903d1397b6c2709cf6a0 (diff)
downloadKamon-d211435ab2602e2fda0be4f5ea85614feec02b85.tar.gz
Kamon-d211435ab2602e2fda0be4f5ea85614feec02b85.tar.bz2
Kamon-d211435ab2602e2fda0be4f5ea85614feec02b85.zip
+ datadog: more tests and support for multiple metrics in the same package
Diffstat (limited to 'kamon-datadog')
-rw-r--r--kamon-datadog/src/main/resources/reference.conf3
-rw-r--r--kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala3
-rw-r--r--kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala68
-rw-r--r--kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala60
4 files changed, 100 insertions, 34 deletions
diff --git a/kamon-datadog/src/main/resources/reference.conf b/kamon-datadog/src/main/resources/reference.conf
index c7400f53..231eaf7d 100644
--- a/kamon-datadog/src/main/resources/reference.conf
+++ b/kamon-datadog/src/main/resources/reference.conf
@@ -13,6 +13,9 @@ kamon {
# kamon.metrics.tick-interval setting.
flush-interval = 1 second
+ # Max packet size for UDP metrics data sent to Datadog.
+ max-packet-size = 1024 bytes
+
# Subscription patterns used to select which metrics will be pushed to Datadog. Note that first, metrics
# collection for your desired entities must be activated under the kamon.metrics.filters settings.
includes {
diff --git a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala
index a323af78..15d5d3fe 100644
--- a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala
+++ b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala
@@ -44,6 +44,7 @@ class DatadogExtension(system: ExtendedActorSystem) extends Kamon.Extension {
val datadogHost = new InetSocketAddress(datadogConfig.getString("hostname"), datadogConfig.getInt("port"))
val flushInterval = datadogConfig.getDuration("flush-interval", MILLISECONDS)
+ val maxPacketSizeInBytes = datadogConfig.getBytes("max-packet-size")
val tickInterval = system.settings.config.getDuration("kamon.metrics.tick-interval", MILLISECONDS)
val datadogMetricsListener = buildMetricsListener(tickInterval, flushInterval)
@@ -69,7 +70,7 @@ class DatadogExtension(system: ExtendedActorSystem) extends Kamon.Extension {
def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = {
assert(flushInterval >= tickInterval, "Datadog flush-interval needs to be equal or greater to the tick-interval")
- val metricsTranslator = system.actorOf(DatadogMetricsSender.props(datadogHost), "datadog-metrics-sender")
+ val metricsTranslator = system.actorOf(DatadogMetricsSender.props(datadogHost, maxPacketSizeInBytes), "datadog-metrics-sender")
if (flushInterval == tickInterval) {
// No need to buffer the metrics, let's go straight to the metrics sender.
metricsTranslator
diff --git a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
index ec603b39..de4dc140 100644
--- a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
+++ b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
@@ -16,17 +16,18 @@
package kamon.datadog
-import akka.actor.{ ActorSystem, Props, ActorRef, Actor }
-import akka.io.{ Udp, IO }
+import akka.actor.{ActorSystem, Props, ActorRef, Actor}
+import akka.io.{Udp, IO}
import java.net.InetSocketAddress
import akka.util.ByteString
import kamon.metrics.Subscriptions.TickMetricSnapshot
import kamon.metrics.MetricSnapshot.Measurement
-import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType }
+import kamon.metrics.InstrumentTypes.{Counter, Gauge, Histogram, InstrumentType}
import java.text.DecimalFormat
-import kamon.metrics.{ MetricIdentity, MetricGroupIdentity }
+import kamon.metrics.{MetricIdentity, MetricGroupIdentity}
+
+class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider {
-class DatadogMetricsSender(remote: InetSocketAddress) extends Actor with UdpExtensionProvider {
import context.system
val appName = context.system.settings.config.getString("kamon.datadog.application-name")
@@ -45,53 +46,82 @@ class DatadogMetricsSender(remote: InetSocketAddress) extends Actor with UdpExte
}
def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = {
- def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote)
+ val dataBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote)
- for (
+ for {
(groupIdentity, groupSnapshot) ← tick.metrics;
(metricIdentity, metricSnapshot) ← groupSnapshot.metrics
- ) {
+ } {
+
+ val key = buildMetricName(groupIdentity, metricIdentity)
for (measurement ← metricSnapshot.measurements) {
val measurementData = formatMeasurement(groupIdentity, metricIdentity, measurement, metricSnapshot.instrumentType)
- flushToUDP(measurementData)
+ dataBuilder.appendMeasurement(key, measurementData)
}
}
+ dataBuilder.flush()
}
def formatMeasurement(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity, measurement: Measurement,
instrumentType: InstrumentType): String = {
- StringBuilder.newBuilder
- .append(buildMetricName(groupIdentity, metricIdentity))
- .append(":")
- .append(buildMeasurementData(measurement, instrumentType))
+ StringBuilder.newBuilder.append(buildMeasurementData(measurement, instrumentType))
.append(buildIdentificationTag(groupIdentity, metricIdentity))
.result()
}
def buildMeasurementData(measurement: Measurement, instrumentType: InstrumentType): String = {
def dataDogDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): String =
- value + "|" + metricType + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")
+ s"$value|$metricType${(if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")}"
instrumentType match {
case Histogram ⇒ dataDogDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count))
- case Gauge ⇒ dataDogDMetricFormat(measurement.value.toString, "g")
- case Counter ⇒ dataDogDMetricFormat(measurement.count.toString, "c")
+ case Gauge ⇒ dataDogDMetricFormat(measurement.value.toString, "g")
+ case Counter ⇒ dataDogDMetricFormat(measurement.count.toString, "c")
}
}
def buildMetricName(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String =
- appName + "." + groupIdentity.category.name + "." + metricIdentity.name
+ s"$appName.${groupIdentity.category.name}.${metricIdentity.name}"
def buildIdentificationTag(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String =
- "|#" + groupIdentity.category.name + ":" + groupIdentity.name
+ s"|#${groupIdentity.category.name}:${groupIdentity.name}"
}
object DatadogMetricsSender {
- def props(remote: InetSocketAddress): Props = Props(new DatadogMetricsSender(remote))
+ def props(remote: InetSocketAddress, maxPacketSize: Long): Props = Props(new DatadogMetricsSender(remote, maxPacketSize))
}
trait UdpExtensionProvider {
def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp)
}
+
+class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, remote: InetSocketAddress) {
+ val metricSeparator = "\n"
+ val measurementSeparator = ":"
+ var lastKey = ""
+ var buffer = new StringBuilder()
+
+ def appendMeasurement(key: String, measurementData: String): Unit = {
+ val data = key + measurementSeparator + measurementData
+
+ if (fitsOnBuffer(metricSeparator + data)) {
+ val mSeparator = if (buffer.length > 0) metricSeparator else ""
+ buffer.append(mSeparator).append(data)
+ } else {
+ flushToUDP(buffer.toString())
+ buffer.clear()
+ buffer.append(data)
+ }
+ }
+
+ def fitsOnBuffer(data: String): Boolean = (buffer.length + data.length) <= maxPacketSizeInBytes
+
+ private def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote)
+
+ def flush(): Unit = {
+ flushToUDP(buffer.toString)
+ buffer.clear()
+ }
+}
diff --git a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala
index c649a044..6a7191a1 100644
--- a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala
+++ b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala
@@ -16,10 +16,10 @@
package kamon.datadog
-import akka.testkit.{ TestKitBase, TestProbe }
-import akka.actor.{ Props, ActorRef, ActorSystem }
+import akka.testkit.{TestKitBase, TestProbe}
+import akka.actor.{Props, ActorRef, ActorSystem}
import kamon.metrics.instruments.CounterRecorder
-import org.scalatest.{ Matchers, WordSpecLike }
+import org.scalatest.{Matchers, WordSpecLike}
import kamon.metrics._
import akka.io.Udp
import org.HdrHistogram.HdrRecorder
@@ -56,20 +56,52 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
data.utf8String should be(s"kamon.actor.processing-time:10|ms|@0.5|#actor:user/kamon")
}
- "send only one packet per measurement" in new UdpListenerFixture {
+ "flush the packet when the max-packet-size is reached" in new UdpListenerFixture {
val testMetricName = "processing-time"
- val testRecorder = HdrRecorder(1000L, 2, Scale.Unit)
- testRecorder.record(10L)
- testRecorder.record(10L)
- testRecorder.record(20L)
+ val testRecorder = HdrRecorder(testMaxPacketSize, 3, Scale.Unit)
+
+ var bytes = 0
+ var level = 0
+
+ while (bytes <= testMaxPacketSize) {
+ level += 1
+ testRecorder.record(level)
+ bytes += s"kamon.actor.$testMetricName:$level|ms|#actor:user/kamon".length
+ }
val udp = setup(Map(testMetricName -> testRecorder.collect()))
+ udp.expectMsgType[Udp.Send]// let the first flush pass
+
+ val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
+ data.utf8String should be(s"kamon.actor.$testMetricName:$level|ms|#actor:user/kamon")
+ }
- val Udp.Send(data1, _, _) = udp.expectMsgType[Udp.Send]
- data1.utf8String should be(s"kamon.actor.processing-time:10|ms|@0.5|#actor:user/kamon")
+ "render multiple keys in the same packet using newline as separator" in new UdpListenerFixture {
+ val firstTestMetricName = "processing-time-1"
+ val secondTestMetricName = "processing-time-2"
+ val thirdTestMetricName = "counter"
- val Udp.Send(data2, _, _) = udp.expectMsgType[Udp.Send]
- data2.utf8String should be(s"kamon.actor.processing-time:20|ms|#actor:user/kamon")
+ val firstTestRecorder = HdrRecorder(1000L, 2, Scale.Unit)
+ val secondTestRecorder = HdrRecorder(1000L, 2, Scale.Unit)
+ val thirdTestRecorder = CounterRecorder()
+
+ firstTestRecorder.record(10L)
+ firstTestRecorder.record(10L)
+
+ secondTestRecorder.record(21L)
+
+ thirdTestRecorder.record(1L)
+ thirdTestRecorder.record(1L)
+ thirdTestRecorder.record(1L)
+ thirdTestRecorder.record(1L)
+
+ val udp = setup(Map(
+ firstTestMetricName -> firstTestRecorder.collect(),
+ secondTestMetricName -> secondTestRecorder.collect(),
+ thirdTestMetricName -> thirdTestRecorder.collect()))
+ val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
+
+ data.utf8String should be("kamon.actor.processing-time-1:10|ms|@0.5|#actor:user/kamon\nkamon.actor.processing-time-2:21|ms|#actor:user/kamon\nkamon.actor.counter:4|c|#actor:user/kamon")
}
}
@@ -79,7 +111,7 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
def setup(metrics: Map[String, MetricSnapshotLike]): TestProbe = {
val udp = TestProbe()
- val metricsSender = system.actorOf(Props(new DatadogMetricsSender(new InetSocketAddress(localhostName, 0)) {
+ val metricsSender = system.actorOf(Props(new DatadogMetricsSender(new InetSocketAddress(localhostName, 0), testMaxPacketSize) {
override def udpExtension(implicit system: ActorSystem): ActorRef = udp.ref
}))
@@ -107,8 +139,8 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
metricsSender ! TickMetricSnapshot(0, 0, Map(testGroupIdentity -> new MetricGroupSnapshot {
val metrics: Map[MetricIdentity, MetricSnapshotLike] = testMetrics.toMap
}))
-
udp
}
}
+
}