1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
package akka.kamon.instrumentation
import akka.actor.{ Cell, ActorRef, ActorSystem }
import akka.kamon.instrumentation.ActorMonitors.{ TrackedRoutee, TrackedActor }
import kamon.Kamon
import kamon.akka.{ RouterMetrics, ActorMetrics }
import kamon.metric.Entity
import kamon.trace.{ TraceContext, EmptyTraceContext, Tracer }
import kamon.util.RelativeNanoTimestamp
import org.aspectj.lang.ProceedingJoinPoint
/**
 * Contract implemented by every monitor that the instrumentation can attach to an actor cell.
 *
 * The implementations in this file range from a no-op, through trace-context propagation only,
 * to full metric recording for tracked actors and routees.
 */
trait ActorMonitor {
/**
 * Builds the [[EnvelopeContext]] that accompanies a message.
 *
 * NOTE(review): presumably invoked when a message is enqueued, so the captured timestamp marks
 * the start of its time in the mailbox; the call site is outside this file, so confirm.
 */
def captureEnvelopeContext(): EnvelopeContext
/**
 * Handles one intercepted message-processing call.
 *
 * @param pjp             the intercepted join point; every implementation in this file calls
 *                        `pjp.proceed()` exactly once
 * @param envelopeContext the context previously produced by [[captureEnvelopeContext]]
 * @return in the implementations in this file, whatever `pjp.proceed()` returns
 */
def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef
/**
 * Notifies the monitor of a failure; the tracked implementations in this file count it as an error.
 *
 * @param failure the reported throwable
 */
def processFailure(failure: Throwable): Unit
/** Releases whatever the monitor holds; `TrackedActor` removes its metrics entity here. */
def cleanup(): Unit
}
/**
 * Factory that selects the `ActorMonitor` implementation to attach to an actor cell.
 */
object ActorMonitor {

  /**
   * Chooses a monitor from the `CellInfo` resolved for the given cell.
   *
   * Routers receive `ActorMonitors.NoOp`, routees receive the result of
   * [[createRouteeMonitor]] and every other cell receives the result of
   * [[createRegularActorMonitor]].
   *
   * @param cell   the actor cell being instrumented
   * @param system the actor system passed on to `CellInfo.cellInfoFor`
   * @param ref    the actor reference passed on to `CellInfo.cellInfoFor`
   * @param parent the parent reference passed on to `CellInfo.cellInfoFor`
   * @return the monitor selected for the cell
   */
  def createActorMonitor(cell: Cell, system: ActorSystem, ref: ActorRef, parent: ActorRef): ActorMonitor = {
    val info = CellInfo.cellInfoFor(cell, system, ref, parent)

    if (info.isRouter) ActorMonitors.NoOp
    else if (info.isRoutee) createRouteeMonitor(info)
    else createRegularActorMonitor(info)
  }

  /**
   * Monitor for a plain (non-router, non-routee) actor.
   *
   * @param cellInfo description of the cell
   * @return a `TrackedActor` when the cell is tracked, otherwise
   *         `ActorMonitors.ContextPropagationOnly`
   */
  def createRegularActorMonitor(cellInfo: CellInfo): ActorMonitor =
    if (!cellInfo.isTracked) ActorMonitors.ContextPropagationOnly
    else {
      // The ActorMetrics recorder is requested from Kamon only on this (tracked) branch.
      new TrackedActor(cellInfo.entity, Kamon.metrics.entity(ActorMetrics, cellInfo.entity))
    }

  /**
   * Monitor for a routee.
   *
   * @param cellInfo description of the cell
   * @return a `TrackedRoutee` when the cell is tracked, otherwise
   *         `ActorMonitors.ContextPropagationOnly`
   */
  def createRouteeMonitor(cellInfo: CellInfo): ActorMonitor =
    if (!cellInfo.isTracked) ActorMonitors.ContextPropagationOnly
    else {
      // The RouterMetrics recorder is requested from Kamon only on this (tracked) branch.
      new TrackedRoutee(cellInfo.entity, Kamon.metrics.entity(RouterMetrics, cellInfo.entity))
    }
}
/**
 * The concrete `ActorMonitor` implementations handed out by the `ActorMonitor` factory.
 */
object ActorMonitors {

