aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiego <diegolparra@gmail.com>2015-03-13 00:22:31 -0300
committerDiego <diegolparra@gmail.com>2015-03-13 00:22:31 -0300
commit37bf47d4ede655df5bda73882a2c15f98b39e820 (patch)
treecf697e69d54e0ec0567fa4259b563c131bb1c185
parent3be5eff111e543e9702e64a34155420dbc1f2e25 (diff)
downloadKamon-37bf47d4ede655df5bda73882a2c15f98b39e820.tar.gz
Kamon-37bf47d4ede655df5bda73882a2c15f98b39e820.tar.bz2
Kamon-37bf47d4ede655df5bda73882a2c15f98b39e820.zip
+ log-reporter: include dispatcher metrics and close #163
-rw-r--r--kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/ModuleLoader.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Metrics.scala9
-rw-r--r--kamon-log-reporter/src/main/scala/kamon/logreporter/LogReporter.scala86
-rw-r--r--kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsExtension.scala4
5 files changed, 86 insertions, 21 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala
index eb6b5293..4567d442 100644
--- a/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala
+++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/DispatcherInstrumentation.scala
@@ -62,13 +62,13 @@ class DispatcherInstrumentation {
val dispatcherEntity = Entity(dispatcherName, AkkaDispatcherMetrics.Category)
if (Kamon.metrics.shouldTrack(dispatcherEntity))
- Kamon.metrics.entity(ForkJoinPoolDispatcherMetrics.factory(fjp), dispatcherName)
+ Kamon.metrics.entity(ForkJoinPoolDispatcherMetrics.factory(fjp), dispatcherName, Map("dispatcher-type" -> "fork-join-pool"))
case tpe: ThreadPoolExecutor ⇒
val dispatcherEntity = Entity(dispatcherName, AkkaDispatcherMetrics.Category)
if (Kamon.metrics.shouldTrack(dispatcherEntity))
- Kamon.metrics.entity(ThreadPoolExecutorDispatcherMetrics.factory(tpe), dispatcherName)
+ Kamon.metrics.entity(ThreadPoolExecutorDispatcherMetrics.factory(tpe), dispatcherName, Map("dispatcher-type" -> "thread-pool-executor"))
case others ⇒ // Currently not interested in other kinds of dispatchers.
}
diff --git a/kamon-core/src/main/scala/kamon/ModuleLoader.scala b/kamon-core/src/main/scala/kamon/ModuleLoader.scala
index 0cde49c3..d1b7466e 100644
--- a/kamon-core/src/main/scala/kamon/ModuleLoader.scala
+++ b/kamon-core/src/main/scala/kamon/ModuleLoader.scala
@@ -20,8 +20,7 @@ import _root_.akka.actor
import _root_.akka.actor._
import _root_.akka.event.Logging
import org.aspectj.lang.ProceedingJoinPoint
-import org.aspectj.lang.annotation.{Around, Pointcut, Aspect}
-
+import org.aspectj.lang.annotation.{ Around, Pointcut, Aspect }
private[kamon] object ModuleLoader extends ExtensionId[ModuleLoaderExtension] with ExtensionIdProvider {
def lookup(): ExtensionId[_ <: actor.Extension] = ModuleLoader
@@ -32,7 +31,6 @@ private[kamon] class ModuleLoaderExtension(system: ExtendedActorSystem) extends
val log = Logging(system, "ModuleLoader")
val settings = ModuleLoaderSettings(system)
-
if (settings.modulesRequiringAspectJ.nonEmpty && !isAspectJPresent && settings.showAspectJMissingWarning)
logAspectJWeaverMissing(settings.modulesRequiringAspectJ)
diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala
index 6a976a1c..58e16bbd 100644
--- a/kamon-core/src/main/scala/kamon/metric/Metrics.scala
+++ b/kamon-core/src/main/scala/kamon/metric/Metrics.scala
@@ -16,16 +16,15 @@
package kamon.metric
+import akka.actor._
import com.typesafe.config.Config
-import kamon.metric.SubscriptionsDispatcher.{ Unsubscribe, Subscribe }
+import kamon.metric.SubscriptionsDispatcher.{ Subscribe, Unsubscribe }
import kamon.metric.instrument.Gauge.CurrentValueCollector
import kamon.metric.instrument.Histogram.DynamicRange
import kamon.metric.instrument._
+import kamon.util.LazyActorRef
import scala.collection.concurrent.TrieMap
-import akka.actor._
-import kamon.util.{ Supplier, LazyActorRef, TriemapAtomicGetOrElseUpdate }
-
import scala.concurrent.duration.FiniteDuration
case class EntityRegistration[T <: EntityRecorder](entity: Entity, recorder: T)
@@ -227,7 +226,7 @@ trait Metrics {
}
private[kamon] class MetricsImpl(config: Config) extends Metrics {
- import TriemapAtomicGetOrElseUpdate.Syntax
+ import kamon.util.TriemapAtomicGetOrElseUpdate.Syntax
private val _trackedEntities = TrieMap.empty[Entity, EntityRecorder]
private val _subscriptions = new LazyActorRef
diff --git a/kamon-log-reporter/src/main/scala/kamon/logreporter/LogReporter.scala b/kamon-log-reporter/src/main/scala/kamon/logreporter/LogReporter.scala
index fc1f0faf..ad3e77f6 100644
--- a/kamon-log-reporter/src/main/scala/kamon/logreporter/LogReporter.scala
+++ b/kamon-log-reporter/src/main/scala/kamon/logreporter/LogReporter.scala
@@ -1,6 +1,6 @@
/*
* =========================================================================================
- * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
@@ -36,8 +36,9 @@ class LogReporterExtension(system: ExtendedActorSystem) extends Kamon.Extension
val subscriber = system.actorOf(Props[LogReporterSubscriber], "kamon-log-reporter")
Kamon.metrics.subscribe("trace", "**", subscriber, permanently = true)
- Kamon.metrics.subscribe("actor", "**", subscriber, permanently = true)
- Kamon.metrics.subscribe("user-metrics", "**", subscriber, permanently = true)
+ Kamon.metrics.subscribe("akka-actor", "**", subscriber, permanently = true)
+ Kamon.metrics.subscribe("akka-dispatcher", "**", subscriber, permanently = true)
+ Kamon.metrics.subscribe("metrics", "**", subscriber, permanently = true)
val includeSystemMetrics = logReporterConfig.getBoolean("report-system-metrics")
if (includeSystemMetrics) {
@@ -47,6 +48,7 @@ class LogReporterExtension(system: ExtendedActorSystem) extends Kamon.Extension
}
class LogReporterSubscriber extends Actor with ActorLogging {
+
import kamon.logreporter.LogReporterSubscriber.RichHistogramSnapshot
def receive = {
@@ -55,11 +57,12 @@ class LogReporterSubscriber extends Actor with ActorLogging {
def printMetricSnapshot(tick: TickMetricSnapshot): Unit = {
tick.metrics foreach {
- case (entity, snapshot) if entity.category == "actor" ⇒ logActorMetrics(entity.name, snapshot)
- case (entity, snapshot) if entity.category == "trace" ⇒ logTraceMetrics(entity.name, snapshot)
- case (entity, snapshot) if entity.category == "simple-metric" ⇒ logSimpleMetrics(snapshot)
- case (entity, snapshot) if entity.category == "system-metric" ⇒ logSystemMetrics(entity.name, snapshot)
- case ignoreEverythingElse ⇒
+ case (entity, snapshot) if entity.category == "akka-actor" ⇒ logActorMetrics(entity.name, snapshot)
+ case (entity, snapshot) if entity.category == "akka-dispatcher" ⇒ logDispatcherMetrics(entity, snapshot)
+ case (entity, snapshot) if entity.category == "trace" ⇒ logTraceMetrics(entity.name, snapshot)
+ case (entity, snapshot) if entity.category == "metrics" ⇒ logMetrics(snapshot)
+ case (entity, snapshot) if entity.category == "system-metric" ⇒ logSystemMetrics(entity.name, snapshot)
+ case ignoreEverythingElse ⇒
}
}
@@ -102,6 +105,71 @@ class LogReporterSubscriber extends Actor with ActorLogging {
}
+ def logDispatcherMetrics(entity: Entity, snapshot: EntitySnapshot): Unit = entity.tags.get("dispatcher-type") match {
+ case Some("fork-join-pool") ⇒ logForkJoinPool(entity.name, snapshot)
+ case Some("thread-pool-executor") ⇒ logThreadPoolExecutor(entity.name, snapshot)
+ case ignoreOthers ⇒
+ }
+
+ def logForkJoinPool(name: String, forkJoinMetrics: EntitySnapshot): Unit = {
+ for {
+ paralellism ← forkJoinMetrics.minMaxCounter("parallelism")
+ poolSize ← forkJoinMetrics.gauge("pool-size")
+ activeThreads ← forkJoinMetrics.gauge("active-threads")
+ runningThreads ← forkJoinMetrics.gauge("running-threads")
+ queuedTaskCount ← forkJoinMetrics.gauge("queued-task-count")
+
+ } {
+ log.info(
+ """
+ |+--------------------------------------------------------------------------------------------------+
+ || Fork-Join-Pool |
+ || |
+ || Dispatcher: %-83s |
+ || |
+ || Paralellism Pool Size Active Threads Running Threads Queue Task Count |
+ || Min %-4s %-4s %-4s %-4s %-4s |
+ || Avg %-4s %-4s %-4s %-4s %-4s |
+ || Max %-4s %-4s %-4s %-4s %-4s |
+ || |
+ |+--------------------------------------------------------------------------------------------------+"""
+ .stripMargin.format(name,
+ paralellism.min, poolSize.min, activeThreads.min, runningThreads.min, queuedTaskCount.min,
+ paralellism.average, poolSize.average, activeThreads.average, runningThreads.average, queuedTaskCount.average,
+ paralellism.max, poolSize.max, activeThreads.max, runningThreads.max, queuedTaskCount.max))
+ }
+ }
+
+ def logThreadPoolExecutor(name: String, threadPoolMetrics: EntitySnapshot): Unit = {
+ for {
+ corePoolSize ← threadPoolMetrics.gauge("core-pool-size")
+ maxPoolSize ← threadPoolMetrics.gauge("max-pool-size")
+ poolSize ← threadPoolMetrics.gauge("pool-size")
+ activeThreads ← threadPoolMetrics.gauge("active-threads")
+ processedTasks ← threadPoolMetrics.gauge("processed-tasks")
+ } {
+
+ log.info(
+ """
+ |+--------------------------------------------------------------------------------------------------+
+ || Thread-Pool-Executor |
+ || |
+ || Dispatcher: %-83s |
+ || |
+ || Core Pool Size Max Pool Size Pool Size Active Threads Processed Task |
+ || Min %-4s %-4s %-4s %-4s %-4s |
+ || Avg %-4s %-4s %-4s %-4s %-4s |
+ || Max %-4s %-4s %-4s %-4s %-4s |
+ || |
+ |+--------------------------------------------------------------------------------------------------+"""
+ .stripMargin.format(name,
+ corePoolSize.min, maxPoolSize.min, poolSize.min, activeThreads.min, processedTasks.min,
+ corePoolSize.average, maxPoolSize.average, poolSize.average, activeThreads.average, processedTasks.average,
+ corePoolSize.max, maxPoolSize.max, poolSize.max, activeThreads.max, processedTasks.max))
+ }
+
+ }
+
def logSystemMetrics(metric: String, snapshot: EntitySnapshot): Unit = metric match {
case "cpu" ⇒ logCpuMetrics(snapshot)
case "network" ⇒ logNetworkMetrics(snapshot)
@@ -253,7 +321,7 @@ class LogReporterSubscriber extends Actor with ActorLogging {
}
}
- def logSimpleMetrics(simpleMetrics: EntitySnapshot): Unit = {
+ def logMetrics(simpleMetrics: EntitySnapshot): Unit = {
val histograms = simpleMetrics.histograms
val minMaxCounters = simpleMetrics.minMaxCounters
val gauges = simpleMetrics.gauges
diff --git a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsExtension.scala b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsExtension.scala
index 630cc7a2..94c6fefa 100644
--- a/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsExtension.scala
+++ b/kamon-system-metrics/src/main/scala/kamon/system/SystemMetricsExtension.scala
@@ -67,7 +67,7 @@ class SystemMetricsSupervisor(sigarRefreshInterval: FiniteDuration, contextSwitc
if (isLinux) {
val contextSwitchesRecorder = ContextSwitchesMetrics.register(context.system, contextSwitchesRefreshInterval)
context.actorOf(ContextSwitchesUpdater.props(contextSwitchesRecorder, sigarRefreshInterval)
- .withDispatcher("kamon.system-metrics.context-switches-dispatcher"), "context-switches-metrics-recorder")
+ .withDispatcher("kamon.system-metrics.context-switches-dispatcher"), "context-switches-metrics-recorder")
}
def isLinux: Boolean =
@@ -76,7 +76,7 @@ class SystemMetricsSupervisor(sigarRefreshInterval: FiniteDuration, contextSwitc
def receive = Actor.emptyBehavior
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
- case anyException => Restart
+ case anyException ⇒ Restart
}
}