diff options
Diffstat (limited to 'kamon-statsd/src')
6 files changed, 110 insertions, 167 deletions
diff --git a/kamon-statsd/src/main/resources/reference.conf b/kamon-statsd/src/main/resources/reference.conf index a10ac735..f26ce98b 100644 --- a/kamon-statsd/src/main/resources/reference.conf +++ b/kamon-statsd/src/main/resources/reference.conf @@ -12,18 +12,19 @@ kamon { # Interval between metrics data flushes to StatsD. It's value must be equal or greater than the # kamon.metrics.tick-interval setting. - flush-interval = 1 second + flush-interval = 10 seconds # Max packet size for UDP metrics data sent to StatsD. max-packet-size = 1024 bytes # Subscription patterns used to select which metrics will be pushed to StatsD. Note that first, metrics # collection for your desired entities must be activated under the kamon.metrics.filters settings. - includes { - actor = [ "*" ] - trace = [ "*" ] - dispatcher = [ "*" ] - router = [ "*" ] + subscriptions { + trace = [ "**" ] + actor = [ "**" ] + dispatcher = [ "**" ] + user-metric = [ "**" ] + system-metric = [ "**" ] } # Enable system metrics @@ -61,4 +62,12 @@ kamon { metric-name-normalization-strategy = normalize } } + + modules { + kamon-statsd { + auto-start = yes + requires-aspectj = no + extension-id = "kamon.statsd.StatsD" + } + } }
\ No newline at end of file diff --git a/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala b/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala index 28354423..0fce855c 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala @@ -3,11 +3,10 @@ package kamon.statsd import java.lang.management.ManagementFactory import com.typesafe.config.Config -import kamon.metric.UserMetrics.UserMetricGroup -import kamon.metric.{ MetricIdentity, MetricGroupIdentity } +import kamon.metric.{ MetricKey, Entity } trait MetricKeyGenerator { - def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String + def generateKey(entity: Entity, metricKey: MetricKey): String } class SimpleMetricKeyGenerator(config: Config) extends MetricKeyGenerator { @@ -27,16 +26,11 @@ class SimpleMetricKeyGenerator(config: Config) extends MetricKeyGenerator { if (includeHostname) s"$application.$normalizedHostname" else application - def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = { - val normalizedGroupName = normalizer(groupIdentity.name) - val key = s"${baseName}.${groupIdentity.category.name}.${normalizedGroupName}" - - if (isUserMetric(groupIdentity)) key - else s"${key}.${metricIdentity.name}" + def generateKey(entity: Entity, metricKey: MetricKey): String = { + val normalizedGroupName = normalizer(entity.name) + s"${baseName}.${entity.category}.${normalizedGroupName}.${metricKey.name}" } - def isUserMetric(groupIdentity: MetricGroupIdentity): Boolean = groupIdentity.isInstanceOf[UserMetricGroup] - def hostName: String = ManagementFactory.getRuntimeMXBean.getName.split('@')(1) def createNormalizer(strategy: String): Normalizer = strategy match { diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala index 58fb3658..d406faf6 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala @@ -18,16 +18,14 @@ package kamon.statsd import akka.actor._ import kamon.Kamon -import kamon.metric.UserMetrics._ import kamon.metric._ -import kamon.metrics._ +import kamon.util.ConfigTools.Syntax import scala.concurrent.duration._ -import scala.collection.JavaConverters._ import com.typesafe.config.Config -import java.lang.management.ManagementFactory import akka.event.Logging import java.net.InetSocketAddress import java.util.concurrent.TimeUnit.MILLISECONDS +import scala.collection.JavaConverters._ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = StatsD @@ -35,59 +33,31 @@ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { } class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { + implicit val as = system + val log = Logging(system, classOf[StatsDExtension]) log.info("Starting the Kamon(StatsD) extension") private val config = system.settings.config private val statsDConfig = config.getConfig("kamon.statsd") + val metricsExtension = Kamon.metrics - val tickInterval = config.getMilliseconds("kamon.metrics.tick-interval") + val tickInterval = metricsExtension.settings.tickInterval val statsDHost = new InetSocketAddress(statsDConfig.getString("hostname"), statsDConfig.getInt("port")) - val flushInterval = statsDConfig.getMilliseconds("flush-interval") + val flushInterval = statsDConfig.getFiniteDuration("flush-interval") val maxPacketSizeInBytes = statsDConfig.getBytes("max-packet-size") val keyGeneratorFQCN = statsDConfig.getString("metric-key-generator") val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval, keyGeneratorFQCN, config) - // Subscribe to all user metrics - Kamon(Metrics)(system).subscribe(UserHistograms, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(UserCounters, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(UserMinMaxCounters, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(UserGauges, "*", statsDMetricsListener, permanently = true) - - // Subscribe to Actors - val includedActors = statsDConfig.getStringList("includes.actor").asScala - for (actorPathPattern ← includedActors) { - Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, statsDMetricsListener, permanently = true) - } - - // Subscribe to Routers - val includedRouters = statsDConfig.getStringList("includes.router").asScala - for (routerPathPattern ← includedRouters) { - Kamon(Metrics)(system).subscribe(RouterMetrics, routerPathPattern, statsDMetricsListener, permanently = true) - } - - // Subscribe to Traces - val includedTraces = statsDConfig.getStringList("includes.trace").asScala - for (tracePathPattern ← includedTraces) { - Kamon(Metrics)(system).subscribe(TraceMetrics, tracePathPattern, statsDMetricsListener, permanently = true) - } - - // Subscribe to Dispatchers - val includedDispatchers = statsDConfig.getStringList("includes.dispatcher").asScala - for (dispatcherPathPattern ← includedDispatchers) { - Kamon(Metrics)(system).subscribe(DispatcherMetrics, dispatcherPathPattern, statsDMetricsListener, permanently = true) - } - - // Subscribe to SystemMetrics - val includeSystemMetrics = statsDConfig.getBoolean("report-system-metrics") - if (includeSystemMetrics) { - Seq(CPUMetrics, ProcessCPUMetrics, MemoryMetrics, NetworkMetrics, GCMetrics, HeapMetrics, ContextSwitchesMetrics) foreach { metric ⇒ - Kamon(Metrics)(system).subscribe(metric, "*", statsDMetricsListener, permanently = true) + val subscriptions = statsDConfig.getConfig("subscriptions") + subscriptions.firstLevelKeys.map { subscriptionCategory ⇒ + subscriptions.getStringList(subscriptionCategory).asScala.foreach { pattern ⇒ + metricsExtension.subscribe(subscriptionCategory, pattern, statsDMetricsListener, permanently = true) } } - def buildMetricsListener(tickInterval: Long, flushInterval: Long, keyGeneratorFQCN: String, config: Config): ActorRef = { + def buildMetricsListener(tickInterval: FiniteDuration, flushInterval: FiniteDuration, keyGeneratorFQCN: String, config: Config): ActorRef = { assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval") val keyGenerator = system.dynamicAccess.createInstanceFor[MetricKeyGenerator](keyGeneratorFQCN, (classOf[Config], config) :: Nil).get @@ -100,7 +70,7 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { // No need to buffer the metrics, let's go straight to the metrics sender. metricsSender } else { - system.actorOf(TickMetricSnapshotBuffer.props(flushInterval.toInt.millis, metricsSender), "statsd-metrics-buffer") + system.actorOf(TickMetricSnapshotBuffer.props(flushInterval, metricsSender), "statsd-metrics-buffer") } } }
\ No newline at end of file diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala index 2aac3a52..3241e1f3 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala @@ -20,7 +20,7 @@ import akka.actor.{ ActorSystem, Props, ActorRef, Actor } import akka.io.{ Udp, IO } import java.net.InetSocketAddress import akka.util.ByteString -import kamon.metric.Subscriptions.TickMetricSnapshot +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot import java.text.{ DecimalFormatSymbols, DecimalFormat } import java.util.Locale @@ -51,11 +51,11 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long, val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote) for ( - (groupIdentity, groupSnapshot) ← tick.metrics; - (metricIdentity, metricSnapshot) ← groupSnapshot.metrics + (entity, snapshot) ← tick.metrics; + (metricKey, metricSnapshot) ← snapshot.metrics ) { - val key = metricKeyGenerator.generateKey(groupIdentity, metricIdentity) + val key = metricKeyGenerator.generateKey(entity, metricKey) metricSnapshot match { case hs: Histogram.Snapshot ⇒ diff --git a/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala index ed3fae5b..0edeb3df 100644 --- a/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala +++ b/kamon-statsd/src/test/scala/kamon/statsd/SimpleMetricKeyGeneratorSpec.scala @@ -1,7 +1,8 @@ package kamon.statsd import com.typesafe.config.ConfigFactory -import kamon.metric.{ MetricGroupCategory, MetricGroupIdentity, MetricIdentity } +import kamon.metric.instrument.UnitOfMeasurement +import kamon.metric._ import org.scalatest.{ Matchers, WordSpec } class SimpleMetricKeyGeneratorSpec extends WordSpec with Matchers { @@ -68,13 +69,8 @@ class SimpleMetricKeyGeneratorSpec extends WordSpec with Matchers { } def buildMetricKey(categoryName: String, entityName: String, metricName: String)(implicit metricKeyGenerator: SimpleMetricKeyGenerator): String = { - val metricIdentity = new MetricIdentity { val name: String = metricName } - val groupIdentity = new MetricGroupIdentity { - val name: String = entityName - val category: MetricGroupCategory = new MetricGroupCategory { - val name: String = categoryName - } - } - metricKeyGenerator.generateKey(groupIdentity, metricIdentity) + val metric = HistogramKey(metricName, UnitOfMeasurement.Unknown, Map.empty) + val entity = Entity(entityName, categoryName) + metricKeyGenerator.generateKey(entity, metric) } } diff --git a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala index 5d37bb75..0211ac0f 100644 --- a/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala +++ b/kamon-statsd/src/test/scala/kamon/statsd/StatsDMetricSenderSpec.scala @@ -19,95 +19,85 @@ package kamon.statsd import akka.testkit.{ TestKitBase, TestProbe } import akka.actor.{ ActorRef, Props, ActorSystem } import kamon.Kamon -import kamon.metric.instrument.Histogram.Precision -import kamon.metric.instrument.Histogram +import kamon.metric.instrument.{ InstrumentFactory, UnitOfMeasurement } +import kamon.testkit.BaseKamonSpec +import kamon.util.MilliTimestamp import org.scalatest.{ Matchers, WordSpecLike } import kamon.metric._ import akka.io.Udp -import kamon.metric.Subscriptions.TickMetricSnapshot -import java.lang.management.ManagementFactory +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot import java.net.InetSocketAddress import com.typesafe.config.ConfigFactory -class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers { - implicit lazy val system: ActorSystem = ActorSystem("statsd-metric-sender-spec", ConfigFactory.parseString( - """ - |kamon { - | metrics { - | disable-aspectj-weaver-missing-error = true - | } - | - | statsd.simple-metric-key-generator { - | application = kamon - | hostname-override = kamon-host - | include-hostname = true - | metric-name-normalization-strategy = normalize - | } - |} - | - """.stripMargin)) +class StatsDMetricSenderSpec extends BaseKamonSpec("statsd-metric-sender-spec") { + override lazy val config = + ConfigFactory.parseString( + """ + |kamon { + | statsd.simple-metric-key-generator { + | application = kamon + | hostname-override = kamon-host + | include-hostname = true + | metric-name-normalization-strategy = normalize + | } + |} + | + """.stripMargin) implicit val metricKeyGenerator = new SimpleMetricKeyGenerator(system.settings.config) { override def hostName: String = "localhost_local" } - val collectionContext = Kamon(Metrics).buildDefaultCollectionContext - "the StatsDMetricSender" should { - "flush the metrics data after processing the tick, even if the max-packet-size is not reached" in new UdpListenerFixture { - val testMetricName = "processing-time" - val testMetricKey = buildMetricKey("actor", "/user/kamon", testMetricName) - val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) - testRecorder.record(10L) + val testMetricKey = buildMetricKey(testEntity, "metric-one") + val testRecorder = buildRecorder("user/kamon") + testRecorder.metricOne.record(10L) - val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext))) + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"$testMetricKey:10|ms") } "render several measurements of the same key under a single (key + multiple measurements) packet" in new UdpListenerFixture { - val testMetricName = "processing-time" - val testMetricKey = buildMetricKey("actor", "/user/kamon", testMetricName) - val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) - testRecorder.record(10L) - testRecorder.record(11L) - testRecorder.record(12L) - - val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext))) + val testMetricKey = buildMetricKey(testEntity, "metric-one") + val testRecorder = buildRecorder("user/kamon") + testRecorder.metricOne.record(10L) + testRecorder.metricOne.record(11L) + testRecorder.metricOne.record(12L) + + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"$testMetricKey:10|ms:11|ms:12|ms") } "include the correspondent sampling rate when rendering multiple occurrences of the same value" in new UdpListenerFixture { - val testMetricName = "processing-time" - val testMetricKey = buildMetricKey("actor", "/user/kamon", testMetricName) - val testRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) - testRecorder.record(10L) - testRecorder.record(10L) + val testMetricKey = buildMetricKey(testEntity, "metric-one") + val testRecorder = buildRecorder("user/kamon") + testRecorder.metricOne.record(10L) + testRecorder.metricOne.record(10L) - val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext))) + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] data.utf8String should be(s"$testMetricKey:10|ms|@0.5") } "flush the packet when the max-packet-size is reached" in new UdpListenerFixture { - val testMetricName = "processing-time" - val testMetricKey = buildMetricKey("actor", "/user/kamon", testMetricName) - val testRecorder = Histogram(10000L, Precision.Normal, Scale.Unit) + val testMetricKey = buildMetricKey(testEntity, "metric-one") + val testRecorder = buildRecorder("user/kamon") var bytes = testMetricKey.length var level = 0 while (bytes <= testMaxPacketSize) { level += 1 - testRecorder.record(level) + testRecorder.metricOne.record(level) bytes += s":$level|ms".length } - val udp = setup(Map(testMetricName -> testRecorder.collect(collectionContext))) + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) udp.expectMsgType[Udp.Send] // let the first flush pass val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] @@ -115,51 +105,38 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers } "render multiple keys in the same packet using newline as separator" in new UdpListenerFixture { - val firstTestMetricName = "first-test-metric" - val firstTestMetricKey = buildMetricKey("actor", "/user/kamon", firstTestMetricName) - val secondTestMetricName = "second-test-metric" - val secondTestMetricKey = buildMetricKey("actor", "/user/kamon", secondTestMetricName) + val testMetricKey1 = buildMetricKey(testEntity, "metric-one") + val testMetricKey2 = buildMetricKey(testEntity, "metric-two") + val testRecorder = buildRecorder("user/kamon") - val firstTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) - val secondTestRecorder = Histogram(1000L, Precision.Normal, Scale.Unit) + testRecorder.metricOne.record(10L) + testRecorder.metricOne.record(10L) + testRecorder.metricOne.record(11L) - firstTestRecorder.record(10L) - firstTestRecorder.record(10L) - firstTestRecorder.record(11L) + testRecorder.metricTwo.record(20L) + testRecorder.metricTwo.record(21L) - secondTestRecorder.record(20L) - secondTestRecorder.record(21L) - - val udp = setup(Map( - firstTestMetricName -> firstTestRecorder.collect(collectionContext), - secondTestMetricName -> secondTestRecorder.collect(collectionContext))) + val udp = setup(Map(testEntity -> testRecorder.collect(collectionContext))) val Udp.Send(data, _, _) = udp.expectMsgType[Udp.Send] - data.utf8String should be(s"$firstTestMetricKey:10|ms|@0.5:11|ms\n$secondTestMetricKey:20|ms:21|ms") + data.utf8String should be(s"$testMetricKey1:10|ms|@0.5:11|ms\n$testMetricKey2:20|ms:21|ms") } } trait UdpListenerFixture { val testMaxPacketSize = system.settings.config.getBytes("kamon.statsd.max-packet-size") - val testGroupIdentity = new MetricGroupIdentity { - val name: String = "/user/kamon" - val category: MetricGroupCategory = new MetricGroupCategory { - val name: String = "actor" - } + val testEntity = Entity("user/kamon", "test") + + def buildMetricKey(entity: Entity, metricName: String)(implicit metricKeyGenerator: SimpleMetricKeyGenerator): String = { + val metricKey = HistogramKey(metricName, UnitOfMeasurement.Unknown, Map.empty) + metricKeyGenerator.generateKey(entity, metricKey) } - def buildMetricKey(categoryName: String, entityName: String, metricName: String)(implicit metricKeyGenerator: SimpleMetricKeyGenerator): String = { - val metricIdentity = new MetricIdentity { val name: String = metricName } - val groupIdentity = new MetricGroupIdentity { - val name: String = entityName - val category: MetricGroupCategory = new MetricGroupCategory { - val name: String = categoryName - } - } - metricKeyGenerator.generateKey(groupIdentity, metricIdentity) + def buildRecorder(name: String): TestEntityRecorder = { + Kamon.metrics.register(TestEntityRecorder, name).get.recorder } - def setup(metrics: Map[String, MetricSnapshot]): TestProbe = { + def setup(metrics: Map[Entity, EntitySnapshot]): TestProbe = { val udp = TestProbe() val metricsSender = system.actorOf(Props(new StatsDMetricsSender(new InetSocketAddress("127.0.0.1", 0), testMaxPacketSize, metricKeyGenerator) { override def udpExtension(implicit system: ActorSystem): ActorRef = udp.ref @@ -169,22 +146,19 @@ class StatsDMetricSenderSpec extends TestKitBase with WordSpecLike with Matchers udp.expectMsgType[Udp.SimpleSender] udp.reply(Udp.SimpleSenderReady) - val testMetrics = for ((metricName, snapshot) ← metrics) yield { - val testMetricIdentity = new MetricIdentity { - val name: String = metricName - } - - (testMetricIdentity, snapshot) - } - - metricsSender ! TickMetricSnapshot(0, 0, Map(testGroupIdentity -> new MetricGroupSnapshot { - type GroupSnapshotType = Histogram.Snapshot - def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType = ??? - - val metrics: Map[MetricIdentity, MetricSnapshot] = testMetrics.toMap - })) - + val fakeSnapshot = TickMetricSnapshot(MilliTimestamp.now, MilliTimestamp.now, metrics) + metricsSender ! fakeSnapshot udp } } } + +class TestEntityRecorder(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val metricOne = histogram("metric-one") + val metricTwo = histogram("metric-two") +} + +object TestEntityRecorder extends EntityRecorderFactory[TestEntityRecorder] { + def category: String = "test" + def createRecorder(instrumentFactory: InstrumentFactory): TestEntityRecorder = new TestEntityRecorder(instrumentFactory) +} |