aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala62
1 files changed, 55 insertions, 7 deletions
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
index 235f5143..f6b68617 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala
@@ -18,9 +18,11 @@ package akka.instrumentation
import akka.actor._
import akka.dispatch.{ Envelope, MessageDispatcher }
+import akka.routing.RoutedActorCell
import kamon.Kamon
import kamon.metric.ActorMetrics.ActorMetricsRecorder
-import kamon.metric.{ ActorMetrics, Metrics }
+import kamon.metric.RouterMetrics.RouterMetricsRecorder
+import kamon.metric.{ ActorMetrics, Metrics, RouterMetrics }
import kamon.trace._
import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation._
@@ -28,6 +30,8 @@ import org.aspectj.lang.annotation._
@Aspect
class ActorCellInstrumentation {
+ import ActorCellInstrumentation.PimpedActorCellMetrics
+
@Pointcut("execution(akka.actor.ActorCell.new(..)) && this(cell) && args(system, ref, props, dispatcher, parent)")
def actorCellCreation(cell: ActorCell, system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}
@@ -38,8 +42,14 @@ class ActorCellInstrumentation {
val metricIdentity = ActorMetrics(ref.path.elements.mkString("/"))
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
- cellWithMetrics.metricIdentity = metricIdentity
+ cellWithMetrics.actorMetricIdentity = metricIdentity
cellWithMetrics.actorMetricsRecorder = metricsExtension.register(metricIdentity, ActorMetrics.Factory)
+
+ cellWithMetrics.onRoutedActorCell { routedActorCell ⇒
+ val routerMetricIdentity = RouterMetrics(s"${routedActorCell.asInstanceOf[RoutedActorCell].self.path.elements.mkString("/")}")
+ routedActorCell.routerMetricIdentity = routerMetricIdentity
+ routedActorCell.routerMetricsRecorder = metricsExtension.register(routerMetricIdentity, RouterMetrics.Factory)
+ }
}
@Pointcut("(execution(* akka.actor.ActorCell.invoke(*)) || execution(* akka.routing.RoutedActorCell.sendMessage(*))) && this(cell) && args(envelope)")
@@ -47,9 +57,9 @@ class ActorCellInstrumentation {
@Around("invokingActorBehaviourAtActorCell(cell, envelope)")
def aroundBehaviourInvoke(pjp: ProceedingJoinPoint, cell: ActorCell, envelope: Envelope): Any = {
+ val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
val timestampBeforeProcessing = System.nanoTime()
val contextAndTimestamp = envelope.asInstanceOf[TraceContextAware]
- val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
try {
TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) {
@@ -58,9 +68,23 @@ class ActorCellInstrumentation {
} finally {
cellWithMetrics.actorMetricsRecorder.map {
am ⇒
- am.processingTime.record(System.nanoTime() - timestampBeforeProcessing)
- am.timeInMailbox.record(timestampBeforeProcessing - contextAndTimestamp.captureNanoTime)
+ val processingTime = System.nanoTime() - timestampBeforeProcessing
+ val timeInMailbox = timestampBeforeProcessing - contextAndTimestamp.captureNanoTime
+
+ am.processingTime.record(processingTime)
+ am.timeInMailbox.record(timeInMailbox)
am.mailboxSize.decrement()
+
+ (processingTime, timeInMailbox)
+ } map {
+ case (processingTime, timeInMailbox) ⇒
+ cellWithMetrics.onRoutedActorCell { routedActorCell ⇒
+ routedActorCell.routerMetricsRecorder.map {
+ rm ⇒
+ rm.processingTime.record(processingTime)
+ rm.timeInMailbox.record(timeInMailbox)
+ }
+ }
}
}
}
@@ -82,7 +106,13 @@ class ActorCellInstrumentation {
val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics]
cellWithMetrics.actorMetricsRecorder.map { p ⇒
- Kamon(Metrics)(cell.system).unregister(cellWithMetrics.metricIdentity)
+ Kamon(Metrics)(cell.system).unregister(cellWithMetrics.actorMetricIdentity)
+ }
+
+ cellWithMetrics.onRoutedActorCell { routedActorCell ⇒
+ routedActorCell.routerMetricsRecorder.map { rm ⇒
+ Kamon(Metrics)(cell.system).unregister(cellWithMetrics.routerMetricIdentity)
+ }
}
}
@@ -96,12 +126,21 @@ class ActorCellInstrumentation {
cellWithMetrics.actorMetricsRecorder.map {
am ⇒ am.errors.increment()
}
+
+ cellWithMetrics.onRoutedActorCell { routedActorCell ⇒
+ routedActorCell.routerMetricsRecorder.map {
+ rm ⇒ rm.errors.increment()
+ }
+ }
}
+
}
trait ActorCellMetrics {
- var metricIdentity: ActorMetrics = _
+ var actorMetricIdentity: ActorMetrics = _
+ var routerMetricIdentity: RouterMetrics = _
var actorMetricsRecorder: Option[ActorMetricsRecorder] = _
+ var routerMetricsRecorder: Option[RouterMetricsRecorder] = _
}
@Aspect
@@ -125,4 +164,13 @@ class TraceContextIntoEnvelopeMixin {
// Necessary to force the initialization of ContextAware at the moment of creation.
ctx.traceContext
}
+}
+
+object ActorCellInstrumentation {
+ implicit class PimpedActorCellMetrics(cell: ActorCellMetrics) {
+ def onRoutedActorCell(block: ActorCellMetrics ⇒ Unit) = cell match {
+ case routedActorCell: RoutedActorCell ⇒ block(cell)
+ case everyThingElse ⇒
+ }
+ }
} \ No newline at end of file