diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2015-01-12 01:45:27 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2015-01-24 23:19:01 +0100 |
commit | 01a34f67ff75419c440f2e69c0a0db888a670a34 (patch) | |
tree | 9c4dee4e9c13c26937356950f9e4927c3f9dfb7d /kamon-statsd/src | |
parent | 4a47e92d23af371f1d50b40af6cbe00a5ffc0105 (diff) | |
download | Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.gz Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.tar.bz2 Kamon-01a34f67ff75419c440f2e69c0a0db888a670a34.zip |
! all: improve the metric recorders infrastructure
Diffstat (limited to 'kamon-statsd/src')
6 files changed, 83 insertions, 164 deletions
diff --git a/kamon-statsd/src/main/resources/reference.conf b/kamon-statsd/src/main/resources/reference.conf index a10ac735..32b99353 100644 --- a/kamon-statsd/src/main/resources/reference.conf +++ b/kamon-statsd/src/main/resources/reference.conf @@ -19,11 +19,12 @@ kamon { # Subscription patterns used to select which metrics will be pushed to StatsD. Note that first, metrics # collection for your desired entities must be activated under the kamon.metrics.filters settings. - includes { - actor = [ "*" ] - trace = [ "*" ] - dispatcher = [ "*" ] - router = [ "*" ] + subscriptions { + trace = [ "**" ] + actor = [ "**" ] + dispatcher = [ "**" ] + user-metric = [ "**" ] + system-metric = [ "**" ] } # Enable system metrics diff --git a/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala b/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala index 28354423..0fce855c 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala @@ -3,11 +3,10 @@ package kamon.statsd import java.lang.management.ManagementFactory import com.typesafe.config.Config -import kamon.metric.UserMetrics.UserMetricGroup -import kamon.metric.{ MetricIdentity, MetricGroupIdentity } +import kamon.metric.{ MetricKey, Entity } trait MetricKeyGenerator { - def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String + def generateKey(entity: Entity, metricKey: MetricKey): String } class SimpleMetricKeyGenerator(config: Config) extends MetricKeyGenerator { @@ -27,16 +26,11 @@ class SimpleMetricKeyGenerator(config: Config) extends MetricKeyGenerator { if (includeHostname) s"$application.$normalizedHostname" else application - def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = { - val normalizedGroupName = normalizer(groupIdentity.name) - val key = s"${baseName}.${groupIdentity.category.name}.${normalizedGroupName}" - - if (isUserMetric(groupIdentity)) key - else s"${key}.${metricIdentity.name}" + def generateKey(entity: Entity, metricKey: MetricKey): String = { + val normalizedGroupName = normalizer(entity.name) + s"${baseName}.${entity.category}.${normalizedGroupName}.${metricKey.name}" } - def isUserMetric(groupIdentity: MetricGroupIdentity): Boolean = groupIdentity.isInstanceOf[UserMetricGroup] - def hostName: String = ManagementFactory.getRuntimeMXBean.getName.split('@')(1) def createNormalizer(strategy: String): Normalizer = strategy match { diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala index 2505f06a..e5a15a9d 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala @@ -18,17 +18,14 @@ package kamon.statsd import akka.actor._ import kamon.Kamon -import kamon.akka.{RouterMetrics, DispatcherMetrics, ActorMetrics} -import kamon.http.HttpServerMetrics -import kamon.metric.UserMetrics._ import kamon.metric._ -import kamon.metrics._ +import kamon.util.ConfigTools.Syntax import scala.concurrent.duration._ -import scala.collection.JavaConverters._ import com.typesafe.config.Config import akka.event.Logging import java.net.InetSocketAddress import java.util.concurrent.TimeUnit.MILLISECONDS +import scala.collection.JavaConverters._ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = StatsD @@ -36,6 +33,8 @@ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { } class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { + implicit val as = system + val log = Logging(system, classOf[StatsDExtension]) log.info("Starting the Kamon(StatsD) extension") @@ -50,57 +49,11 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval, keyGeneratorFQCN, config) - // Subscribe to all user metrics - Kamon(Metrics)(system).subscribe(UserHistograms, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(UserCounters, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(UserMinMaxCounters, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(UserGauges, "*", statsDMetricsListener, permanently = true) - - // Subscribe to server metrics - Kamon(Metrics)(system).subscribe(HttpServerMetrics.category, "*", statsDMetricsListener, permanently = true) - - // Subscribe to Actors - val includedActors = statsDConfig.getStringList("includes.actor").asScala - for (actorPathPattern ← includedActors) { - Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, statsDMetricsListener, permanently = true) - } - - // Subscribe to Routers - val includedRouters = statsDConfig.getStringList("includes.router").asScala - for (routerPathPattern ← includedRouters) { - Kamon(Metrics)(system).subscribe(RouterMetrics, routerPathPattern, statsDMetricsListener, permanently = true) - } - - // Subscribe to Traces - val includedTraces = statsDConfig.getStringList("includes.trace").asScala - for (tracePathPattern ← includedTraces) { - Kamon(Metrics)(system).subscribe(TraceMetrics, tracePathPattern, statsDMetricsListener, permanently = true) - } - - // Subscribe to Dispatchers - val includedDispatchers = statsDConfig.getStringList("includes.dispatcher").asScala - for (dispatcherPathPattern ← includedDispatchers) { - Kamon(Metrics)(system).subscribe(DispatcherMetrics, dispatcherPathPattern, statsDMetricsListener, permanently = true) - } - - // Subscribe to SystemMetrics - val includeSystemMetrics = statsDConfig.getBoolean("report-system-metrics") - if (includeSystemMetrics) { - //OS - Kamon(Metrics)(system).subscribe(CPUMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(ProcessCPUMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(MemoryMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(NetworkMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(DiskMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(ContextSwitchesMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(LoadAverageMetrics, "*", statsDMetricsListener, permanently = true) - - //JVM - Kamon(Metrics)(system).subscribe(HeapMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(NonHeapMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(ThreadMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(ClassLoadingMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(GCMetrics, "*", statsDMetricsListener, permanently = true) + val subscriptions = statsDConfig.getConfig("subscriptions") + subscriptions.firstLevelKeys.map { subscriptionCategory ⇒ + subscriptions.getStringList(subscriptionCategory).asScala.foreach { pattern ⇒ + Kamon(Metrics).subscribe(subscriptionCategory, pattern, statsDMetricsListener, permanently = true) + } } def buildMetricsListener(tickInterval: Long, flushInterval: Long, keyGeneratorFQCN: String, config: Config): ActorRef = { diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala index 2aac3a52..3241e1f3 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala @@ -20,7 +20,7 @@ import akka.actor.{ ActorSystem, Props, ActorRef, Actor } import akka.io.{ Udp, IO } import java.net.InetSocketAddress import akka.util.ByteString -import kamon.metric.Subscriptions.TickMetricSnapshot +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot import java.text.{ DecimalFormatSymbols, DecimalFormat } import java.util.Locale @@ -51,11 +51,11 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long, val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote) for ( - (groupIdentity, groupSnapshot) ← tick.metrics; - (metricIdentity, metricSnapshot) ← groupSnapshot.metrics + (entity, snapshot) ← tick.metrics; + (metricKey, metricSnapshot) ← snapshot.metrics ) { - val key = metricKeyGenerator.generateKey(groupIdentity, metricIdentity) + val key = metricKeyGenerator.generateKey(entity, metricKey) metricSnapshot match { case hs: Histogram.Snapshot ⇒ diff --git a/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala index ed3fae5b..0edeb3df 100644 --- a/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala +++ b/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala @@ -1,7 +1,8 @@ package kamon.statsd import com.typesafe.config.ConfigFactory -import kamon.metric.{ MetricGroupCategory, MetricGroupIdentity, MetricIdentity } +import kamon.metric.instrument.UnitOfMeasurement +import kamon.metric._ import org.scalatest.{ Matchers, WordSpec } class SimpleMetricKeyGeneratorSpec extends WordSpec with Matchers { @@ -68,13 +69,8 @@ class SimpleMetricKeyGeneratorSpec extends WordSpec with Matchers { } def buildMetricKey(categoryName: String, entityName: String, metricName: String)(implicit metricKeyGenerator: SimpleMetricKeyGenerator): String = { - val metricIdentity = new MetricIdentity { val name: String = metricName } - val groupIdentity = new MetricGroupIdentity { - val name: String = entityName - val category: MetricGroupCategory = new MetricGroupCategory { - val name: String = categoryName - } - } - metricKeyGenerator.generateKey(groupIdentity, metricIdentity) + val metric = HistogramKey(metricName, UnitOfMeasurement.Unknown, Map.empty) + val entity = Entity(entityName, categoryName) + metricKeyGenerator.generateKey(entity, metric) } } diff --git a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala index 6c77f321..a0d787d9 100644 --- a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala +++ b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala @@ -18,14 +18,13 @@ package kamon.statsd import akka.testkit.{ TestKitBase, TestProbe } import akka.actor.{ ActorRef, Props, ActorSystem } -import kamon.{ MilliTimestamp, Kamon } -import kamon.metric.instrument.Histogram.Precision -import kamon.metric.instrument.Histogram +import kamon.Kamon +import kamon.metric.instrument.{ InstrumentFactory, UnitOfMeasurement } +import kamon.util.MilliTimestamp import org.scalatest.{ Matchers, WordSpecLike } import kamon.metric._ import akka.io.Udp -import kamon.metric.Subscriptions.TickMetricSnapshot -import java.lang.management.ManagementFactory +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot import java.net.InetSocketAddress import com.typesafe.config.ConfigFactory @@ -33,10 +32,6 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers implicit lazy val system: ActorSystem = ActorSystem("statsd-metric-sender-spec", ConfigFactory.parseString( """ |kamon { - | metrics { - | disable-aspectj-weaver-missing-error = true - | } - | | statsd.simple-metric-key-generator { | application = kamon | hostname-override = kamon-host @@ -56,58 +51,54 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers "the StatsDMetricSender" should { "flush the metrics data after processing the tick, even if the max-packet-size is not reached" in new UdpListenerFixture { - val testMetricName = "processing-time" - val testMetricKey = buildMetricKey("actor", "/user/kamon", testMetricName) - val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) - testRecorder.record(10L) + val testMetricKey = buildMetricKey(testEntity, "metric-one") + val testRecorder = buildRecorder("user/kamon") + testRecorder.metricOne.record(10L) - val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext))) + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"$testMetricKey:10|ms") } "render several measurements of the same key under a single (key + multiple measurements) packet" in new UdpListenerFixture { - val testMetricName = "processing-time" - val testMetricKey = buildMetricKey("actor", "/user/kamon", testMetricName) - val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) - testRecorder.record(10L) - testRecorder.record(11L) - testRecorder.record(12L) - - val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext))) + val testMetricKey = buildMetricKey(testEntity, "metric-one") + val testRecorder = buildRecorder("user/kamon") + testRecorder.metricOne.record(10L) + testRecorder.metricOne.record(11L) + testRecorder.metricOne.record(12L) + + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"$testMetricKey:10|ms:11|ms:12|ms") } "include the correspondent sampling rate when rendering multiple occurrences of the same value" in new UdpListenerFixture { - val testMetricName = "processing-time" - val testMetricKey = buildMetricKey("actor", "/user/kamon", testMetricName) - val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) - testRecorder.record(10L) - testRecorder.record(10L) + val testMetricKey = buildMetricKey(testEntity, "metric-one") + val testRecorder = buildRecorder("user/kamon") + testRecorder.metricOne.record(10L) + testRecorder.metricOne.record(10L) - val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext))) + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"$testMetricKey:10|ms|@0.5") } "flush the packet when the max-packet-size is reached" in new UdpListenerFixture { - val testMetricName = "processing-time" - val testMetricKey = buildMetricKey("actor", "/user/kamon", testMetricName) - val testRecorder = Histogram(10000L, Precision.Normal, Scale.Unit) + val testMetricKey = buildMetricKey(testEntity, "metric-one") + val testRecorder = buildRecorder("user/kamon") var bytes = testMetricKey.length var level = 0 while (bytes <= testMaxPacketSize) { level += 1 - testRecorder.record(level) + testRecorder.metricOne.record(level) bytes += s":$level|ms".length } - val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext))) + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) udp.expectMsgType[Udp.Send] // let the first flush pass val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] @@ -115,51 +106,38 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers } "render multiple keys in the same packet using newline as separator" in new UdpListenerFixture { - val firstTestMetricName = "first-test-metric" - val firstTestMetricKey = buildMetricKey("actor", "/user/kamon", firstTestMetricName) - val secondTestMetricName = "second-test-metric" - val secondTestMetricKey = buildMetricKey("actor", "/user/kamon", secondTestMetricName) + val testMetricKey1 = buildMetricKey(testEntity, "metric-one") + val testMetricKey2 = buildMetricKey(testEntity, "metric-two") + val testRecorder = buildRecorder("user/kamon") - val firstTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) - val secondTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) + testRecorder.metricOne.record(10L) + testRecorder.metricOne.record(10L) + testRecorder.metricOne.record(11L) - firstTestRecorder.record(10L) - firstTestRecorder.record(10L) - firstTestRecorder.record(11L) + testRecorder.metricTwo.record(20L) + testRecorder.metricTwo.record(21L) - secondTestRecorder.record(20L) - secondTestRecorder.record(21L) - - val udp = setup(Map( - firstTestMetricName -> firstTestRecorder.collect(collectionContext), - secondTestMetricName -> secondTestRecorder.collect(collectionContext))) + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] - data.utf8String should be(s"$firstTestMetricKey:10|ms|@0.5:11|ms\n$secondTestMetricKey:20|ms:21|ms") + data.utf8String should be(s"$testMetricKey1:10|ms|@0.5:11|ms\n$testMetricKey2:20|ms:21|ms") } } trait UdpListenerFixture { val testMaxPacketSize = system.settings.config.getBytes("kamon.statsd.max-packet-size") - val testGroupIdentity = new MetricGroupIdentity { - val name: String = "/user/kamon" - val category: MetricGroupCategory = new MetricGroupCategory { - val name: String = "actor" - } + val testEntity = Entity("user/kamon", "test") + + def buildMetricKey(entity: Entity, metricName: String)(implicit metricKeyGenerator: SimpleMetricKeyGenerator): String = { + val metricKey = HistogramKey(metricName, UnitOfMeasurement.Unknown, Map.empty) + metricKeyGenerator.generateKey(entity, metricKey) } - def buildMetricKey(categoryName: String, entityName: String, metricName: String)(implicit metricKeyGenerator: SimpleMetricKeyGenerator): String = { - val metricIdentity = new MetricIdentity { val name: String = metricName } - val groupIdentity = new MetricGroupIdentity { - val name: String = entityName - val category: MetricGroupCategory = new MetricGroupCategory { - val name: String = categoryName - } - } - metricKeyGenerator.generateKey(groupIdentity, metricIdentity) + def buildRecorder(name: String): TestEntityRecorder = { + Kamon(Metrics).register(TestEntityRecorder, name).get.recorder } - def setup(metrics: Map[String, MetricSnapshot]): TestProbe = { + def setup(metrics: Map[Entity, EntitySnapshot]): TestProbe = { val udp = TestProbe() val metricsSender = system.actorOf(Props(new StatsDMetricsSender(new InetSocketAddress("127.0.0.1", 0), testMaxPacketSize, metricKeyGenerator) { override def udpExtension(implicit system: ActorSystem): ActorRef = udp.ref @@ -169,22 +147,19 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers udp.expectMsgType[Udp.SimpleSender] udp.reply(Udp.SimpleSenderReady) - val testMetrics = for ((metricName, snapshot) ← metrics) yield { - val testMetricIdentity = new MetricIdentity { - val name: String = metricName - } - - (testMetricIdentity, snapshot) - } - - metricsSender ! TickMetricSnapshot(new MilliTimestamp(0), new MilliTimestamp(0), Map(testGroupIdentity -> new MetricGroupSnapshot { - type GroupSnapshotType = Histogram.Snapshot - def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType = ??? - - val metrics: Map[MetricIdentity, MetricSnapshot] = testMetrics.toMap - })) - + val fakeSnapshot = TickMetricSnapshot(MilliTimestamp.now, MilliTimestamp.now, metrics) + metricsSender ! fakeSnapshot udp } } } + +class TestEntityRecorder(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val metricOne = histogram("metric-one") + val metricTwo = histogram("metric-two") +} + +object TestEntityRecorder extends EntityRecorderFactory[TestEntityRecorder] { + def category: String = "test" + def createRecorder(instrumentFactory: InstrumentFactory): TestEntityRecorder = new TestEntityRecorder(instrumentFactory) +} |