1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
|
package kamon.instrumentation.akka
import akka.actor.SupervisorStrategy.{ Escalate, Restart, Resume, Stop }
import akka.actor._
import akka.testkit.{ ImplicitSender, TestKit }
import kamon.trace.TraceRecorder
import org.scalatest.WordSpecLike
import scala.concurrent.duration._
import scala.util.control.NonFatal
class ActorSystemMessageInstrumentationSpec extends TestKit(ActorSystem("actor-system-message-instrumentation-spec"))
with WordSpecLike with ImplicitSender {
implicit val executionContext = system.dispatcher
"the system message passing instrumentation" should {
"keep the TraceContext while processing the Create message in top level actors" in {
val testTraceContext = TraceRecorder.withNewTraceContext("creating-top-level-actor") {
system.actorOf(Props(new Actor {
testActor ! TraceRecorder.currentContext
def receive: Actor.Receive = { case any ⇒ }
}))
TraceRecorder.currentContext
}
expectMsg(testTraceContext)
}
"keep the TraceContext while processing the Create message in non top level actors" in {
val testTraceContext = TraceRecorder.withNewTraceContext("creating-non-top-level-actor") {
system.actorOf(Props(new Actor {
def receive: Actor.Receive = {
case any ⇒
context.actorOf(Props(new Actor {
testActor ! TraceRecorder.currentContext
def receive: Actor.Receive = { case any ⇒ }
}))
}
})) ! "any"
TraceRecorder.currentContext
}
expectMsg(testTraceContext)
}
"keep the TraceContext in the supervision cycle" when {
"the actor is resumed" in {
val supervisor = supervisorWithDirective(Resume)
val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-resume") {
supervisor ! "fail"
TraceRecorder.currentContext
}
expectMsg(testTraceContext) // From the parent executing the supervision strategy
// Ensure we didn't tie the actor with the context
supervisor ! "context"
expectMsg(None)
}
"the actor is restarted" in {
val supervisor = supervisorWithDirective(Restart, sendPreRestart = true, sendPostRestart = true)
val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-restart") {
supervisor ! "fail"
TraceRecorder.currentContext
}
expectMsg(testTraceContext) // From the parent executing the supervision strategy
expectMsg(testTraceContext) // From the preRestart hook
expectMsg(testTraceContext) // From the postRestart hook
// Ensure we didn't tie the actor with the context
supervisor ! "context"
expectMsg(None)
}
"the actor is stopped" in {
val supervisor = supervisorWithDirective(Stop, sendPostStop = true)
val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-stop") {
supervisor ! "fail"
TraceRecorder.currentContext
}
expectMsg(testTraceContext) // From the parent executing the supervision strategy
expectMsg(testTraceContext) // From the postStop hook
expectNoMsg(1 second)
}
"the failure is escalated" in {
val supervisor = supervisorWithDirective(Escalate, sendPostStop = true)
val testTraceContext = TraceRecorder.withNewTraceContext("fail-and-escalate") {
supervisor ! "fail"
TraceRecorder.currentContext
}
expectMsg(testTraceContext) // From the parent executing the supervision strategy
expectMsg(testTraceContext) // From the grandparent executing the supervision strategy
expectMsg(testTraceContext) // From the postStop hook in the child
expectMsg(testTraceContext) // From the postStop hook in the parent
expectNoMsg(1 second)
}
}
}
def supervisorWithDirective(directive: SupervisorStrategy.Directive, sendPreRestart: Boolean = false, sendPostRestart: Boolean = false,
sendPostStop: Boolean = false, sendPreStart: Boolean = false): ActorRef = {
class GrandParent extends Actor {
val child = context.actorOf(Props(new Parent))
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; Stop
}
def receive = {
case any ⇒ child forward any
}
}
class Parent extends Actor {
val child = context.actorOf(Props(new Child))
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case NonFatal(throwable) ⇒ testActor ! TraceRecorder.currentContext; directive
}
def receive: Actor.Receive = {
case any ⇒ child forward any
}
override def postStop(): Unit = {
if (sendPostStop) testActor ! TraceRecorder.currentContext
super.postStop()
}
}
class Child extends Actor {
def receive = {
case "fail" ⇒ 1 / 0
case "context" ⇒ sender ! TraceRecorder.currentContext
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
if (sendPreRestart) testActor ! TraceRecorder.currentContext
super.preRestart(reason, message)
}
override def postRestart(reason: Throwable): Unit = {
if (sendPostRestart) testActor ! TraceRecorder.currentContext
super.postRestart(reason)
}
override def postStop(): Unit = {
if (sendPostStop) testActor ! TraceRecorder.currentContext
super.postStop()
}
override def preStart(): Unit = {
if (sendPreStart) testActor ! TraceRecorder.currentContext
super.preStart()
}
}
system.actorOf(Props(new GrandParent))
}
}
|