From 01a34f67ff75419c440f2e69c0a0db888a670a34 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 12 Jan 2015 01:45:27 +0100 Subject: ! all: improve the metric recorders infrastructure --- .../kamon/statsd/SimpleMetricKeyGenerator.scala | 16 ++---- .../src/main/scala/kamon/statsd/StatsD.scala | 65 +++------------------- .../scala/kamon/statsd/StatsDMetricsSender.scala | 8 +-- 3 files changed, 18 insertions(+), 71 deletions(-) (limited to 'kamon-statsd/src/main/scala') diff --git a/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala b/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala index 28354423..0fce855c 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/SimpleMetricKeyGenerator.scala @@ -3,11 +3,10 @@ package kamon.statsd import java.lang.management.ManagementFactory import com.typesafe.config.Config -import kamon.metric.UserMetrics.UserMetricGroup -import kamon.metric.{ MetricIdentity, MetricGroupIdentity } +import kamon.metric.{ MetricKey, Entity } trait MetricKeyGenerator { - def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String + def generateKey(entity: Entity, metricKey: MetricKey): String } class SimpleMetricKeyGenerator(config: Config) extends MetricKeyGenerator { @@ -27,16 +26,11 @@ class SimpleMetricKeyGenerator(config: Config) extends MetricKeyGenerator { if (includeHostname) s"$application.$normalizedHostname" else application - def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = { - val normalizedGroupName = normalizer(groupIdentity.name) - val key = s"${baseName}.${groupIdentity.category.name}.${normalizedGroupName}" - - if (isUserMetric(groupIdentity)) key - else s"${key}.${metricIdentity.name}" + def generateKey(entity: Entity, metricKey: MetricKey): String = { + val normalizedGroupName = normalizer(entity.name) + s"${baseName}.${entity.category}.${normalizedGroupName}.${metricKey.name}" } - def isUserMetric(groupIdentity: MetricGroupIdentity): Boolean = groupIdentity.isInstanceOf[UserMetricGroup] - def hostName: String = ManagementFactory.getRuntimeMXBean.getName.split('@')(1) def createNormalizer(strategy: String): Normalizer = strategy match { diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala index 2505f06a..e5a15a9d 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala @@ -18,17 +18,14 @@ package kamon.statsd import akka.actor._ import kamon.Kamon -import kamon.akka.{RouterMetrics, DispatcherMetrics, ActorMetrics} -import kamon.http.HttpServerMetrics -import kamon.metric.UserMetrics._ import kamon.metric._ -import kamon.metrics._ +import kamon.util.ConfigTools.Syntax import scala.concurrent.duration._ -import scala.collection.JavaConverters._ import com.typesafe.config.Config import akka.event.Logging import java.net.InetSocketAddress import java.util.concurrent.TimeUnit.MILLISECONDS +import scala.collection.JavaConverters._ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = StatsD @@ -36,6 +33,8 @@ object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider { } class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { + implicit val as = system + val log = Logging(system, classOf[StatsDExtension]) log.info("Starting the Kamon(StatsD) extension") @@ -50,57 +49,11 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension { val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval, keyGeneratorFQCN, config) - // Subscribe to all user metrics - Kamon(Metrics)(system).subscribe(UserHistograms, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(UserCounters, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(UserMinMaxCounters, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(UserGauges, "*", statsDMetricsListener, permanently = true) - - // Subscribe to server metrics - Kamon(Metrics)(system).subscribe(HttpServerMetrics.category, "*", statsDMetricsListener, permanently = true) - - // Subscribe to Actors - val includedActors = statsDConfig.getStringList("includes.actor").asScala - for (actorPathPattern ← includedActors) { - Kamon(Metrics)(system).subscribe(ActorMetrics, actorPathPattern, statsDMetricsListener, permanently = true) - } - - // Subscribe to Routers - val includedRouters = statsDConfig.getStringList("includes.router").asScala - for (routerPathPattern ← includedRouters) { - Kamon(Metrics)(system).subscribe(RouterMetrics, routerPathPattern, statsDMetricsListener, permanently = true) - } - - // Subscribe to Traces - val includedTraces = statsDConfig.getStringList("includes.trace").asScala - for (tracePathPattern ← includedTraces) { - Kamon(Metrics)(system).subscribe(TraceMetrics, tracePathPattern, statsDMetricsListener, permanently = true) - } - - // Subscribe to Dispatchers - val includedDispatchers = statsDConfig.getStringList("includes.dispatcher").asScala - for (dispatcherPathPattern ← includedDispatchers) { - Kamon(Metrics)(system).subscribe(DispatcherMetrics, dispatcherPathPattern, statsDMetricsListener, permanently = true) - } - - // Subscribe to SystemMetrics - val includeSystemMetrics = statsDConfig.getBoolean("report-system-metrics") - if (includeSystemMetrics) { - //OS - Kamon(Metrics)(system).subscribe(CPUMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(ProcessCPUMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(MemoryMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(NetworkMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(DiskMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(ContextSwitchesMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(LoadAverageMetrics, "*", statsDMetricsListener, permanently = true) - - //JVM - Kamon(Metrics)(system).subscribe(HeapMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(NonHeapMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(ThreadMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(ClassLoadingMetrics, "*", statsDMetricsListener, permanently = true) - Kamon(Metrics)(system).subscribe(GCMetrics, "*", statsDMetricsListener, permanently = true) + val subscriptions = statsDConfig.getConfig("subscriptions") + subscriptions.firstLevelKeys.map { subscriptionCategory ⇒ + subscriptions.getStringList(subscriptionCategory).asScala.foreach { pattern ⇒ + Kamon(Metrics).subscribe(subscriptionCategory, pattern, statsDMetricsListener, permanently = true) + } } def buildMetricsListener(tickInterval: Long, flushInterval: Long, keyGeneratorFQCN: String, config: Config): ActorRef = { diff --git a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala index 2aac3a52..3241e1f3 100644 --- a/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala +++ b/kamon-statsd/src/main/scala/kamon/statsd/StatsDMetricsSender.scala @@ -20,7 +20,7 @@ import akka.actor.{ ActorSystem, Props, ActorRef, Actor } import akka.io.{ Udp, IO } import java.net.InetSocketAddress import akka.util.ByteString -import kamon.metric.Subscriptions.TickMetricSnapshot +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot import java.text.{ DecimalFormatSymbols, DecimalFormat } import java.util.Locale @@ -51,11 +51,11 @@ class StatsDMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long, val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote) for ( - (groupIdentity, groupSnapshot) ← tick.metrics; - (metricIdentity, metricSnapshot) ← groupSnapshot.metrics + (entity, snapshot) ← tick.metrics; + (metricKey, metricSnapshot) ← snapshot.metrics ) { - val key = metricKeyGenerator.generateKey(groupIdentity, metricIdentity) + val key = metricKeyGenerator.generateKey(entity, metricKey) metricSnapshot match { case hs: Histogram.Snapshot ⇒ -- cgit v1.2.3