aboutsummaryrefslogtreecommitdiff
path: root/kamon-akka/src/main/scala/kamon/akka/instrumentation/ActorMonitor.scala
blob: 6bbefc6fd2c7e38d57ef5552b00f7e008f6e9a26 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package akka.kamon.instrumentation

import akka.actor.{ Cell, ActorRef, ActorSystem }
import akka.kamon.instrumentation.ActorMonitors.{ TrackedRoutee, TrackedActor }
import kamon.Kamon
import kamon.akka.{ RouterMetrics, ActorMetrics }
import kamon.metric.Entity
import kamon.trace.{ TraceContext, EmptyTraceContext, Tracer }
import kamon.util.RelativeNanoTimestamp
import org.aspectj.lang.ProceedingJoinPoint

trait ActorMonitor {
  def captureEnvelopeContext(): EnvelopeContext
  def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef
  def processFailure(failure: Throwable): Unit
  def cleanup(): Unit
}

object ActorMonitor {

  def createActorMonitor(cell: Cell, system: ActorSystem, ref: ActorRef, parent: ActorRef): ActorMonitor = {
    val cellInfo = CellInfo.cellInfoFor(cell, system, ref, parent)

    if (cellInfo.isRouter)
      ActorMonitors.NoOp
    else {
      if (cellInfo.isRoutee)
        createRouteeMonitor(cellInfo)
      else
        createRegularActorMonitor(cellInfo)
    }
  }

  def createRegularActorMonitor(cellInfo: CellInfo): ActorMonitor = {
    def actorMetrics = Kamon.metrics.entity(ActorMetrics, cellInfo.entity)

    if (cellInfo.isTracked)
      new TrackedActor(cellInfo.entity, actorMetrics)
    else ActorMonitors.ContextPropagationOnly
  }

  def createRouteeMonitor(cellInfo: CellInfo): ActorMonitor = {
    def routerMetrics = Kamon.metrics.entity(RouterMetrics, cellInfo.entity)

    if (cellInfo.isTracked)
      new TrackedRoutee(cellInfo.entity, routerMetrics)
    else ActorMonitors.ContextPropagationOnly
  }
}

object ActorMonitors {
  val NoOp = new ActorMonitor {
    override def captureEnvelopeContext(): EnvelopeContext = EnvelopeContext(RelativeNanoTimestamp.zero, EmptyTraceContext)
    override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = pjp.proceed()
    override def processFailure(failure: Throwable): Unit = {}
    override def cleanup(): Unit = {}
  }

  val ContextPropagationOnly = new ActorMonitor {
    def captureEnvelopeContext(): EnvelopeContext =
      EnvelopeContext(RelativeNanoTimestamp.now, Tracer.currentContext)

    def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
      Tracer.withContext(envelopeContext.context) {
        pjp.proceed()
      }
    }

    def processFailure(failure: Throwable): Unit = {}
    def cleanup(): Unit = {}

  }

  class TrackedActor(val entity: Entity, actorMetrics: ActorMetrics) extends ActorMonitor {
    def captureEnvelopeContext(): EnvelopeContext = {
      actorMetrics.mailboxSize.increment()
      EnvelopeContext(RelativeNanoTimestamp.now, Tracer.currentContext)
    }

    def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
      val timestampBeforeProcessing = RelativeNanoTimestamp.now

      try {
        Tracer.withContext(envelopeContext.context) {
          pjp.proceed()
        }

      } finally {
        val timestampAfterProcessing = RelativeNanoTimestamp.now
        val timeInMailbox = timestampBeforeProcessing - envelopeContext.nanoTime
        val processingTime = timestampAfterProcessing - timestampBeforeProcessing

        actorMetrics.processingTime.record(processingTime.nanos)
        actorMetrics.timeInMailbox.record(timeInMailbox.nanos)
        actorMetrics.mailboxSize.decrement()
      }
    }

    def processFailure(failure: Throwable): Unit = actorMetrics.errors.increment()
    def cleanup(): Unit = Kamon.metrics.removeEntity(entity)
  }

  class TrackedRoutee(val entity: Entity, routerMetrics: RouterMetrics) extends ActorMonitor {
    def captureEnvelopeContext(): EnvelopeContext =
      EnvelopeContext(RelativeNanoTimestamp.now, Tracer.currentContext)

    def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
      val timestampBeforeProcessing = RelativeNanoTimestamp.now

      try {
        Tracer.withContext(envelopeContext.context) {
          pjp.proceed()
        }

      } finally {
        val timestampAfterProcessing = RelativeNanoTimestamp.now
        val timeInMailbox = timestampBeforeProcessing - envelopeContext.nanoTime
        val processingTime = timestampAfterProcessing - timestampBeforeProcessing

        routerMetrics.processingTime.record(processingTime.nanos)
        routerMetrics.timeInMailbox.record(timeInMailbox.nanos)
      }
    }

    def processFailure(failure: Throwable): Unit = routerMetrics.errors.increment()
    def cleanup(): Unit = {}
  }
}