From b18d05149881e9eb141f9f4878916f37c5b52247 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sun, 18 May 2014 21:38:55 -0300 Subject: = core: move the scheduling of gauge recordings to MetricsExtension and load interval for recordings from config --- .../instrumentation/ActorMessagePassingTracing.scala | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) (limited to 'kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala') diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala index d002c574..20bfe564 100644 --- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala +++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala @@ -25,7 +25,6 @@ import kamon.metrics.{ ActorMetrics, Metrics } import kamon.Kamon import kamon.metrics.ActorMetrics.ActorMetricRecorder import kamon.metrics.instruments.counter.MinMaxCounter -import kamon.util.Contexts @Aspect class BehaviourInvokeTracing { @@ -35,7 +34,6 @@ class BehaviourInvokeTracing { @After("actorCellCreation(cell, system, ref, props, dispatcher, parent)") def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = { - import scala.concurrent.duration._ val metricsExtension = Kamon(Metrics)(system) val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) @@ -44,17 +42,18 @@ class BehaviourInvokeTracing { cellWithMetrics.metricIdentity = metricIdentity cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) - system.scheduler.schedule(0 milliseconds, 100 milliseconds) { - cellWithMetrics.actorMetricsRecorder.map { - am ⇒ + if (cellWithMetrics.actorMetricsRecorder.isDefined) { + cellWithMetrics.mailboxSizeCollectorCancellable = metricsExtension.scheduleGaugeRecorder { + cellWithMetrics.actorMetricsRecorder.map { am ⇒ import am.mailboxSize._ val (min, max, sum) = cellWithMetrics.queueSize.collect() record(min) record(max) record(sum) + } } - }(metricsExtension.defaultDispatcher) + } } @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") @@ -95,7 +94,11 @@ class BehaviourInvokeTracing { @After("actorStop(cell)") def afterStop(cell: ActorCell): Unit = { val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - cellWithMetrics.actorMetricsRecorder.map(p ⇒ Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity)) + + cellWithMetrics.actorMetricsRecorder.map { p ⇒ + cellWithMetrics.mailboxSizeCollectorCancellable.cancel() + Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity) + } } @Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)") @@ -114,6 +117,7 @@ class BehaviourInvokeTracing { trait ActorCellMetrics { var metricIdentity: ActorMetrics = _ var actorMetricsRecorder: Option[ActorMetricRecorder] = _ + var mailboxSizeCollectorCancellable: Cancellable = _ val queueSize = MinMaxCounter() } -- cgit v1.2.3