aboutsummaryrefslogtreecommitdiff
path: root/kamon-datadog
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-datadog')
-rw-r--r--kamon-datadog/src/main/resources/reference.conf4
-rw-r--r--kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala16
-rw-r--r--kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala54
-rw-r--r--kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala70
4 files changed, 88 insertions, 56 deletions
diff --git a/kamon-datadog/src/main/resources/reference.conf b/kamon-datadog/src/main/resources/reference.conf
index 231eaf7d..4de24526 100644
--- a/kamon-datadog/src/main/resources/reference.conf
+++ b/kamon-datadog/src/main/resources/reference.conf
@@ -24,6 +24,10 @@ kamon {
dispatcher = [ "*" ]
}
+ # Enable system metrics
+ # In order to not get a ClassNotFoundException, we must register the kamon-sytem-metrics module
+ report-system-metrics = false
+
# Application prefix for all metrics pushed to Datadog. The default namespacing scheme for metrics follows
# this pattern:
# application.entity-name.metric-name
diff --git a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala
index 1eb2e8a6..63557781 100644
--- a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala
+++ b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala
@@ -18,7 +18,9 @@ package kamon.datadog
import akka.actor._
import kamon.Kamon
+import kamon.metric._
import kamon.metrics._
+import kamon.metrics.CPUMetrics
import scala.concurrent.duration._
import scala.collection.JavaConverters._
import akka.event.Logging
@@ -64,15 +66,23 @@ class DatadogExtension(system: ExtendedActorSystem) extends Kamon.Extension {
Kamon(Metrics)(system).subscribe(DispatcherMetrics, dispatcherPathPattern, datadogMetricsListener, permanently = true)
}
+ // Subscribe to SystemMetrics
+ val includeSystemMetrics = datadogConfig.getBoolean("report-system-metrics")
+ if (includeSystemMetrics) {
+ List(CPUMetrics, ProcessCPUMetrics, MemoryMetrics, NetworkMetrics, GCMetrics, HeapMetrics) foreach { metric ⇒
+ Kamon(Metrics)(system).subscribe(metric, "*", datadogMetricsListener, permanently = true)
+ }
+ }
+
def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = {
assert(flushInterval >= tickInterval, "Datadog flush-interval needs to be equal or greater to the tick-interval")
- val metricsTranslator = system.actorOf(DatadogMetricsSender.props(datadogHost, maxPacketSizeInBytes), "datadog-metrics-sender")
+ val metricsSender = system.actorOf(DatadogMetricsSender.props(datadogHost, maxPacketSizeInBytes), "datadog-metrics-sender")
if (flushInterval == tickInterval) {
// No need to buffer the metrics, let's go straight to the metrics sender.
- metricsTranslator
+ metricsSender
} else {
- system.actorOf(TickMetricSnapshotBuffer.props(flushInterval.toInt.millis, metricsTranslator), "datadog-metrics-buffer")
+ system.actorOf(TickMetricSnapshotBuffer.props(flushInterval.toInt.millis, metricsSender), "datadog-metrics-buffer")
}
}
}
diff --git a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
index 028e9608..0f67cc34 100644
--- a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
+++ b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
@@ -20,11 +20,10 @@ import akka.actor.{ ActorSystem, Props, ActorRef, Actor }
import akka.io.{ Udp, IO }
import java.net.InetSocketAddress
import akka.util.ByteString
-import kamon.metrics.Subscriptions.TickMetricSnapshot
-import kamon.metrics.MetricSnapshot.Measurement
-import kamon.metrics.InstrumentTypes.{ Counter, Gauge, Histogram, InstrumentType }
+import kamon.metric.Subscriptions.TickMetricSnapshot
import java.text.{ DecimalFormatSymbols, DecimalFormat }
-import kamon.metrics.{ MetricIdentity, MetricGroupIdentity }
+import kamon.metric.instrument.{ Counter, Histogram }
+import kamon.metric.{ MetricIdentity, MetricGroupIdentity }
import java.util.Locale
class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider {
@@ -50,7 +49,7 @@ class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long
}
def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = {
- val dataBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote)
+ val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote)
for {
(groupIdentity, groupSnapshot) ← tick.metrics
@@ -59,38 +58,43 @@ class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long
val key = buildMetricName(groupIdentity, metricIdentity)
- for (measurement ← metricSnapshot.measurements) {
- val measurementData = formatMeasurement(groupIdentity, metricIdentity, measurement, metricSnapshot.instrumentType)
- dataBuilder.appendMeasurement(key, measurementData)
+ metricSnapshot match {
+ case hs: Histogram.Snapshot ⇒
+ hs.recordsIterator.foreach { record ⇒
+ val measurementData = formatMeasurement(groupIdentity, metricIdentity, encodeStatsDTimer(record.level, record.count))
+ packetBuilder.appendMeasurement(key, measurementData)
+
+ }
+
+ case cs: Counter.Snapshot ⇒
+ val measurementData = formatMeasurement(groupIdentity, metricIdentity, encodeStatsDCounter(cs.count))
+ packetBuilder.appendMeasurement(key, measurementData)
}
}
- dataBuilder.flush()
+ packetBuilder.flush()
}
- def formatMeasurement(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity, measurement: Measurement,
- instrumentType: InstrumentType): String = {
-
- StringBuilder.newBuilder.append(buildMeasurementData(measurement, instrumentType))
+ def formatMeasurement(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity, measurementData: String): String =
+ StringBuilder.newBuilder
+ .append(measurementData)
.append(buildIdentificationTag(groupIdentity, metricIdentity))
.result()
- }
- def buildMeasurementData(measurement: Measurement, instrumentType: InstrumentType): String = {
- def dataDogDMetricFormat(value: String, metricType: String, samplingRate: Double = 1D): String =
- s"$value|$metricType${(if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")}"
-
- instrumentType match {
- case Histogram ⇒ dataDogDMetricFormat(measurement.value.toString, "ms", (1D / measurement.count))
- case Gauge ⇒ dataDogDMetricFormat(measurement.value.toString, "g")
- case Counter ⇒ dataDogDMetricFormat(measurement.count.toString, "c")
- }
+ def encodeStatsDTimer(level: Long, count: Long): String = {
+ val samplingRate: Double = 1D / count
+ level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")
}
+ def encodeStatsDCounter(count: Long): String = count.toString + "|c"
+
def buildMetricName(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String =
s"$appName.${groupIdentity.category.name}.${metricIdentity.name}"
- def buildIdentificationTag(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String =
- s"|#${groupIdentity.category.name}:${groupIdentity.name}"
+ def buildIdentificationTag(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = {
+ // Make the automatic HTTP trace names a bit more friendly
+ val normalizedEntityName = groupIdentity.name.replace(": ", ":")
+ s"|#${groupIdentity.category.name}:${normalizedEntityName}"
+ }
}
object DatadogMetricsSender {
diff --git a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala
index 6a7191a1..713db30d 100644
--- a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala
+++ b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala
@@ -16,29 +16,43 @@
package kamon.datadog
-import akka.testkit.{TestKitBase, TestProbe}
-import akka.actor.{Props, ActorRef, ActorSystem}
-import kamon.metrics.instruments.CounterRecorder
-import org.scalatest.{Matchers, WordSpecLike}
-import kamon.metrics._
+import akka.testkit.{ TestKitBase, TestProbe }
+import akka.actor.{ Props, ActorRef, ActorSystem }
+import kamon.Kamon
+import kamon.metric.instrument.Histogram.Precision
+import kamon.metric.instrument.{ Counter, Histogram, HdrHistogram, LongAdderCounter }
+import org.scalatest.{ Matchers, WordSpecLike }
+import kamon.metric._
import akka.io.Udp
-import org.HdrHistogram.HdrRecorder
-import kamon.metrics.Subscriptions.TickMetricSnapshot
+import kamon.metric.Subscriptions.TickMetricSnapshot
import java.lang.management.ManagementFactory
import java.net.InetSocketAddress
import com.typesafe.config.ConfigFactory
class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers {
- implicit lazy val system = ActorSystem("datadog-metric-sender-spec",
- ConfigFactory.parseString("kamon.datadog.max-packet-size = 256 bytes"))
+ implicit lazy val system: ActorSystem = ActorSystem("datadog-metric-sender-spec", ConfigFactory.parseString(
+ """
+ |kamon {
+ | metrics {
+ | disable-aspectj-weaver-missing-error = true
+ | }
+ |
+ | datadog {
+ | max-packet-size = 256 bytes
+ | }
+ |}
+ |
+ """.stripMargin))
+
+ val collectionContext = Kamon(Metrics).buildDefaultCollectionContext
"the DataDogMetricSender" should {
"send latency measurements" in new UdpListenerFixture {
val testMetricName = "processing-time"
- val testRecorder = HdrRecorder(1000L, 2, Scale.Unit)
+ val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
testRecorder.record(10L)
- val udp = setup(Map(testMetricName -> testRecorder.collect()))
+ val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"kamon.actor.processing-time:10|ms|#actor:user/kamon")
@@ -46,11 +60,11 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
"include the sampling rate in case of multiple measurements of the same value" in new UdpListenerFixture {
val testMetricName = "processing-time"
- val testRecorder = HdrRecorder(1000L, 2, Scale.Unit)
+ val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
testRecorder.record(10L)
testRecorder.record(10L)
- val udp = setup(Map(testMetricName -> testRecorder.collect()))
+ val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"kamon.actor.processing-time:10|ms|@0.5|#actor:user/kamon")
@@ -58,7 +72,7 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
"flush the packet when the max-packet-size is reached" in new UdpListenerFixture {
val testMetricName = "processing-time"
- val testRecorder = HdrRecorder(testMaxPacketSize, 3, Scale.Unit)
+ val testRecorder = Histogram(10000L, Precision.Normal, Scale.Unit)
var bytes = 0
var level = 0
@@ -69,8 +83,8 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
bytes += s"kamon.actor.$testMetricName:$level|ms|#actor:user/kamon".length
}
- val udp = setup(Map(testMetricName -> testRecorder.collect()))
- udp.expectMsgType[Udp.Send]// let the first flush pass
+ val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext)))
+ udp.expectMsgType[Udp.Send] // let the first flush pass
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be(s"kamon.actor.$testMetricName:$level|ms|#actor:user/kamon")
@@ -81,24 +95,21 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
val secondTestMetricName = "processing-time-2"
val thirdTestMetricName = "counter"
- val firstTestRecorder = HdrRecorder(1000L, 2, Scale.Unit)
- val secondTestRecorder = HdrRecorder(1000L, 2, Scale.Unit)
- val thirdTestRecorder = CounterRecorder()
+ val firstTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
+ val secondTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
+ val thirdTestRecorder = Counter()
firstTestRecorder.record(10L)
firstTestRecorder.record(10L)
secondTestRecorder.record(21L)
- thirdTestRecorder.record(1L)
- thirdTestRecorder.record(1L)
- thirdTestRecorder.record(1L)
- thirdTestRecorder.record(1L)
+ thirdTestRecorder.increment(4L)
val udp = setup(Map(
- firstTestMetricName -> firstTestRecorder.collect(),
- secondTestMetricName -> secondTestRecorder.collect(),
- thirdTestMetricName -> thirdTestRecorder.collect()))
+ firstTestMetricName -> firstTestRecorder.collect(collectionContext),
+ secondTestMetricName -> secondTestRecorder.collect(collectionContext),
+ thirdTestMetricName -> thirdTestRecorder.collect(collectionContext)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
data.utf8String should be("kamon.actor.processing-time-1:10|ms|@0.5|#actor:user/kamon\nkamon.actor.processing-time-2:21|ms|#actor:user/kamon\nkamon.actor.counter:4|c|#actor:user/kamon")
@@ -109,7 +120,7 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1)
val testMaxPacketSize = system.settings.config.getBytes("kamon.datadog.max-packet-size")
- def setup(metrics: Map[String, MetricSnapshotLike]): TestProbe = {
+ def setup(metrics: Map[String, MetricSnapshot]): TestProbe = {
val udp = TestProbe()
val metricsSender = system.actorOf(Props(new DatadogMetricsSender(new InetSocketAddress(localhostName, 0), testMaxPacketSize) {
override def udpExtension(implicit system: ActorSystem): ActorRef = udp.ref
@@ -137,7 +148,10 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
}
metricsSender ! TickMetricSnapshot(0, 0, Map(testGroupIdentity -> new MetricGroupSnapshot {
- val metrics: Map[MetricIdentity, MetricSnapshotLike] = testMetrics.toMap
+ type GroupSnapshotType = Histogram.Snapshot
+ def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType = ???
+
+ val metrics: Map[MetricIdentity, MetricSnapshot] = testMetrics.toMap
}))
udp
}