aboutsummaryrefslogtreecommitdiff
path: root/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala')
-rw-r--r--kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala128
1 files changed, 128 insertions, 0 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala
new file mode 100644
index 00000000..6bbefc6f
--- /dev/null
+++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala
@@ -0,0 +1,128 @@
+package akka.kamon.instrumentation
+
+import akka.actor.{ Cell, ActorRef, ActorSystem }
+import akka.kamon.instrumentation.ActorMonitors.{ TrackedRoutee, TrackedActor }
+import kamon.Kamon
+import kamon.akka.{ RouterMetrics, ActorMetrics }
+import kamon.metric.Entity
+import kamon.trace.{ TraceContext, EmptyTraceContext, Tracer }
+import kamon.util.RelativeNanoTimestamp
+import org.aspectj.lang.ProceedingJoinPoint
+
+trait ActorMonitor {
+ def captureEnvelopeContext(): EnvelopeContext
+ def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef
+ def processFailure(failure: Throwable): Unit
+ def cleanup(): Unit
+}
+
+object ActorMonitor {
+
+ def createActorMonitor(cell: Cell, system: ActorSystem, ref: ActorRef, parent: ActorRef): ActorMonitor = {
+ val cellInfo = CellInfo.cellInfoFor(cell, system, ref, parent)
+
+ if (cellInfo.isRouter)
+ ActorMonitors.NoOp
+ else {
+ if (cellInfo.isRoutee)
+ createRouteeMonitor(cellInfo)
+ else
+ createRegularActorMonitor(cellInfo)
+ }
+ }
+
+ def createRegularActorMonitor(cellInfo: CellInfo): ActorMonitor = {
+ def actorMetrics = Kamon.metrics.entity(ActorMetrics, cellInfo.entity)
+
+ if (cellInfo.isTracked)
+ new TrackedActor(cellInfo.entity, actorMetrics)
+ else ActorMonitors.ContextPropagationOnly
+ }
+
+ def createRouteeMonitor(cellInfo: CellInfo): ActorMonitor = {
+ def routerMetrics = Kamon.metrics.entity(RouterMetrics, cellInfo.entity)
+
+ if (cellInfo.isTracked)
+ new TrackedRoutee(cellInfo.entity, routerMetrics)
+ else ActorMonitors.ContextPropagationOnly
+ }
+}
+
+object ActorMonitors {
+ val NoOp = new ActorMonitor {
+ override def captureEnvelopeContext(): EnvelopeContext = EnvelopeContext(RelativeNanoTimestamp.zero, EmptyTraceContext)
+ override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = pjp.proceed()
+ override def processFailure(failure: Throwable): Unit = {}
+ override def cleanup(): Unit = {}
+ }
+
+ val ContextPropagationOnly = new ActorMonitor {
+ def captureEnvelopeContext(): EnvelopeContext =
+ EnvelopeContext(RelativeNanoTimestamp.now, Tracer.currentContext)
+
+ def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
+ Tracer.withContext(envelopeContext.context) {
+ pjp.proceed()
+ }
+ }
+
+ def processFailure(failure: Throwable): Unit = {}
+ def cleanup(): Unit = {}
+
+ }
+
+ class TrackedActor(val entity: Entity, actorMetrics: ActorMetrics) extends ActorMonitor {
+ def captureEnvelopeContext(): EnvelopeContext = {
+ actorMetrics.mailboxSize.increment()
+ EnvelopeContext(RelativeNanoTimestamp.now, Tracer.currentContext)
+ }
+
+ def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
+ val timestampBeforeProcessing = RelativeNanoTimestamp.now
+
+ try {
+ Tracer.withContext(envelopeContext.context) {
+ pjp.proceed()
+ }
+
+ } finally {
+ val timestampAfterProcessing = RelativeNanoTimestamp.now
+ val timeInMailbox = timestampBeforeProcessing - envelopeContext.nanoTime
+ val processingTime = timestampAfterProcessing - timestampBeforeProcessing
+
+ actorMetrics.processingTime.record(processingTime.nanos)
+ actorMetrics.timeInMailbox.record(timeInMailbox.nanos)
+ actorMetrics.mailboxSize.decrement()
+ }
+ }
+
+ def processFailure(failure: Throwable): Unit = actorMetrics.errors.increment()
+ def cleanup(): Unit = Kamon.metrics.removeEntity(entity)
+ }
+
+ class TrackedRoutee(val entity: Entity, routerMetrics: RouterMetrics) extends ActorMonitor {
+ def captureEnvelopeContext(): EnvelopeContext =
+ EnvelopeContext(RelativeNanoTimestamp.now, Tracer.currentContext)
+
+ def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
+ val timestampBeforeProcessing = RelativeNanoTimestamp.now
+
+ try {
+ Tracer.withContext(envelopeContext.context) {
+ pjp.proceed()
+ }
+
+ } finally {
+ val timestampAfterProcessing = RelativeNanoTimestamp.now
+ val timeInMailbox = timestampBeforeProcessing - envelopeContext.nanoTime
+ val processingTime = timestampAfterProcessing - timestampBeforeProcessing
+
+ routerMetrics.processingTime.record(processingTime.nanos)
+ routerMetrics.timeInMailbox.record(timeInMailbox.nanos)
+ }
+ }
+
+ def processFailure(failure: Throwable): Unit = routerMetrics.errors.increment()
+ def cleanup(): Unit = {}
+ }
+} \ No newline at end of file