1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
/*
* =========================================================================================
* Copyright © 2013 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/
package kamon.instrumentation.akka
import java.util.concurrent.atomic.AtomicInteger
import akka.actor._
import akka.event.Logging.Warning
import akka.pattern.ask
import akka.testkit.TestProbe
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import kamon.Kamon
import kamon.akka.Akka
import kamon.testkit.BaseKamonSpec
import kamon.trace.{Tracer, TraceContext, TraceContextAware}
import scala.concurrent.duration._
class AskPatternInstrumentationSpec extends BaseKamonSpec("ask-pattern-tracing-spec") {
override lazy val config =
ConfigFactory.parseString(
"""
|akka {
| loglevel = OFF
|}
""".stripMargin)
implicit lazy val ec = system.dispatcher
implicit val askTimeout = Timeout(10 millis)
// TODO: Make this work with ActorSelections
"the AskPatternInstrumentation" when {
"configured in heavyweight mode" should {
"log a warning with a full stack trace and the TraceContext taken from the moment the ask was triggered for a actor" in new NoReplyFixture {
setAskPatternTimeoutWarningMode("heavyweight")
expectTimeoutWarning() {
Tracer.withContext(newContext("ask-timeout-warning")) {
noReplyActorRef ? "hello"
Tracer.currentContext
}
}
}
}
"configured in lightweight mode" should {
"log a warning with a short source location description and the TraceContext taken from the moment the ask was triggered for a actor" in new NoReplyFixture {
setAskPatternTimeoutWarningMode("lightweight")
expectTimeoutWarning(messageSizeLimit = Some(1)) {
Tracer.withContext(newContext("ask-timeout-warning")) {
noReplyActorRef ? "hello"
Tracer.currentContext
}
}
}
}
"configured in off mode" should {
"should not log any warning messages" in new NoReplyFixture {
setAskPatternTimeoutWarningMode("off")
expectTimeoutWarning(expectWarning = false) {
Tracer.withContext(newContext("ask-timeout-warning")) {
noReplyActorRef ? "hello"
Tracer.currentContext
}
}
}
}
}
override protected def afterAll(): Unit = shutdown()
def expectTimeoutWarning(messageSizeLimit: Option[Int] = None, expectWarning: Boolean = true)(thunk: ⇒ TraceContext): Unit = {
val listener = warningListener()
val testTraceContext = thunk
if (expectWarning) {
val warning = listener.fishForMessage() {
case Warning(_, _, msg) if msg.toString.startsWith("Timeout triggered for ask pattern registered at") ⇒ true
case others ⇒ false
}.asInstanceOf[Warning]
warning.asInstanceOf[TraceContextAware].traceContext should equal(testTraceContext)
messageSizeLimit.map { messageLimit ⇒
warning.message.toString.lines.size should be(messageLimit)
}
} else {
listener.expectNoMsg()
}
}
def warningListener(): TestProbe = {
val listener = TestProbe()
system.eventStream.subscribe(listener.ref, classOf[Warning])
listener
}
def setAskPatternTimeoutWarningMode(mode: String): Unit = {
val target = Kamon(Akka)
val field = target.getClass.getDeclaredField("askPatternTimeoutWarning")
field.setAccessible(true)
field.set(target, mode)
}
val fixtureCounter = new AtomicInteger(0)
trait NoReplyFixture {
def noReplyActorRef: ActorRef = system.actorOf(Props[NoReply], "no-reply-" + fixtureCounter.incrementAndGet())
def noReplyActorSelection: ActorSelection = {
val target = noReplyActorRef
system.actorSelection(target.path)
}
}
}
class NoReply extends Actor {
def receive = {
case any ⇒
}
}
|