From ace4d9d09097aa9b7e032d5bfbe154dcf73f74ed Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sun, 18 May 2014 21:38:55 -0300 Subject: = core: move the scheduling of gauge recordings to MetricsExtension and load interval for recordings from config --- kamon-core/src/main/resources/reference.conf | 45 +++++++++------------- .../ActorMessagePassingTracing.scala | 18 +++++---- .../main/scala/kamon/metrics/ActorMetrics.scala | 2 +- .../scala/kamon/metrics/MetricsExtension.scala | 25 ++++++++++-- .../main/scala/kamon/metrics/TraceMetrics.scala | 2 +- 5 files changed, 52 insertions(+), 40 deletions(-) diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index e5168929..b1e5309d 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -4,8 +4,24 @@ kamon { metrics { + + # Time interval at which Kamon will collect all metrics and send them to all subscribed actors. tick-interval = 1 second + # Time interval at which Kamon will record values for all registered gauges. + gauge-recording-interval = 100 milliseconds + + dispatchers { + + # All Gauges record values periodically according to the `kamon.metrics.gauge-recording-interval` setting. + # This dispatcher is the one to be used to execute the recording code. + gauge-recordings = "akka.actor.default-dispatcher" + + # Dispatcher for the actor managing all subscriptions and metrics collection. + metric-subscriptions = "akka.actor.default-dispatcher" + } + + filters = [ { actor { @@ -56,34 +72,8 @@ kamon { } } - default-dispatcher { - # Dispatcher is the name of the event-based dispatcher - type = Dispatcher - - # What kind of ExecutionService to use - executor = "fork-join-executor" - - # Configuration for the fork join pool - fork-join-executor { - - # Min number of threads to cap factor-based parallelism number to - parallelism-min = 2 - - # Parallelism (threads) ... ceil(available processors * factor) - parallelism-factor = 2.0 - - # Max number of threads to cap factor-based parallelism number to - parallelism-max = 10 - } - - # Throughput defines the maximum number of messages to be - # processed per actor before the thread jumps to the next actor. - # Set to 1 for as fair as possible. - throughput = 100 - } - - trace { + # If ask-pattern-tracing is enabled, a WARN level log message will be generated if a future generated by the `ask` # pattern fails with a `AskTimeoutException` and the log message will contain a stack trace captured at the moment # the future was created. @@ -91,6 +81,7 @@ kamon { } weaver { + # AspectJ options supported by LTW # showWeaveInfo: show informational messages whenever the weaver touches a class file. # verbose: show informational messages about the weaving process. diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala index d002c574..20bfe564 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala @@ -25,7 +25,6 @@ import kamon.metrics.{ ActorMetrics, Metrics } import kamon.Kamon import kamon.metrics.ActorMetrics.ActorMetricRecorder import kamon.metrics.instruments.counter.MinMaxCounter -import kamon.util.Contexts @Aspect class BehaviourInvokeTracing { @@ -35,7 +34,6 @@ class BehaviourInvokeTracing { @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - import scala.concurrent.duration._ val metricsExtension = Kamon(Metrics)(system) val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) @@ -44,17 +42,18 @@ class BehaviourInvokeTracing { cellWithMetrics.metricIdentity = metricIdentity cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) - system.scheduler.schedule(0 milliseconds, 100 milliseconds) { - cellWithMetrics.actorMetricsRecorder.map { - am ⇒ + if (cellWithMetrics.actorMetricsRecorder.isDefined) { + cellWithMetrics.mailboxSizeCollectorCancellable = metricsExtension.scheduleGaugeRecorder { + cellWithMetrics.actorMetricsRecorder.map { am ⇒ import am.mailboxSize._ val (min, max, sum) = cellWithMetrics.queueSize.collect() record(min) record(max) record(sum) + } } - }(metricsExtension.defaultDispatcher) + } } @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") @@ -95,7 +94,11 @@ class BehaviourInvokeTracing { @After("actorStop(cell)") def afterStop(cell: ActorCell): Unit = { val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - cellWithMetrics.actorMetricsRecorder.map(p ⇒ Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity)) + + cellWithMetrics.actorMetricsRecorder.map { p ⇒ + cellWithMetrics.mailboxSizeCollectorCancellable.cancel() + Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity) + } } @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)") @@ -114,6 +117,7 @@ class BehaviourInvokeTracing { trait ActorCellMetrics { var metricIdentity: ActorMetrics = _ var actorMetricsRecorder: Option[ActorMetricRecorder] = _ + var mailboxSizeCollectorCancellable: Cancellable = _ val queueSize = MinMaxCounter() } diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala index c703d589..44dd84b0 100644 --- a/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/ActorMetrics.scala @@ -54,7 +54,7 @@ object ActorMetrics extends MetricGroupCategory { type GroupRecorder = ActorMetricRecorder def create(config: Config): ActorMetricRecorder = { - val settings = config.getConfig("kamon.metrics.precision.actor") + val settings = config.getConfig("precision.actor") val processingTimeConfig = extractPrecisionConfig(settings.getConfig("processing-time")) val mailboxSizeConfig = extractPrecisionConfig(settings.getConfig("mailbox-size")) diff --git a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala index 9a08da71..78a82d96 100644 --- a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala @@ -24,16 +24,25 @@ import kamon.Kamon import akka.actor import kamon.metrics.Metrics.MetricGroupFilter import kamon.metrics.Subscriptions.Subscribe +import java.util.concurrent.TimeUnit class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { - val config = system.settings.config + val metricsExtConfig = system.settings.config.getConfig("kamon.metrics") + + /** Configured Dispatchers */ + val metricSubscriptionsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.metric-subscriptions")) + val gaugeRecordingsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.gauge-recordings")) + + /** Configuration Settings */ + val gaugeRecordingInterval = metricsExtConfig.getDuration("gauge-recording-interval", TimeUnit.MILLISECONDS) + val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]() - val filters = loadFilters(config) + val filters = loadFilters(metricsExtConfig) lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions") def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = { if (shouldTrack(identity)) - Some(storage.getOrElseUpdate(identity, factory.create(config)).asInstanceOf[factory.GroupRecorder]) + Some(storage.getOrElseUpdate(identity, factory.create(metricsExtConfig)).asInstanceOf[factory.GroupRecorder]) else None } @@ -50,6 +59,14 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { (for ((identity, recorder) ← storage) yield (identity, recorder.collect)).toMap } + def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = { + import scala.concurrent.duration._ + + system.scheduler.schedule(gaugeRecordingInterval milliseconds, gaugeRecordingInterval milliseconds) { + body + }(gaugeRecordingsDispatcher) + } + private def shouldTrack(identity: MetricGroupIdentity): Boolean = { filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(false) } @@ -57,7 +74,7 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { def loadFilters(config: Config): Map[String, MetricGroupFilter] = { import scala.collection.JavaConverters._ - val filters = config.getObjectList("kamon.metrics.filters").asScala + val filters = config.getObjectList("filters").asScala val allFilters = for ( diff --git a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala index 7c197166..5454edf5 100644 --- a/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metrics/TraceMetrics.scala @@ -53,7 +53,7 @@ object TraceMetrics extends MetricGroupCategory { def create(config: Config): TraceMetricRecorder = { - val settings = config.getConfig("kamon.metrics.precision.trace") + val settings = config.getConfig("precision.trace") val elapsedTimeConfig = extractPrecisionConfig(settings.getConfig("elapsed-time")) val segmentConfig = extractPrecisionConfig(settings.getConfig("segment")) -- cgit v1.2.3