From 01a34f67ff75419c440f2e69c0a0db888a670a34 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 12 Jan 2015 01:45:27 +0100 Subject: ! all: improve the metric recorders infrastructure --- .../src/main/resources/reference.conf | 3 +- .../main/scala/kamon/logreporter/LogReporter.scala | 237 +++++++++++---------- 2 files changed, 125 insertions(+), 115 deletions(-) (limited to 'kamon-log-reporter') diff --git a/kamon-log-reporter/src/main/resources/reference.conf b/kamon-log-reporter/src/main/resources/reference.conf index dea218eb..0f7e4e9a 100644 --- a/kamon-log-reporter/src/main/resources/reference.conf +++ b/kamon-log-reporter/src/main/resources/reference.conf @@ -5,8 +5,7 @@ kamon { log-reporter { - # Enable system metrics - # In order to not get a ClassNotFoundException, we must register the kamon-sytem-metrics module + # Decide whether the log reporter should log system metrics, if available. report-system-metrics = false } } diff --git a/kamon-log-reporter/src/main/scala/kamon/logreporter/LogReporter.scala b/kamon-log-reporter/src/main/scala/kamon/logreporter/LogReporter.scala index 61d87793..dc977a52 100644 --- a/kamon-log-reporter/src/main/scala/kamon/logreporter/LogReporter.scala +++ b/kamon-log-reporter/src/main/scala/kamon/logreporter/LogReporter.scala @@ -19,28 +19,13 @@ package kamon.logreporter import akka.actor._ import akka.event.Logging import kamon.Kamon -import kamon.akka.ActorMetrics -import ActorMetrics.ActorMetricSnapshot -import kamon.metric.Subscriptions.TickMetricSnapshot -import kamon.metric.TraceMetrics.TraceMetricsSnapshot -import kamon.metric.UserMetrics._ +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot import kamon.metric._ import kamon.metric.instrument.{ Counter, Histogram } -import kamon.metrics.ContextSwitchesMetrics.ContextSwitchesMetricsSnapshot -import kamon.metrics.NetworkMetrics.NetworkMetricSnapshot -import kamon.metrics.ProcessCPUMetrics.ProcessCPUMetricsSnapshot -import kamon.metrics._ -import kamon.metrics.CPUMetrics.CPUMetricSnapshot object LogReporter extends ExtensionId[LogReporterExtension] with ExtensionIdProvider { override def lookup(): ExtensionId[_ <: Extension] = LogReporter override def createExtension(system: ExtendedActorSystem): LogReporterExtension = new LogReporterExtension(system) - - trait MetricKeyGenerator { - def localhostName: String - def normalizedLocalhostName: String - def generateKey(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String - } } class LogReporterExtension(system: ExtendedActorSystem) extends Kamon.Extension { @@ -48,25 +33,15 @@ class LogReporterExtension(system: ExtendedActorSystem) extends Kamon.Extension log.info("Starting the Kamon(LogReporter) extension") val logReporterConfig = system.settings.config.getConfig("kamon.log-reporter") - val subscriber = system.actorOf(Props[LogReporterSubscriber], "kamon-log-reporter") - Kamon(Metrics)(system).subscribe(TraceMetrics, "*", subscriber, permanently = true) - Kamon(Metrics)(system).subscribe(ActorMetrics, "*", subscriber, permanently = true) - // Subscribe to all user metrics - Kamon(Metrics)(system).subscribe(UserHistograms, "*", subscriber, permanently = true) - Kamon(Metrics)(system).subscribe(UserCounters, "*", subscriber, permanently = true) - Kamon(Metrics)(system).subscribe(UserMinMaxCounters, "*", subscriber, permanently = true) - Kamon(Metrics)(system).subscribe(UserGauges, "*", subscriber, permanently = true) + Kamon(Metrics)(system).subscribe("trace", "**", subscriber, permanently = true) + Kamon(Metrics)(system).subscribe("actor", "**", subscriber, permanently = true) + Kamon(Metrics)(system).subscribe("user-metrics", "**", subscriber, permanently = true) val includeSystemMetrics = logReporterConfig.getBoolean("report-system-metrics") - if (includeSystemMetrics) { - // Subscribe to SystemMetrics - Kamon(Metrics)(system).subscribe(CPUMetrics, "*", subscriber, permanently = true) - Kamon(Metrics)(system).subscribe(ProcessCPUMetrics, "*", subscriber, permanently = true) - Kamon(Metrics)(system).subscribe(NetworkMetrics, "*", subscriber, permanently = true) - Kamon(Metrics)(system).subscribe(ContextSwitchesMetrics, "*", subscriber, permanently = true) + Kamon(Metrics)(system).subscribe("system-metric", "**", subscriber, permanently = true) } } @@ -79,32 +54,25 @@ class LogReporterSubscriber extends Actor with ActorLogging { } def printMetricSnapshot(tick: TickMetricSnapshot): Unit = { - // Group all the user metrics together. - val histograms = Map.newBuilder[MetricGroupIdentity, Histogram.Snapshot] - val counters = Map.newBuilder[MetricGroupIdentity, Counter.Snapshot] - val minMaxCounters = Map.newBuilder[MetricGroupIdentity, Histogram.Snapshot] - val gauges = Map.newBuilder[MetricGroupIdentity, Histogram.Snapshot] - tick.metrics foreach { - case (identity, ams: ActorMetricSnapshot) ⇒ logActorMetrics(identity.name, ams) - case (identity, tms: TraceMetricsSnapshot) ⇒ logTraceMetrics(identity.name, tms) - case (h: UserHistogram, s: UserHistogramSnapshot) ⇒ histograms += (h -> s.histogramSnapshot) - case (c: UserCounter, s: UserCounterSnapshot) ⇒ counters += (c -> s.counterSnapshot) - case (m: UserMinMaxCounter, s: UserMinMaxCounterSnapshot) ⇒ minMaxCounters += (m -> s.minMaxCounterSnapshot) - case (g: UserGauge, s: UserGaugeSnapshot) ⇒ gauges += (g -> s.gaugeSnapshot) - case (_, cms: CPUMetricSnapshot) ⇒ logCpuMetrics(cms) - case (_, pcms: ProcessCPUMetricsSnapshot) ⇒ logProcessCpuMetrics(pcms) - case (_, nms: NetworkMetricSnapshot) ⇒ logNetworkMetrics(nms) - case (_, csms: ContextSwitchesMetricsSnapshot) ⇒ logContextSwitchesMetrics(csms) - case ignoreEverythingElse ⇒ + case (entity, snapshot) if entity.category == "actor" ⇒ logActorMetrics(entity.name, snapshot) + case (entity, snapshot) if entity.category == "trace" ⇒ logTraceMetrics(entity.name, snapshot) + case (entity, snapshot) if entity.category == "user-metric" ⇒ logUserMetrics(snapshot) + case (entity, snapshot) if entity.category == "system-metric" ⇒ logSystemMetrics(entity.name, snapshot) + case ignoreEverythingElse ⇒ } - - logUserMetrics(histograms.result(), counters.result(), minMaxCounters.result(), gauges.result()) } - def logActorMetrics(name: String, ams: ActorMetricSnapshot): Unit = { - log.info( - """ + def logActorMetrics(name: String, actorSnapshot: EntitySnapshot): Unit = { + for { + processingTime ← actorSnapshot.histogram("processing-time") + timeInMailbox ← actorSnapshot.histogram("time-in-mailbox") + mailboxSize ← actorSnapshot.minMaxCounter("mailbox-size") + errors ← actorSnapshot.counter("errors") + } { + + log.info( + """ |+--------------------------------------------------------------------------------------------------+ || | || Actor: %-83s | @@ -120,46 +88,67 @@ class LogReporterSubscriber extends Actor with ActorLogging { || Max: %-12s Max: %-12s | || | |+--------------------------------------------------------------------------------------------------+""" - .stripMargin.format( - name, - ams.processingTime.numberOfMeasurements, ams.timeInMailbox.numberOfMeasurements, ams.mailboxSize.min, - ams.processingTime.min, ams.timeInMailbox.min, ams.mailboxSize.average, - ams.processingTime.percentile(50.0D), ams.timeInMailbox.percentile(50.0D), ams.mailboxSize.max, - ams.processingTime.percentile(90.0D), ams.timeInMailbox.percentile(90.0D), - ams.processingTime.percentile(95.0D), ams.timeInMailbox.percentile(95.0D), - ams.processingTime.percentile(99.0D), ams.timeInMailbox.percentile(99.0D), ams.errors.count, - ams.processingTime.percentile(99.9D), ams.timeInMailbox.percentile(99.9D), - ams.processingTime.max, ams.timeInMailbox.max)) + .stripMargin.format( + name, + processingTime.numberOfMeasurements, timeInMailbox.numberOfMeasurements, mailboxSize.min, + processingTime.min, timeInMailbox.min, mailboxSize.average, + processingTime.percentile(50.0D), timeInMailbox.percentile(50.0D), mailboxSize.max, + processingTime.percentile(90.0D), timeInMailbox.percentile(90.0D), + processingTime.percentile(95.0D), timeInMailbox.percentile(95.0D), + processingTime.percentile(99.0D), timeInMailbox.percentile(99.0D), errors.count, + processingTime.percentile(99.9D), timeInMailbox.percentile(99.9D), + processingTime.max, timeInMailbox.max)) + } + } - def logCpuMetrics(cms: CPUMetricSnapshot): Unit = { - import cms._ + def logSystemMetrics(metric: String, snapshot: EntitySnapshot): Unit = metric match { + case "cpu" ⇒ logCpuMetrics(snapshot) + case "network" ⇒ logNetworkMetrics(snapshot) + case "process-cpu" ⇒ logProcessCpuMetrics(snapshot) + case "context-switches" ⇒ logContextSwitchesMetrics(snapshot) + case ignoreOthers ⇒ + } - log.info( - """ + def logCpuMetrics(cpuMetrics: EntitySnapshot): Unit = { + for { + user ← cpuMetrics.histogram("cpu-user") + system ← cpuMetrics.histogram("cpu-system") + cpuWait ← cpuMetrics.histogram("cpu-wait") + idle ← cpuMetrics.histogram("cpu-idle") + } { + + log.info( + """ |+--------------------------------------------------------------------------------------------------+ || | || CPU (ALL) | || | || User (percentage) System (percentage) Wait (percentage) Idle (percentage) | || Min: %-3s Min: %-3s Min: %-3s Min: %-3s | - || Avg: %-3s Avg: %-3s Avg: %-3s Avg: %-3s | + || Avg: %-3s Avg: %-3s Avg: %-3s Avg: %-3s | || Max: %-3s Max: %-3s Max: %-3s Max: %-3s | || | || | |+--------------------------------------------------------------------------------------------------+""" - .stripMargin.format( - user.min, system.min, cpuWait.min, idle.min, - user.average, system.average, cpuWait.average, idle.average, - user.max, system.max, cpuWait.max, idle.max)) + .stripMargin.format( + user.min, system.min, cpuWait.min, idle.min, + user.average, system.average, cpuWait.average, idle.average, + user.max, system.max, cpuWait.max, idle.max)) + } } - def logNetworkMetrics(nms: NetworkMetricSnapshot): Unit = { - import nms._ + def logNetworkMetrics(networkMetrics: EntitySnapshot): Unit = { + for { + rxBytes ← networkMetrics.histogram("rx-bytes") + txBytes ← networkMetrics.histogram("tx-bytes") + rxErrors ← networkMetrics.histogram("rx-errors") + txErrors ← networkMetrics.histogram("tx-errors") + } { - log.info( - """ + log.info( + """ |+--------------------------------------------------------------------------------------------------+ || | || Network (ALL) | @@ -170,38 +159,50 @@ class LogReporterSubscriber extends Actor with ActorLogging { || Max: %-4s Max: %-4s | || | |+--------------------------------------------------------------------------------------------------+""" - .stripMargin.format( - rxBytes.min, txBytes.min, rxErrors.sum, txErrors.sum, - rxBytes.average, txBytes.average, - rxBytes.max, txBytes.max)) + .stripMargin. + format( + rxBytes.min, txBytes.min, rxErrors.sum, txErrors.sum, + rxBytes.average, txBytes.average, + rxBytes.max, txBytes.max)) + } } - def logProcessCpuMetrics(pcms: ProcessCPUMetricsSnapshot): Unit = { - import pcms._ + def logProcessCpuMetrics(processCpuMetrics: EntitySnapshot): Unit = { + for { + user ← processCpuMetrics.histogram("process-user-cpu") + total ← processCpuMetrics.histogram("process-cpu") + } { - log.info( - """ + log.info( + """ |+--------------------------------------------------------------------------------------------------+ || | || Process-CPU | || | - || Cpu-Percentage Total-Process-Time | + || User-Percentage Total-Percentage | || Min: %-12s Min: %-12s | || Avg: %-12s Avg: %-12s | || Max: %-12s Max: %-12s | || | |+--------------------------------------------------------------------------------------------------+""" - .stripMargin.format( - (cpuPercent.min / 100), totalProcessTime.min, - (cpuPercent.average / 100), totalProcessTime.average, - (cpuPercent.max / 100), totalProcessTime.max)) + .stripMargin. + format( + (user.min, total.min, + user.average, total.average, + user.max, total.max))) + } + } - def logContextSwitchesMetrics(csms: ContextSwitchesMetricsSnapshot): Unit = { - import csms._ + def logContextSwitchesMetrics(contextSwitchMetrics: EntitySnapshot): Unit = { + for { + perProcessVoluntary ← contextSwitchMetrics.histogram("context-switches-process-voluntary") + perProcessNonVoluntary ← contextSwitchMetrics.histogram("context-switches-process-non-voluntary") + global ← contextSwitchMetrics.histogram("context-switches-global") + } { - log.info( - """ + log.info( + """ |+--------------------------------------------------------------------------------------------------+ || | || Context-Switches | @@ -212,18 +213,24 @@ class LogReporterSubscriber extends Actor with ActorLogging { || Max: %-12s Max: %-12s Max: %-12s | || | |+--------------------------------------------------------------------------------------------------+""" - .stripMargin.format( - global.min, perProcessNonVoluntary.min, perProcessVoluntary.min, - global.average, perProcessNonVoluntary.average, perProcessVoluntary.average, - global.max, perProcessNonVoluntary.max, perProcessVoluntary.max)) + .stripMargin. + format( + global.min, perProcessNonVoluntary.min, perProcessVoluntary.min, + global.average, perProcessNonVoluntary.average, perProcessVoluntary.average, + global.max, perProcessNonVoluntary.max, perProcessVoluntary.max)) + } } - def logTraceMetrics(name: String, tms: TraceMetricsSnapshot): Unit = { + def logTraceMetrics(name: String, traceSnapshot: EntitySnapshot): Unit = { val traceMetricsData = StringBuilder.newBuilder - traceMetricsData.append( - """ + for { + elapsedTime ← traceSnapshot.histogram("elapsed-time") + } { + + traceMetricsData.append( + """ |+--------------------------------------------------------------------------------------------------+ || | || Trace: %-83s | @@ -231,22 +238,26 @@ class LogReporterSubscriber extends Actor with ActorLogging { || | || Elapsed Time (nanoseconds): | |""" - .stripMargin.format( - name, tms.elapsedTime.numberOfMeasurements)) - - traceMetricsData.append(compactHistogramView(tms.elapsedTime)) - traceMetricsData.append( - """ - || | - |+--------------------------------------------------------------------------------------------------+""" - .stripMargin) - - log.info(traceMetricsData.toString()) + .stripMargin.format( + name, elapsedTime.numberOfMeasurements)) + + traceMetricsData.append(compactHistogramView(elapsedTime)) + traceMetricsData.append( + """ + || | + |+--------------------------------------------------------------------------------------------------+""" + . + stripMargin) + + log.info(traceMetricsData.toString()) + } } - def logUserMetrics(histograms: Map[MetricGroupIdentity, Histogram.Snapshot], - counters: Map[MetricGroupIdentity, Counter.Snapshot], minMaxCounters: Map[MetricGroupIdentity, Histogram.Snapshot], - gauges: Map[MetricGroupIdentity, Histogram.Snapshot]): Unit = { + def logUserMetrics(userMetrics: EntitySnapshot): Unit = { + val histograms = userMetrics.histograms + val minMaxCounters = userMetrics.minMaxCounters + val gauges = userMetrics.gauges + val counters = userMetrics.counters if (histograms.isEmpty && counters.isEmpty && minMaxCounters.isEmpty && gauges.isEmpty) { log.info("No user metrics reported") -- cgit v1.2.3