diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2015-01-12 01:45:27 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2015-01-24 23:19:01 +0100 |
commit | 01a34f67ff75419c440f2e69c0a0db888a670a34 (patch) | |
tree | 9c4dee4e9c13c26937356950f9e4927c3f9dfb7d /kamon-datadog/src | |
parent | 4a47e92d23af371f1d50b40af6cbe00a5ffc0105 (diff) | |
download | Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.gz Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.bz2 Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.zip |
! all: improve the metric recorders infrastructure
Diffstat (limited to 'kamon-datadog/src')
4 files changed, 69 insertions, 150 deletions
diff --git a/kamon-datadog/src/main/resources/reference.conf b/kamon-datadog/src/main/resources/reference.conf index de318820..4d0639c0 100644 --- a/kamon-datadog/src/main/resources/reference.conf +++ b/kamon-datadog/src/main/resources/reference.conf @@ -18,11 +18,12 @@ kamon { # Subscription patterns used to select which metrics will be pushed to Datadog. Note that first, metrics # collection for your desired entities must be activated under the kamon.metrics.filters settings. - includes { - actor = [ "*" ] - trace = [ "*" ] - dispatcher = [ "*" ] - router = [ "*" ] + subscriptions { + trace = [ "**" ] + actor = [ "**" ] + dispatcher = [ "**" ] + user-metric = [ "**" ] + system-metric = [ "**" ] } # Enable system metrics diff --git a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala index 596a6765..2648d6ef 100644 --- a/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala +++ b/kamon-datadog/src/main/scala/kamon/datadog/Datadog.scala @@ -22,11 +22,8 @@ import java.util.concurrent.TimeUnit.MILLISECONDS import akka.actor._ import akka.event.Logging import kamon.Kamon -import kamon.akka.{RouterMetrics, DispatcherMetrics, ActorMetrics} -import kamon.http.HttpServerMetrics -import kamon.metric.UserMetrics.{ UserGauges, UserMinMaxCounters, UserCounters, UserHistograms } +import kamon.util.ConfigTools.Syntax import kamon.metric._ -import kamon.metrics._ import scala.collection.JavaConverters._ import scala.concurrent.duration._ @@ -34,13 +31,10 @@ import scala.concurrent.duration._ object Datadog extends ExtensionId[DatadogExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = Datadog override def createExtension(system: ExtendedActorSystem): DatadogExtension = new DatadogExtension(system) - - trait MetricKeyGenerator { - def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String - } } class DatadogExtension(system: ExtendedActorSystem) extends Kamon.Extension { + implicit val as = system val log = Logging(system, classOf[DatadogExtension]) log.info("Starting the Kamon(Datadog) extension") @@ -53,57 +47,11 @@ class DatadogExtension(system: ExtendedActorSystem) extends Kamon.Extension { val datadogMetricsListener = buildMetricsListener(tickInterval, flushInterval) - // Subscribe to all user metrics - Kamon(Metrics)(system).subscribe(UserHistograms, "*", datadogMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(UserCounters, "*", datadogMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(UserMinMaxCounters, "*", datadogMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(UserGauges, "*", datadogMetricsListener, permanently = true) - - // Subscribe to server metrics - Kamon(Metrics)(system).subscribe(HttpServerMetrics.category, "*", datadogMetricsListener, permanently = true) - - // Subscribe to Actors - val includedActors = datadogConfig.getStringList("includes.actor").asScala - for (actorPathPattern ← includedActors) { - Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, datadogMetricsListener, permanently = true) - } - - // Subscribe to Routers - val includedRouters = datadogConfig.getStringList("includes.router").asScala - for (routerPathPattern ← includedRouters) { - Kamon(Metrics)(system).subscribe(RouterMetrics, routerPathPattern, datadogMetricsListener, permanently = true) - } - - // Subscribe to Traces - val includedTraces = datadogConfig.getStringList("includes.trace").asScala - for (tracePathPattern ← includedTraces) { - Kamon(Metrics)(system).subscribe(TraceMetrics, tracePathPattern, datadogMetricsListener, permanently = true) - } - - // Subscribe to Dispatchers - val includedDispatchers = datadogConfig.getStringList("includes.dispatcher").asScala - for (dispatcherPathPattern ← includedDispatchers) { - Kamon(Metrics)(system).subscribe(DispatcherMetrics, dispatcherPathPattern, datadogMetricsListener, permanently = true) - } - - // Subscribe to SystemMetrics - val includeSystemMetrics = datadogConfig.getBoolean("report-system-metrics") - if (includeSystemMetrics) { - //OS - Kamon(Metrics)(system).subscribe(CPUMetrics, "*", datadogMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(ProcessCPUMetrics, "*", datadogMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(MemoryMetrics, "*", datadogMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(NetworkMetrics, "*", datadogMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(DiskMetrics, "*", datadogMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(ContextSwitchesMetrics, "*", datadogMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(LoadAverageMetrics, "*", datadogMetricsListener, permanently = true) - - //JVM - Kamon(Metrics)(system).subscribe(HeapMetrics, "*", datadogMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(NonHeapMetrics, "*", datadogMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(ThreadMetrics, "*", datadogMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(ClassLoadingMetrics, "*", datadogMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(GCMetrics, "*", datadogMetricsListener, permanently = true) + val subscriptions = datadogConfig.getConfig("subscriptions") + subscriptions.firstLevelKeys.map { subscriptionCategory ⇒ + subscriptions.getStringList(subscriptionCategory).asScala.foreach { pattern ⇒ + Kamon(Metrics).subscribe(subscriptionCategory, pattern, datadogMetricsListener, permanently = true) + } } def buildMetricsListener(tickInterval: Long, flushInterval: Long): ActorRef = { diff --git a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala index 195798fe..80d4f098 100644 --- a/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala +++ b/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala @@ -20,11 +20,10 @@ import akka.actor.{ ActorSystem, Props, ActorRef, Actor } import akka.io.{ Udp, IO } import java.net.InetSocketAddress import akka.util.ByteString -import kamon.metric.Subscriptions.TickMetricSnapshot +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot import java.text.{ DecimalFormatSymbols, DecimalFormat } -import kamon.metric.UserMetrics.UserMetricGroup import kamon.metric.instrument.{ Counter, Histogram } -import kamon.metric.{ MetricIdentity, MetricGroupIdentity } +import kamon.metric.{ MetricKey, Entity } import java.util.Locale class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider { @@ -68,17 +67,19 @@ class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long } case cs: Counter.Snapshot ⇒ - val measurementData = formatMeasurement(groupIdentity, metricIdentity, encodeDatadogCounter(cs.count)) - packetBuilder.appendMeasurement(key, measurementData) + if (cs.count > 0) { + val measurementData = formatMeasurement(groupIdentity, metricIdentity, encodeDatadogCounter(cs.count)) + packetBuilder.appendMeasurement(key, measurementData) + } } } packetBuilder.flush() } - def formatMeasurement(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity, measurementData: String): String = + def formatMeasurement(entity: Entity, metricKey: MetricKey, measurementData: String): String = StringBuilder.newBuilder .append(measurementData) - .append(buildIdentificationTag(groupIdentity, metricIdentity)) + .append(buildIdentificationTag(entity, metricKey)) .result() def encodeDatadogTimer(level: Long, count: Long): String = { @@ -88,23 +89,12 @@ class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long def encodeDatadogCounter(count: Long): String = count.toString + "|c" - def buildMetricName(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = - if (isUserMetric(groupIdentity)) - s"$appName.${groupIdentity.category.name}.${groupIdentity.name}" - else - s"$appName.${groupIdentity.category.name}.${metricIdentity.name}" - - def buildIdentificationTag(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = { - if (isUserMetric(groupIdentity)) "" else { - // Make the automatic HTTP trace names a bit more friendly - val normalizedEntityName = groupIdentity.name.replace(": ", ":") - s"|#${groupIdentity.category.name}:${normalizedEntityName}" - } - } + def buildMetricName(entity: Entity, metricKey: MetricKey): String = + s"$appName.${entity.category}.${metricKey.name}" - def isUserMetric(groupIdentity: MetricGroupIdentity): Boolean = groupIdentity match { - case someUserMetric: UserMetricGroup ⇒ true - case everythingElse ⇒ false + def buildIdentificationTag(entity: Entity, metricKey: MetricKey): String = { + val normalizedEntityName = entity.name.replace(": ", ":") + s"|#${entity.category}:${normalizedEntityName}" } } diff --git a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala index f910489a..1fcc0c5d 100644 --- a/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala +++ b/kamon-datadog/src/test/scala/kamon/datadog/DatadogMetricSenderSpec.scala @@ -18,13 +18,13 @@ package kamon.datadog import akka.testkit.{ TestKitBase, TestProbe } import akka.actor.{ Props, ActorRef, ActorSystem } -import kamon.{ MilliTimestamp, Kamon } -import kamon.metric.instrument.Histogram.Precision -import kamon.metric.instrument.{ Counter, Histogram, HdrHistogram, LongAdderCounter } +import kamon.Kamon +import kamon.metric.instrument._ +import kamon.util.MilliTimestamp import org.scalatest.{ Matchers, WordSpecLike } import kamon.metric._ import akka.io.Udp -import kamon.metric.Subscriptions.TickMetricSnapshot +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot import java.lang.management.ManagementFactory import java.net.InetSocketAddress import com.typesafe.config.ConfigFactory @@ -48,79 +48,69 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher "the DataDogMetricSender" should { "send latency measurements" in new UdpListenerFixture { - val testMetricName = "processing-time" - val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) - testRecorder.record(10L) + val (entity, testRecorder) = buildRecorder("datadog") + testRecorder.metricOne.record(10L) - val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext))) + val udp = setup(Map(entity -> testRecorder.collect(collectionContext))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] - data.utf8String should be(s"kamon.actor.processing-time:10|ms|#actor:user/kamon") + data.utf8String should be(s"kamon.category.metric-one:10|ms|#category:datadog") } "include the sampling rate in case of multiple measurements of the same value" in new UdpListenerFixture { - val testMetricName = "processing-time" - val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) - testRecorder.record(10L) - testRecorder.record(10L) + val (entity, testRecorder) = buildRecorder("datadog") + testRecorder.metricTwo.record(10L, 2) - val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext))) + val udp = setup(Map(entity -> testRecorder.collect(collectionContext))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] - data.utf8String should be(s"kamon.actor.processing-time:10|ms|@0.5|#actor:user/kamon") + data.utf8String should be(s"kamon.category.metric-two:10|ms|@0.5|#category:datadog") } "flush the packet when the max-packet-size is reached" in new UdpListenerFixture { - val testMetricName = "processing-time" - val testRecorder = Histogram(10000L, Precision.Normal, Scale.Unit) + val (entity, testRecorder) = buildRecorder("datadog") var bytes = 0 var level = 0 while (bytes <= testMaxPacketSize) { level += 1 - testRecorder.record(level) - bytes += s"kamon.actor.$testMetricName:$level|ms|#actor:user/kamon".length + testRecorder.metricOne.record(level) + bytes += s"kamon.category.metric-one:$level|ms|#category:datadog".length } - val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext))) + val udp = setup(Map(entity -> testRecorder.collect(collectionContext))) udp.expectMsgType[Udp.Send] // let the first flush pass val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] - data.utf8String should be(s"kamon.actor.$testMetricName:$level|ms|#actor:user/kamon") + data.utf8String should be(s"kamon.category.metric-one:$level|ms|#category:datadog") } "render multiple keys in the same packet using newline as separator" in new UdpListenerFixture { - val firstTestMetricName = "processing-time-1" - val secondTestMetricName = "processing-time-2" - val thirdTestMetricName = "counter" + val (entity, testRecorder) = buildRecorder("datadog") - val firstTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) - val secondTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) - val thirdTestRecorder = Counter() + testRecorder.metricOne.record(10L, 2) + testRecorder.metricTwo.record(21L) + testRecorder.counterOne.increment(4L) - firstTestRecorder.record(10L) - firstTestRecorder.record(10L) - - secondTestRecorder.record(21L) - - thirdTestRecorder.increment(4L) - - val udp = setup(Map( - firstTestMetricName -> firstTestRecorder.collect(collectionContext), - secondTestMetricName -> secondTestRecorder.collect(collectionContext), - thirdTestMetricName -> thirdTestRecorder.collect(collectionContext))) + val udp = setup(Map(entity -> testRecorder.collect(collectionContext))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] - data.utf8String should be("kamon.actor.processing-time-1:10|ms|@0.5|#actor:user/kamon\nkamon.actor.processing-time-2:21|ms|#actor:user/kamon\nkamon.actor.counter:4|c|#actor:user/kamon") + data.utf8String should be("kamon.category.metric-one:10|ms|@0.5|#category:datadog\nkamon.category.counter:4|c|#category:datadog\nkamon.category.metric-two:21|ms|#category:datadog") } + } trait UdpListenerFixture { val localhostName = ManagementFactory.getRuntimeMXBean.getName.split('@')(1) val testMaxPacketSize = system.settings.config.getBytes("kamon.datadog.max-packet-size") - def setup(metrics: Map[String, MetricSnapshot]): TestProbe = { + def buildRecorder(name: String): (Entity, TestEntityRecorder) = { + val registration = Kamon(Metrics).register(TestEntityRecorder, name).get + (registration.entity, registration.recorder) + } + + def setup(metrics: Map[Entity, EntitySnapshot]): TestProbe = { val udp = TestProbe() val metricsSender = system.actorOf(Props(new DatadogMetricsSender(new InetSocketAddress(localhostName, 0), testMaxPacketSize) { override def udpExtension(implicit system: ActorSystem): ActorRef = udp.ref @@ -130,31 +120,21 @@ class DatadogMetricSenderSpec extends TestKitBase with WordSpecLike with Matcher udp.expectMsgType[Udp.SimpleSender] udp.reply(Udp.SimpleSenderReady) - // These names are not intented to match the real actor metrics, it's just about seeing more familiar data in tests. - val testGroupIdentity = new MetricGroupIdentity { - val name: String = "user/kamon" - val category: MetricGroupCategory = new MetricGroupCategory { - val name: String = "actor" - } - } - - val testMetrics = for ((metricName, snapshot) ← metrics) yield { - val testMetricIdentity = new MetricIdentity { - val name: String = metricName - val tag: String = "" - } - - (testMetricIdentity, snapshot) - } - - metricsSender ! TickMetricSnapshot(new MilliTimestamp(0), new MilliTimestamp(0), Map(testGroupIdentity -> new MetricGroupSnapshot { - type GroupSnapshotType = Histogram.Snapshot - def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType = ??? - - val metrics: Map[MetricIdentity, MetricSnapshot] = testMetrics.toMap - })) + val fakeSnapshot = TickMetricSnapshot(MilliTimestamp.now, MilliTimestamp.now, metrics) + metricsSender ! fakeSnapshot udp } } } + +class TestEntityRecorder(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val metricOne = histogram("metric-one") + val metricTwo = histogram("metric-two") + val counterOne = counter("counter") +} + +object TestEntityRecorder extends EntityRecorderFactory[TestEntityRecorder] { + def category: String = "category" + def createRecorder(instrumentFactory: InstrumentFactory): TestEntityRecorder = new TestEntityRecorder(instrumentFactory) +} |