aboutsummaryrefslogtreecommitdiff
path: root/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala
blob: a2b920a2966e143b8c94bbf587794e723a1c32a4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package akka.kamon.instrumentation

import akka.actor.{ Cell, ActorRef, ActorSystem }
import akka.kamon.instrumentation.ActorMonitors.{ TrackedRouteeWithContextPropagation, TrackedRoutee, ContextPropagation, TrackedActor }
import kamon.Kamon
import kamon.akka.TraceContextPropagationSettings.{ Always, MonitoredActorsOnly, Off }
import kamon.akka.{ AkkaExtension, RouterMetrics, ActorMetrics }
import kamon.metric.Entity
import kamon.trace.{ TraceContext, EmptyTraceContext, Tracer }
import kamon.util.RelativeNanoTimestamp
import org.aspectj.lang.ProceedingJoinPoint

trait ActorMonitor {
  def captureEnvelopeContext(): EnvelopeContext = EnvelopeContext(captureTimestamp, captureTraceContext)
  def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = pjp.proceed()
  def processFailure(failure: Throwable): Unit = {}
  def cleanup(): Unit = {}

  protected def captureTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.zero
  protected def captureTraceContext: TraceContext = EmptyTraceContext
}

object ActorMonitor {

  def createActorMonitor(cell: Cell, system: ActorSystem, ref: ActorRef, parent: ActorRef): ActorMonitor = {
    val cellInfo = CellInfo.cellInfoFor(cell, system, ref, parent)

    if (cellInfo.isRouter)
      ActorMonitors.NoOp
    else {
      if (cellInfo.isRoutee)
        createRouteeMonitor(cellInfo)
      else
        createRegularActorMonitor(cellInfo)
    }
  }

  def createRegularActorMonitor(cellInfo: CellInfo): ActorMonitor = {
    def actorMetrics = Kamon.metrics.entity(ActorMetrics, cellInfo.entity)

    AkkaExtension.traceContextPropagation match {
      case Off if cellInfo.isTracked                  new TrackedActor(cellInfo.entity, actorMetrics)
      case Off                                        ActorMonitors.NoOp
      case MonitoredActorsOnly if cellInfo.isTracked  new TrackedActor(cellInfo.entity, actorMetrics) with ContextPropagation
      case MonitoredActorsOnly                        ActorMonitors.NoOp
      case Always if cellInfo.isTracked               new TrackedActor(cellInfo.entity, actorMetrics) with ContextPropagation
      case Always                                     ActorMonitors.ContextPropagationOnly
    }
  }

  def createRouteeMonitor(cellInfo: CellInfo): ActorMonitor = {
    def routerMetrics = Kamon.metrics.entity(RouterMetrics, cellInfo.entity)

    AkkaExtension.traceContextPropagation match {
      case Off if cellInfo.isTracked                  new TrackedRoutee(cellInfo.entity, routerMetrics)
      case Off                                        ActorMonitors.NoOp
      case MonitoredActorsOnly if cellInfo.isTracked  new TrackedRouteeWithContextPropagation(cellInfo.entity, routerMetrics)
      case MonitoredActorsOnly                        ActorMonitors.NoOp
      case Always if cellInfo.isTracked               new TrackedRouteeWithContextPropagation(cellInfo.entity, routerMetrics)
      case Always                                     ActorMonitors.ContextPropagationOnly
    }
  }
}

object ActorMonitors {
  val NoOp = new ActorMonitor {}
  val ContextPropagationOnly = new ActorMonitor with ContextPropagation

  class TrackedActor(val entity: Entity, actorMetrics: ActorMetrics) extends ActorMonitor {
    override def captureEnvelopeContext(): EnvelopeContext = {
      actorMetrics.mailboxSize.increment()
      super.captureEnvelopeContext()
    }

    override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
      val timestampBeforeProcessing = RelativeNanoTimestamp.now

      try {
        super.processMessage(pjp, envelopeContext)

      } finally {
        val timestampAfterProcessing = RelativeNanoTimestamp.now
        val timeInMailbox = timestampBeforeProcessing - envelopeContext.nanoTime
        val processingTime = timestampAfterProcessing - timestampBeforeProcessing

        actorMetrics.processingTime.record(processingTime.nanos)
        actorMetrics.timeInMailbox.record(timeInMailbox.nanos)
        actorMetrics.mailboxSize.decrement()
      }
    }

    override def processFailure(failure: Throwable): Unit = actorMetrics.errors.increment()
    override def captureTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now
    override def cleanup(): Unit = Kamon.metrics.removeEntity(entity)
  }

  class TrackedRoutee(val entity: Entity, routerMetrics: RouterMetrics) extends ActorMonitor {

    override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
      val timestampBeforeProcessing = RelativeNanoTimestamp.now

      try {
        super.processMessage(pjp, envelopeContext)

      } finally {
        val timestampAfterProcessing = RelativeNanoTimestamp.now
        val timeInMailbox = timestampBeforeProcessing - envelopeContext.nanoTime
        val processingTime = timestampAfterProcessing - timestampBeforeProcessing

        routerMetrics.processingTime.record(processingTime.nanos)
        routerMetrics.timeInMailbox.record(timeInMailbox.nanos)
      }
    }

    override def processFailure(failure: Throwable): Unit = routerMetrics.errors.increment()
    override def captureTimestamp: RelativeNanoTimestamp = RelativeNanoTimestamp.now
    override def cleanup(): Unit = {}
  }

  trait ContextPropagation extends ActorMonitor {
    override protected def captureTraceContext: TraceContext = Tracer.currentContext

    override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
      Tracer.withContext(envelopeContext.context) {
        super.processMessage(pjp, envelopeContext)
      }
    }
  }

  class TrackedActorWithContextPropagation(entity: Entity, actorMetrics: ActorMetrics)
    extends TrackedActor(entity, actorMetrics) with ContextPropagation

  class TrackedRouteeWithContextPropagation(entity: Entity, routerMetrics: RouterMetrics)
    extends TrackedRoutee(entity, routerMetrics) with ContextPropagation
}