diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala | 62 |
1 files changed, 55 insertions, 7 deletions
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala index 235f5143..f6b68617 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala @@ -18,9 +18,11 @@ package akka.instrumentation import akka.actor._ import akka.dispatch.{ Envelope, MessageDispatcher } +import akka.routing.RoutedActorCell import kamon.Kamon import kamon.metric.ActorMetrics.ActorMetricsRecorder -import kamon.metric.{ ActorMetrics, Metrics } +import kamon.metric.RouterMetrics.RouterMetricsRecorder +import kamon.metric.{ ActorMetrics, Metrics, RouterMetrics } import kamon.trace._ import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ @@ -28,6 +30,8 @@ import org.aspectj.lang.annotation._ @Aspect class ActorCellInstrumentation { + import ActorCellInstrumentation.PimpedActorCellMetrics + @Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)") def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {} @@ -38,8 +42,14 @@ class ActorCellInstrumentation { val metricIdentity = ActorMetrics(ref.path.elements.mkString("/")) val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] - cellWithMetrics.metricIdentity = metricIdentity + cellWithMetrics.actorMetricIdentity = metricIdentity cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory) + + cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ + val routerMetricIdentity = RouterMetrics(s"${routedActorCell.asInstanceOf[RoutedActorCell].self.path.elements.mkString("/")}") + routedActorCell.routerMetricIdentity = routerMetricIdentity + routedActorCell.routerMetricsRecorder = metricsExtension.register(routerMetricIdentity, RouterMetrics.Factory) + } } @Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)") @@ -47,9 +57,9 @@ class ActorCellInstrumentation { @Around("invokingActorBehaviourAtActorCell(cell, envelope)") def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = { + val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] val timestampBeforeProcessing = System.nanoTime() val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware] - val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] try { TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { @@ -58,9 +68,23 @@ class ActorCellInstrumentation { } finally { cellWithMetrics.actorMetricsRecorder.map { am ⇒ - am.processingTime.record(System.nanoTime() - timestampBeforeProcessing) - am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime) + val processingTime = System.nanoTime() - timestampBeforeProcessing + val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime + + am.processingTime.record(processingTime) + am.timeInMailbox.record(timeInMailbox) am.mailboxSize.decrement() + + (processingTime, timeInMailbox) + } map { + case (processingTime, timeInMailbox) ⇒ + cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ + routedActorCell.routerMetricsRecorder.map { + rm ⇒ + rm.processingTime.record(processingTime) + rm.timeInMailbox.record(timeInMailbox) + } + } } } } @@ -82,7 +106,13 @@ class ActorCellInstrumentation { val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] cellWithMetrics.actorMetricsRecorder.map { p ⇒ - Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity) + Kamon(Metrics)(cell.system).unregister(cellWithMetrics.actorMetricIdentity) + } + + cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ + routedActorCell.routerMetricsRecorder.map { rm ⇒ + Kamon(Metrics)(cell.system).unregister(cellWithMetrics.routerMetricIdentity) + } } } @@ -96,12 +126,21 @@ class ActorCellInstrumentation { cellWithMetrics.actorMetricsRecorder.map { am ⇒ am.errors.increment() } + + cellWithMetrics.onRoutedActorCell { routedActorCell ⇒ + routedActorCell.routerMetricsRecorder.map { + rm ⇒ rm.errors.increment() + } + } } + } trait ActorCellMetrics { - var metricIdentity: ActorMetrics = _ + var actorMetricIdentity: ActorMetrics = _ + var routerMetricIdentity: RouterMetrics = _ var actorMetricsRecorder: Option[ActorMetricsRecorder] = _ + var routerMetricsRecorder: Option[RouterMetricsRecorder] = _ } @Aspect @@ -125,4 +164,13 @@ class TraceContextIntoEnvelopeMixin { // Necessary to force the initialization of ContextAware at the moment of creation. ctx.traceContext } +} + +object ActorCellInstrumentation { + implicit class PimpedActorCellMetrics(cell: ActorCellMetrics) { + def onRoutedActorCell(block: ActorCellMetrics ⇒ Unit) = cell match { + case routedActorCell: RoutedActorCell ⇒ block(cell) + case everyThingElse ⇒ + } + } }
\ No newline at end of file |