aboutsummaryrefslogtreecommitdiff
path: root/kamon-datadog/src
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2015-01-12 01:45:27 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2015-01-24 23:19:01 +0100
commit01a34f67ff75419c440f2e69c0a0db888a670a34 (patch)
tree9c4dee4e9c13c26937356950f9e4927c3f9dfb7d /kamon-datadog/src
parent4a47e92d23af371f1d50b40af6cbe00a5ffc0105 (diff)
downloadKamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.gz
Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.bz2
Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.zip
! all: improve the metric recorders infrastructure
Diffstat (limited to 'kamon-datadog/src')
-rw-r--r--kamon-datadog/src/main/resources/reference.conf11
-rw-r--r--kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala66
-rw-r--r--kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala36
-rw-r--r--kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala106
4 files changed, 69 insertions, 150 deletions
diff --git a/kamon-datadog/src/main/resources/reference.conf b/kamon-datadog/src/main/resources/reference.conf
index de318820..4d0639c0 100644
--- a/kamon-datadog/src/main/resources/reference.conf
+++ b/kamon-datadog/src/main/resources/reference.conf
@@ -18,11 +18,12 @@ kamon {
# Subscription patterns used to select which metrics will be pushed to Datadog. Note that first, metrics
# collection for your desired entities must be activated under the kamon.metrics.filters settings.
- includes {
- actor = [ "*" ]
- trace = [ "*" ]
- dispatcher = [ "*" ]
- router = [ "*" ]
+ subscriptions {
+ trace = [ "**" ]
+ actor = [ "**" ]
+ dispatcher = [ "**" ]
+ user-metric = [ "**" ]
+ system-metric = [ "**" ]
}
# Enable system metrics
diff --git a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala
index 596a6765..2648d6ef 100644
--- a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala
+++ b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala
@@ -22,11 +22,8 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor._
import akka.event.Logging
import kamon.Kamon
-import kamon.akka.{RouterMetrics, DispatcherMetrics, ActorMetrics}
-import kamon.http.HttpServerMetrics
-import kamon.metric.UserMetrics.{ UserGauges, UserMinMaxCounters, UserCounters, UserHistograms }
+import kamon.util.ConfigTools.Syntax
import kamon.metric._
-import kamon.metrics._
import scala.collection.JavaConverters._
import scala.concurrent.duration._
@@ -34,13 +31,10 @@ import scala.concurrent.duration._
object Datadog extends ExtensionId[DatadogExtension] with ExtensionIdProvider {
override def lookup(): ExtensionId[_ <: Extension] = Datadog
override def createExtension(system: ExtendedActorSystem): DatadogExtension = new DatadogExtension(system)
-
- trait MetricKeyGenerator {
- def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String
- }
}
class DatadogExtension(system: ExtendedActorSystem) extends Kamon.Extension {
+ implicit val as = system
val log = Logging(system, classOf[DatadogExtension])
log.info("Starting the Kamon(Datadog) extension")
@@ -53,57 +47,11 @@ class DatadogExtension(system: ExtendedActorSystem) extends Kamon.Extension {
val datadogMetricsListener = buildMetricsListener(tickInterval, flushInterval)
- // Subscribe to all user metrics
- Kamon(Metrics)(system).subscribe(UserHistograms, "*", datadogMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(UserCounters, "*", datadogMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(UserMinMaxCounters, "*", datadogMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(UserGauges, "*", datadogMetricsListener, permanently = true)
-
- // Subscribe to server metrics
- Kamon(Metrics)(system).subscribe(HttpServerMetrics.category, "*", datadogMetricsListener, permanently = true)
-
- // Subscribe to Actors
- val includedActors = datadogConfig.getStringList("includes.actor").asScala
- for (actorPathPattern ← includedActors) {
- Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, datadogMetricsListener, permanently = true)
- }
-
- // Subscribe to Routers
- val includedRouters = datadogConfig.getStringList("includes.router").asScala
- for (routerPathPattern ← includedRouters) {
- Kamon(Metrics)(system).subscribe(RouterMetrics, routerPathPattern, datadogMetricsListener, permanently = true)
- }
-
- // Subscribe to Traces
- val includedTraces = datadogConfig.getStringList("includes.trace").asScala
- for (tracePathPattern ← includedTraces) {
- Kamon(Metrics)(system).subscribe(TraceMetrics, tracePathPattern, datadogMetricsListener, permanently = true)
- }
-
- // Subscribe to Dispatchers
- val includedDispatchers = datadogConfig.getStringList("includes.dispatcher").asScala
- for (dispatcherPathPattern ← includedDispatchers) {
- Kamon(Metrics)(system).subscribe(DispatcherMetrics, dispatcherPathPattern, datadogMetricsListener, permanently = true)
- }
-
- // Subscribe to SystemMetrics
- val includeSystemMetrics = datadogConfig.getBoolean("report-system-metrics")
- if (includeSystemMetrics) {
- //OS
- Kamon(Metrics)(system).subscribe(CPUMetrics, "*", datadogMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(ProcessCPUMetrics, "*", datadogMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(MemoryMetrics, "*", datadogMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(NetworkMetrics, "*", datadogMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(DiskMetrics, "*", datadogMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(ContextSwitchesMetrics, "*", datadogMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(LoadAverageMetrics, "*", datadogMetricsListener, permanently = true)
-
- //JVM
- Kamon(Metrics)(system).subscribe(HeapMetrics, "*", datadogMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(NonHeapMetrics, "*", datadogMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(ThreadMetrics, "*", datadogMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(ClassLoadingMetrics, "*", datadogMetricsListener, permanently = true)
- Kamon(Metrics)(system).subscribe(GCMetrics, "*", datadogMetricsListener, permanently = true)
+ val subscriptions = datadogConfig.getConfig("subscriptions")
+ subscriptions.firstLevelKeys.map { subscriptionCategory ⇒
+ subscriptions.getStringList(subscriptionCategory).asScala.foreach { pattern ⇒
+ Kamon(Metrics).subscribe(subscriptionCategory, pattern, datadogMetricsListener, permanently = true)
+ }
}
def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = {
diff --git a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
index 195798fe..80d4f098 100644
--- a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
+++ b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
@@ -20,11 +20,10 @@ import akka.actor.{ ActorSystem, Props, ActorRef, Actor }
import akka.io.{ Udp, IO }
import java.net.InetSocketAddress
import akka.util.ByteString
-import kamon.metric.Subscriptions.TickMetricSnapshot
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
import java.text.{ DecimalFormatSymbols, DecimalFormat }
-import kamon.metric.UserMetrics.UserMetricGroup
import kamon.metric.instrument.{ Counter, Histogram }
-import kamon.metric.{ MetricIdentity, MetricGroupIdentity }
+import kamon.metric.{ MetricKey, Entity }
import java.util.Locale
class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider {
@@ -68,17 +67,19 @@ class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long
}
case cs: Counter.Snapshot ⇒
- val measurementData = formatMeasurement(groupIdentity, metricIdentity, encodeDatadogCounter(cs.count))
- packetBuilder.appendMeasurement(key, measurementData)
+ if (cs.count > 0) {
+ val measurementData = formatMeasurement(groupIdentity, metricIdentity, encodeDatadogCounter(cs.count))
+ packetBuilder.appendMeasurement(key, measurementData)
+ }
}
}
packetBuilder.flush()
}
- def formatMeasurement(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity, measurementData: String): String =
+ def formatMeasurement(entity: Entity, metricKey: MetricKey, measurementData: String): String =
StringBuilder.newBuilder
.append(measurementData)
- .append(buildIdentificationTag(groupIdentity, metricIdentity))
+ .append(buildIdentificationTag(entity, metricKey))
.result()
def encodeDatadogTimer(level: Long, count: Long): String = {
@@ -88,23 +89,12 @@ class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long
def encodeDatadogCounter(count: Long): String = count.toString + "|c"
- def buildMetricName(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String =
- if (isUserMetric(groupIdentity))
- s"$appName.${groupIdentity.category.name}.${groupIdentity.name}"
- else
- s"$appName.${groupIdentity.category.name}.${metricIdentity.name}"
-
- def buildIdentificationTag(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = {
- if (isUserMetric(groupIdentity)) "" else {
- // Make the automatic HTTP trace names a bit more friendly
- val normalizedEntityName = groupIdentity.name.replace(": ", ":")
- s"|#${groupIdentity.category.name}:${normalizedEntityName}"
- }
- }
+ def buildMetricName(entity: Entity, metricKey: MetricKey): String =
+ s"$appName.${entity.category}.${metricKey.name}"
- def isUserMetric(groupIdentity: MetricGroupIdentity): Boolean = groupIdentity match {
- case someUserMetric: UserMetricGroup ⇒ true
- case everythingElse ⇒ false
+ def buildIdentificationTag(entity: Entity, metricKey: MetricKey): String = {
+ val normalizedEntityName = entity.name.replace(": ", ":")
+ s"|#${entity.category}:${normalizedEntityName}"
}
}
diff --git a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala
index f910489a..1fcc0c5d 100644
--- a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala
+++ b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala
@@ -18,13 +18,13 @@ package kamon.datadog
import akka.testkit.{ TestKitBase, TestProbe }
import akka.actor.{ Props, ActorRef, ActorSystem }
-import kamon.{ MilliTimestamp, Kamon }
-import kamon.metric.instrument.Histogram.Precision
-import kamon.metric.instrument.{ Counter, Histogram, HdrHistogram, LongAdderCounter }
+import kamon.Kamon
+import kamon.metric.instrument._
+import kamon.util.MilliTimestamp
import org.scalatest.{ Matchers, WordSpecLike }
import kamon.metric._
import akka.io.Udp
-import kamon.metric.Subscriptions.TickMetricSnapshot
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
import java.lang.management.ManagementFactory
import java.net.InetSocketAddress
import com.typesafe.config.ConfigFactory
@@ -48,79 +48,69 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
"the DataDogMetricSender" should {
"send latency measurements" in new UdpListenerFixture {
- val testMetricName = "processing-time"
- val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
- testRecorder.record(10L)
+ val (entity, testRecorder) = buildRecorder("datadog")
+ testRecorder.metricOne.record(10L)
- val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext)))
+ val udp = setup(Map(entity -> testRecorder.collect(collectionContext)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
- data.utf8String should be(s"kamon.actor.processing-time:10|ms|#actor:user/kamon")
+ data.utf8String should be(s"kamon.category.metric-one:10|ms|#category:datadog")
}
"include the sampling rate in case of multiple measurements of the same value" in new UdpListenerFixture {
- val testMetricName = "processing-time"
- val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
- testRecorder.record(10L)
- testRecorder.record(10L)
+ val (entity, testRecorder) = buildRecorder("datadog")
+ testRecorder.metricTwo.record(10L, 2)
- val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext)))
+ val udp = setup(Map(entity -> testRecorder.collect(collectionContext)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
- data.utf8String should be(s"kamon.actor.processing-time:10|ms|@0.5|#actor:user/kamon")
+ data.utf8String should be(s"kamon.category.metric-two:10|ms|@0.5|#category:datadog")
}
"flush the packet when the max-packet-size is reached" in new UdpListenerFixture {
- val testMetricName = "processing-time"
- val testRecorder = Histogram(10000L, Precision.Normal, Scale.Unit)
+ val (entity, testRecorder) = buildRecorder("datadog")
var bytes = 0
var level = 0
while (bytes <= testMaxPacketSize) {
level += 1
- testRecorder.record(level)
- bytes += s"kamon.actor.$testMetricName:$level|ms|#actor:user/kamon".length
+ testRecorder.metricOne.record(level)
+ bytes += s"kamon.category.metric-one:$level|ms|#category:datadog".length
}
- val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext)))
+ val udp = setup(Map(entity -> testRecorder.collect(collectionContext)))
udp.expectMsgType[Udp.Send] // let the first flush pass
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
- data.utf8String should be(s"kamon.actor.$testMetricName:$level|ms|#actor:user/kamon")
+ data.utf8String should be(s"kamon.category.metric-one:$level|ms|#category:datadog")
}
"render multiple keys in the same packet using newline as separator" in new UdpListenerFixture {
- val firstTestMetricName = "processing-time-1"
- val secondTestMetricName = "processing-time-2"
- val thirdTestMetricName = "counter"
+ val (entity, testRecorder) = buildRecorder("datadog")
- val firstTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
- val secondTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit)
- val thirdTestRecorder = Counter()
+ testRecorder.metricOne.record(10L, 2)
+ testRecorder.metricTwo.record(21L)
+ testRecorder.counterOne.increment(4L)
- firstTestRecorder.record(10L)
- firstTestRecorder.record(10L)
-
- secondTestRecorder.record(21L)
-
- thirdTestRecorder.increment(4L)
-
- val udp = setup(Map(
- firstTestMetricName -> firstTestRecorder.collect(collectionContext),
- secondTestMetricName -> secondTestRecorder.collect(collectionContext),
- thirdTestMetricName -> thirdTestRecorder.collect(collectionContext)))
+ val udp = setup(Map(entity -> testRecorder.collect(collectionContext)))
val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send]
- data.utf8String should be("kamon.actor.processing-time-1:10|ms|@0.5|#actor:user/kamon\nkamon.actor.processing-time-2:21|ms|#actor:user/kamon\nkamon.actor.counter:4|c|#actor:user/kamon")
+ data.utf8String should be("kamon.category.metric-one:10|ms|@0.5|#category:datadog\nkamon.category.counter:4|c|#category:datadog\nkamon.category.metric-two:21|ms|#category:datadog")
}
+
}
trait UdpListenerFixture {
val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1)
val testMaxPacketSize = system.settings.config.getBytes("kamon.datadog.max-packet-size")
- def setup(metrics: Map[String, MetricSnapshot]): TestProbe = {
+ def buildRecorder(name: String): (Entity, TestEntityRecorder) = {
+ val registration = Kamon(Metrics).register(TestEntityRecorder, name).get
+ (registration.entity, registration.recorder)
+ }
+
+ def setup(metrics: Map[Entity, EntitySnapshot]): TestProbe = {
val udp = TestProbe()
val metricsSender = system.actorOf(Props(new DatadogMetricsSender(new InetSocketAddress(localhostName, 0), testMaxPacketSize) {
override def udpExtension(implicit system: ActorSystem): ActorRef = udp.ref
@@ -130,31 +120,21 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher
udp.expectMsgType[Udp.SimpleSender]
udp.reply(Udp.SimpleSenderReady)
- // These names are not intented to match the real actor metrics, it's just about seeing more familiar data in tests.
- val testGroupIdentity = new MetricGroupIdentity {
- val name: String = "user/kamon"
- val category: MetricGroupCategory = new MetricGroupCategory {
- val name: String = "actor"
- }
- }
-
- val testMetrics = for ((metricName, snapshot) ← metrics) yield {
- val testMetricIdentity = new MetricIdentity {
- val name: String = metricName
- val tag: String = ""
- }
-
- (testMetricIdentity, snapshot)
- }
-
- metricsSender ! TickMetricSnapshot(new MilliTimestamp(0), new MilliTimestamp(0), Map(testGroupIdentity -> new MetricGroupSnapshot {
- type GroupSnapshotType = Histogram.Snapshot
- def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType = ???
-
- val metrics: Map[MetricIdentity, MetricSnapshot] = testMetrics.toMap
- }))
+ val fakeSnapshot = TickMetricSnapshot(MilliTimestamp.now, MilliTimestamp.now, metrics)
+ metricsSender ! fakeSnapshot
udp
}
}
}
+
+class TestEntityRecorder(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) {
+ val metricOne = histogram("metric-one")
+ val metricTwo = histogram("metric-two")
+ val counterOne = counter("counter")
+}
+
+object TestEntityRecorder extends EntityRecorderFactory[TestEntityRecorder] {
+ def category: String = "category"
+ def createRecorder(instrumentFactory: InstrumentFactory): TestEntityRecorder = new TestEntityRecorder(instrumentFactory)
+}