aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala')
-rw-r--r--kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala18
1 files changed, 11 insertions, 7 deletions
diff --git a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
index d002c574..20bfe564 100644
--- a/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
+++ b/kamon-core/src/main/scala/akka/instrumentation/ActorMessagePassingTracing.scala
@@ -25,7 +25,6 @@ import kamon.metrics.{ ActorMetrics, Metrics }
import kamon.Kamon
import kamon.metrics.ActorMetrics.ActorMetricRecorder
import kamon.metrics.instruments.counter.MinMaxCounter
-import kamon.util.Contexts
@Aspect
class BehaviourInvokeTracing {
@@ -35,7 +34,6 @@ class BehaviourInvokeTracing {
@After("actorCellCreation(cell, system, ref, props, dispatcher, parent)")
def afterCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
- import scala.concurrent.duration._
val metricsExtension = Kamon(Metrics)(system)
val metricIdentity = ActorMetrics(ref.path.elements.mkString("/"))
@@ -44,17 +42,18 @@ class BehaviourInvokeTracing {
cellWithMetrics.metricIdentity = metricIdentity
cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory)
- system.scheduler.schedule(0 milliseconds, 100 milliseconds) {
- cellWithMetrics.actorMetricsRecorder.map {
- am ⇒
+ if (cellWithMetrics.actorMetricsRecorder.isDefined) {
+ cellWithMetrics.mailboxSizeCollectorCancellable = metricsExtension.scheduleGaugeRecorder {
+ cellWithMetrics.actorMetricsRecorder.map { am ⇒
import am.mailboxSize._
val (min, max, sum) = cellWithMetrics.queueSize.collect()
record(min)
record(max)
record(sum)
+ }
}
- }(metricsExtension.defaultDispatcher)
+ }
}
@Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)")
@@ -95,7 +94,11 @@ class BehaviourInvokeTracing {
@After("actorStop(cell)")
def afterStop(cell: ActorCell): Unit = {
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
- cellWithMetrics.actorMetricsRecorder.map(p ⇒ Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity))
+
+ cellWithMetrics.actorMetricsRecorder.map { p ⇒
+ cellWithMetrics.mailboxSizeCollectorCancellable.cancel()
+ Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity)
+ }
}
@Pointcut("execution(* akka.actor.ActorCell.handleInvokeFailure(..)) && this(cell)")
@@ -114,6 +117,7 @@ class BehaviourInvokeTracing {
trait ActorCellMetrics {
var metricIdentity: ActorMetrics = _
var actorMetricsRecorder: Option[ActorMetricRecorder] = _
+ var mailboxSizeCollectorCancellable: Cancellable = _
val queueSize = MinMaxCounter()
}