diff options
5 files changed, 86 insertions, 21 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala index eb6b5293..4567d442 100644 --- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala @@ -62,13 +62,13 @@ class DispatcherInstrumentation { val dispatcherEntity = Entity(dispatcherName, AkkaDispatcherMetrics.Category) if (Kamon.metrics.shouldTrack(dispatcherEntity)) - Kamon.metrics.entity(ForkJoinPoolDispatcherMetrics.factory(fjp), dispatcherName) + Kamon.metrics.entity(ForkJoinPoolDispatcherMetrics.factory(fjp), dispatcherName, Map("dispatcher-type" -> "fork-join-pool")) case tpe: ThreadPoolExecutor ⇒ val dispatcherEntity = Entity(dispatcherName, AkkaDispatcherMetrics.Category) if (Kamon.metrics.shouldTrack(dispatcherEntity)) - Kamon.metrics.entity(ThreadPoolExecutorDispatcherMetrics.factory(tpe), dispatcherName) + Kamon.metrics.entity(ThreadPoolExecutorDispatcherMetrics.factory(tpe), dispatcherName, Map("dispatcher-type" -> "thread-pool-executor")) case others ⇒ // Currently not interested in other kinds of dispatchers. } diff --git a/kamon-core/src/main/scala/kamon/ModuleLoader.scala b/kamon-core/src/main/scala/kamon/ModuleLoader.scala index 0cde49c3..d1b7466e 100644 --- a/kamon-core/src/main/scala/kamon/ModuleLoader.scala +++ b/kamon-core/src/main/scala/kamon/ModuleLoader.scala @@ -20,8 +20,7 @@ import _root_.akka.actor import _root_.akka.actor._ import _root_.akka.event.Logging import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation.{Around, Pointcut, Aspect} - +import org.aspectj.lang.annotation.{ Around, Pointcut, Aspect } private[kamon] object ModuleLoader extends ExtensionId[ModuleLoaderExtension] with ExtensionIdProvider { def lookup(): ExtensionId[_ <: actor.Extension] = ModuleLoader @@ -32,7 +31,6 @@ private[kamon] class ModuleLoaderExtension(system: ExtendedActorSystem) extends val log = Logging(system, "ModuleLoader") val settings = ModuleLoaderSettings(system) - if (settings.modulesRequiringAspectJ.nonEmpty && !isAspectJPresent && settings.showAspectJMissingWarning) logAspectJWeaverMissing(settings.modulesRequiringAspectJ) diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala index 6a976a1c..58e16bbd 100644 --- a/kamon-core/src/main/scala/kamon/metric/Metrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/Metrics.scala @@ -16,16 +16,15 @@ package kamon.metric +import akka.actor._ import com.typesafe.config.Config -import kamon.metric.SubscriptionsDispatcher.{ Unsubscribe, Subscribe } +import kamon.metric.SubscriptionsDispatcher.{ Subscribe, Unsubscribe } import kamon.metric.instrument.Gauge.CurrentValueCollector import kamon.metric.instrument.Histogram.DynamicRange import kamon.metric.instrument._ +import kamon.util.LazyActorRef import scala.collection.concurrent.TrieMap -import akka.actor._ -import kamon.util.{ Supplier, LazyActorRef, TriemapAtomicGetOrElseUpdate } - import scala.concurrent.duration.FiniteDuration case class EntityRegistration[T <: EntityRecorder](entity: Entity, recorder: T) @@ -227,7 +226,7 @@ trait Metrics { } private[kamon] class MetricsImpl(config: Config) extends Metrics { - import TriemapAtomicGetOrElseUpdate.Syntax + import kamon.util.TriemapAtomicGetOrElseUpdate.Syntax private val _trackedEntities = TrieMap.empty[Entity, EntityRecorder] private val _subscriptions = new LazyActorRef diff --git a/kamon-log-reporter/src/main/scala/kamon/logreporter/LogReporter.scala b/kamon-log-reporter/src/main/scala/kamon/logreporter/LogReporter.scala index fc1f0faf..ad3e77f6 100644 --- a/kamon-log-reporter/src/main/scala/kamon/logreporter/LogReporter.scala +++ b/kamon-log-reporter/src/main/scala/kamon/logreporter/LogReporter.scala @@ -1,6 +1,6 @@ /* * ========================================================================================= - * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * Copyright © 2013-2015 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -36,8 +36,9 @@ class LogReporterExtension(system: ExtendedActorSystem) extends Kamon.Extension val subscriber = system.actorOf(Props[LogReporterSubscriber], "kamon-log-reporter") Kamon.metrics.subscribe("trace", "**", subscriber, permanently = true) - Kamon.metrics.subscribe("actor", "**", subscriber, permanently = true) - Kamon.metrics.subscribe("user-metrics", "**", subscriber, permanently = true) + Kamon.metrics.subscribe("akka-actor", "**", subscriber, permanently = true) + Kamon.metrics.subscribe("akka-dispatcher", "**", subscriber, permanently = true) + Kamon.metrics.subscribe("metrics", "**", subscriber, permanently = true) val includeSystemMetrics = logReporterConfig.getBoolean("report-system-metrics") if (includeSystemMetrics) { @@ -47,6 +48,7 @@ class LogReporterExtension(system: ExtendedActorSystem) extends Kamon.Extension } class LogReporterSubscriber extends Actor with ActorLogging { + import kamon.logreporter.LogReporterSubscriber.RichHistogramSnapshot def receive = { @@ -55,11 +57,12 @@ class LogReporterSubscriber extends Actor with ActorLogging { def printMetricSnapshot(tick: TickMetricSnapshot): Unit = { tick.metrics foreach { - case (entity, snapshot) if entity.category == "actor" ⇒ logActorMetrics(entity.name, snapshot) - case (entity, snapshot) if entity.category == "trace" ⇒ logTraceMetrics(entity.name, snapshot) - case (entity, snapshot) if entity.category == "simple-metric" ⇒ logSimpleMetrics(snapshot) - case (entity, snapshot) if entity.category == "system-metric" ⇒ logSystemMetrics(entity.name, snapshot) - case ignoreEverythingElse ⇒ + case (entity, snapshot) if entity.category == "akka-actor" ⇒ logActorMetrics(entity.name, snapshot) + case (entity, snapshot) if entity.category == "akka-dispatcher" ⇒ logDispatcherMetrics(entity, snapshot) + case (entity, snapshot) if entity.category == "trace" ⇒ logTraceMetrics(entity.name, snapshot) + case (entity, snapshot) if entity.category == "metrics" ⇒ logMetrics(snapshot) + case (entity, snapshot) if entity.category == "system-metric" ⇒ logSystemMetrics(entity.name, snapshot) + case ignoreEverythingElse ⇒ } } @@ -102,6 +105,71 @@ class LogReporterSubscriber extends Actor with ActorLogging { } + def logDispatcherMetrics(entity: Entity, snapshot: EntitySnapshot): Unit = entity.tags.get("dispatcher-type") match { + case Some("fork-join-pool") ⇒ logForkJoinPool(entity.name, snapshot) + case Some("thread-pool-executor") ⇒ logThreadPoolExecutor(entity.name, snapshot) + case ignoreOthers ⇒ + } + + def logForkJoinPool(name: String, forkJoinMetrics: EntitySnapshot): Unit = { + for { + paralellism ← forkJoinMetrics.minMaxCounter("parallelism") + poolSize ← forkJoinMetrics.gauge("pool-size") + activeThreads ← forkJoinMetrics.gauge("active-threads") + runningThreads ← forkJoinMetrics.gauge("running-threads") + queuedTaskCount ← forkJoinMetrics.gauge("queued-task-count") + + } { + log.info( + """ + |+--------------------------------------------------------------------------------------------------+ + || Fork-Join-Pool | + || | + || Dispatcher: %-83s | + || | + || Paralellism Pool Size Active Threads Running Threads Queue Task Count | + || Min %-4s %-4s %-4s %-4s %-4s | + || Avg %-4s %-4s %-4s %-4s %-4s | + || Max %-4s %-4s %-4s %-4s %-4s | + || | + |+--------------------------------------------------------------------------------------------------+""" + .stripMargin.format(name, + paralellism.min, poolSize.min, activeThreads.min, runningThreads.min, queuedTaskCount.min, + paralellism.average, poolSize.average, activeThreads.average, runningThreads.average, queuedTaskCount.average, + paralellism.max, poolSize.max, activeThreads.max, runningThreads.max, queuedTaskCount.max)) + } + } + + def logThreadPoolExecutor(name: String, threadPoolMetrics: EntitySnapshot): Unit = { + for { + corePoolSize ← threadPoolMetrics.gauge("core-pool-size") + maxPoolSize ← threadPoolMetrics.gauge("max-pool-size") + poolSize ← threadPoolMetrics.gauge("pool-size") + activeThreads ← threadPoolMetrics.gauge("active-threads") + processedTasks ← threadPoolMetrics.gauge("processed-tasks") + } { + + log.info( + """ + |+--------------------------------------------------------------------------------------------------+ + || Thread-Pool-Executor | + || | + || Dispatcher: %-83s | + || | + || Core Pool Size Max Pool Size Pool Size Active Threads Processed Task | + || Min %-4s %-4s %-4s %-4s %-4s | + || Avg %-4s %-4s %-4s %-4s %-4s | + || Max %-4s %-4s %-4s %-4s %-4s | + || | + |+--------------------------------------------------------------------------------------------------+""" + .stripMargin.format(name, + corePoolSize.min, maxPoolSize.min, poolSize.min, activeThreads.min, processedTasks.min, + corePoolSize.average, maxPoolSize.average, poolSize.average, activeThreads.average, processedTasks.average, + corePoolSize.max, maxPoolSize.max, poolSize.max, activeThreads.max, processedTasks.max)) + } + + } + def logSystemMetrics(metric: String, snapshot: EntitySnapshot): Unit = metric match { case "cpu" ⇒ logCpuMetrics(snapshot) case "network" ⇒ logNetworkMetrics(snapshot) @@ -253,7 +321,7 @@ class LogReporterSubscriber extends Actor with ActorLogging { } } - def logSimpleMetrics(simpleMetrics: EntitySnapshot): Unit = { + def logMetrics(simpleMetrics: EntitySnapshot): Unit = { val histograms = simpleMetrics.histograms val minMaxCounters = simpleMetrics.minMaxCounters val gauges = simpleMetrics.gauges diff --git a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsExtension.scala b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsExtension.scala index 630cc7a2..94c6fefa 100644 --- a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsExtension.scala +++ b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsExtension.scala @@ -67,7 +67,7 @@ class SystemMetricsSupervisor(sigarRefreshInterval: FiniteDuration, contextSwitc if (isLinux) { val contextSwitchesRecorder = ContextSwitchesMetrics.register(context.system, contextSwitchesRefreshInterval) context.actorOf(ContextSwitchesUpdater.props(contextSwitchesRecorder, sigarRefreshInterval) - .withDispatcher("kamon.system-metrics.context-switches-dispatcher"), "context-switches-metrics-recorder") + .withDispatcher("kamon.system-metrics.context-switches-dispatcher"), "context-switches-metrics-recorder") } def isLinux: Boolean = @@ -76,7 +76,7 @@ class SystemMetricsSupervisor(sigarRefreshInterval: FiniteDuration, contextSwitc def receive = Actor.emptyBehavior override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { - case anyException => Restart + case anyException ⇒ Restart } } |