  /**
   * Monitor that records nothing: it returns an envelope context made of the zero timestamp and
   * the empty trace context, and lets the intercepted call proceed untouched.
   */
  val NoOp = new ActorMonitor {
    override def captureEnvelopeContext(): EnvelopeContext =
      EnvelopeContext(RelativeNanoTimestamp.zero, EmptyTraceContext)

    override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef =
      pjp.proceed()

    override def processFailure(failure: Throwable): Unit = ()

    override def cleanup(): Unit = ()
  }

  /**
   * Monitor that records no metrics but carries the trace context along: the context current at
   * capture time is stored in the envelope and made current again while the call proceeds.
   */
  val ContextPropagationOnly = new ActorMonitor {
    override def captureEnvelopeContext(): EnvelopeContext =
      EnvelopeContext(RelativeNanoTimestamp.now, Tracer.currentContext)

    override def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef =
      Tracer.withContext(envelopeContext.context)(pjp.proceed())

    override def processFailure(failure: Throwable): Unit = ()

    override def cleanup(): Unit = ()
  }

  /**
   * Monitor for a tracked actor: propagates the trace context and feeds `actorMetrics`.
   *
   * @param entity       the metrics entity that [[cleanup]] removes from Kamon
   * @param actorMetrics the recorder receiving mailbox size, timings and error counts
   */
  class TrackedActor(val entity: Entity, actorMetrics: ActorMetrics) extends ActorMonitor {

    /** Increments the mailbox size, then captures the current timestamp and trace context. */
    def captureEnvelopeContext(): EnvelopeContext = {
      actorMetrics.mailboxSize.increment()
      EnvelopeContext(RelativeNanoTimestamp.now, Tracer.currentContext)
    }

    /**
     * Proceeds with the intercepted call under the envelope's trace context.
     *
     * Whether or not the call throws, the processing time and time-in-mailbox are recorded
     * afterwards and the mailbox size is decremented.
     */
    def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
      val startedAt = RelativeNanoTimestamp.now

      try Tracer.withContext(envelopeContext.context)(pjp.proceed())
      finally {
        val finishedAt = RelativeNanoTimestamp.now
        val waitedInMailbox = startedAt - envelopeContext.nanoTime
        val spentProcessing = finishedAt - startedAt

        actorMetrics.processingTime.record(spentProcessing.nanos)
        actorMetrics.timeInMailbox.record(waitedInMailbox.nanos)
        // Balances the increment performed in captureEnvelopeContext().
        actorMetrics.mailboxSize.decrement()
      }
    }

    /** Counts the failure in the `errors` metric. */
    def processFailure(failure: Throwable): Unit =
      actorMetrics.errors.increment()

    /** Removes this actor's entity from Kamon's metrics. */
    def cleanup(): Unit =
      Kamon.metrics.removeEntity(entity)
  }

  /**
   * Monitor for a tracked routee: propagates the trace context and feeds `routerMetrics`.
   *
   * Unlike `TrackedActor` it does not touch a mailbox-size metric and its [[cleanup]] does nothing.
   *
   * @param entity        the metrics entity associated with the routee
   * @param routerMetrics the recorder receiving timings and error counts
   */
  class TrackedRoutee(val entity: Entity, routerMetrics: RouterMetrics) extends ActorMonitor {

    /** Captures the current timestamp and trace context. */
    def captureEnvelopeContext(): EnvelopeContext =
      EnvelopeContext(RelativeNanoTimestamp.now, Tracer.currentContext)

    /**
     * Proceeds with the intercepted call under the envelope's trace context.
     *
     * Whether or not the call throws, the processing time and time-in-mailbox are recorded
     * afterwards.
     */
    def processMessage(pjp: ProceedingJoinPoint, envelopeContext: EnvelopeContext): AnyRef = {
      val startedAt = RelativeNanoTimestamp.now

      try Tracer.withContext(envelopeContext.context)(pjp.proceed())
      finally {
        val finishedAt = RelativeNanoTimestamp.now
        val waitedInMailbox = startedAt - envelopeContext.nanoTime
        val spentProcessing = finishedAt - startedAt

        routerMetrics.processingTime.record(spentProcessing.nanos)
        routerMetrics.timeInMailbox.record(waitedInMailbox.nanos)
      }
    }

    /** Counts the failure in the `errors` metric. */
    def processFailure(failure: Throwable): Unit =
      routerMetrics.errors.increment()

    // NOTE(review): the entity is not removed here; presumably the owning router's
    // instrumentation is responsible for that. Confirm against the router code.
    def cleanup(): Unit = ()
  }
}
|