1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
package kamon.metric
import akka.actor._
import akka.testkit.{ TestProbe, ImplicitSender, TestKitBase }
import com.typesafe.config.ConfigFactory
import kamon.Kamon
import kamon.metric.Subscriptions.TickMetricSnapshot
import org.scalatest.{ Matchers, WordSpecLike }
import scala.concurrent.duration._
/**
 * Exercises the messaging protocol of the metrics subscriptions actor: one-shot and permanent
 * subscriptions, glob pattern matching, de-duplication of snapshots per subscriber, explicit
 * un-subscription and automatic un-subscription of terminated subscribers.
 *
 * The tick interval is configured to one hour; the intent is that, within the run time of this
 * spec, snapshots are only produced when a test explicitly sends `Subscriptions.FlushMetrics`.
 */
class SubscriptionsProtocolSpec extends TestKitBase with WordSpecLike with Matchers
    with org.scalatest.BeforeAndAfterAll {

  // Sender used for every `!` issued from the test body.
  implicit def self: ActorRef = testActor

  implicit lazy val system: ActorSystem = ActorSystem("subscriptions-protocol-spec", ConfigFactory.parseString(
    """
      |kamon.metrics {
      | tick-interval = 1 hour
      |}
    """.stripMargin))

  val metricsExtension = Kamon(Metrics)(system)
  import metricsExtension.{ register, subscribe, unsubscribe }

  /** Stops the actor system created for this spec so its threads do not outlive the test run. */
  override protected def afterAll(): Unit = {
    akka.testkit.TestKit.shutdownActorSystem(system)
    super.afterAll()
  }

  /** Sends `Subscriptions.FlushMetrics` to the subscriptions actor of the extension under test. */
  private def flushMetrics(): Unit =
    metricsExtension.subscriptions ! Subscriptions.FlushMetrics

  /**
   * Expects the next message received by `subscriber` to be a `TickMetricSnapshot` that contains
   * exactly one entry per given trace name.
   *
   * @param subscriber probe that should have received the snapshot
   * @param traceNames names of the `TraceMetrics` identities expected in the snapshot
   */
  private def expectSnapshotOf(subscriber: TestProbe, traceNames: String*): Unit = {
    val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
    tickSnapshot.metrics.size should be(traceNames.size)
    traceNames.foreach { traceName ⇒
      tickSnapshot.metrics.keys should contain(TraceMetrics(traceName))
    }
  }

  "the Subscriptions messaging protocol" should {
    "allow subscribing for a single tick" in {
      val subscriber = TestProbe()
      register(TraceMetrics("one-shot"), TraceMetrics.Factory)
      subscribe(TraceMetrics, "one-shot", subscriber.ref, permanently = false)

      flushMetrics()
      expectSnapshotOf(subscriber, "one-shot")

      // A non-permanent subscription must not receive anything on the following flush.
      flushMetrics()
      subscriber.expectNoMsg(1.second)
    }

    "allow subscribing permanently to a metric" in {
      val subscriber = TestProbe()
      register(TraceMetrics("permanent"), TraceMetrics.Factory)
      subscribe(TraceMetrics, "permanent", subscriber.ref, permanently = true)

      for (_ ← 1 to 5) {
        flushMetrics()
        expectSnapshotOf(subscriber, "permanent")
        subscriber.expectNoMsg(1.second)
      }
    }

    "allow subscribing to metrics matching a glob pattern" in {
      val subscriber = TestProbe()
      register(TraceMetrics("include-one"), TraceMetrics.Factory)
      register(TraceMetrics("exclude-two"), TraceMetrics.Factory)
      register(TraceMetrics("include-three"), TraceMetrics.Factory)
      subscribe(TraceMetrics, "include-*", subscriber.ref, permanently = true)

      for (_ ← 1 to 5) {
        flushMetrics()
        expectSnapshotOf(subscriber, "include-one", "include-three")
        subscriber.expectNoMsg(1.second)
      }
    }

    "send a single TickMetricSnapshot to each subscriber, even if subscribed multiple times" in {
      val subscriber = TestProbe()
      register(TraceMetrics("include-one"), TraceMetrics.Factory)
      register(TraceMetrics("exclude-two"), TraceMetrics.Factory)
      register(TraceMetrics("include-three"), TraceMetrics.Factory)
      subscribe(TraceMetrics, "include-one", subscriber.ref, permanently = true)
      subscribe(TraceMetrics, "include-three", subscriber.ref, permanently = true)

      for (_ ← 1 to 5) {
        flushMetrics()
        expectSnapshotOf(subscriber, "include-one", "include-three")
        // Without this check a duplicated snapshot would simply be consumed by the next
        // iteration and the "single snapshot" guarantee would go unverified.
        subscriber.expectNoMsg(1.second)
      }
    }

    "allow un-subscribing a subscriber" in {
      val subscriber = TestProbe()
      register(TraceMetrics("one-shot"), TraceMetrics.Factory)
      subscribe(TraceMetrics, "one-shot", subscriber.ref, permanently = true)

      flushMetrics()
      expectSnapshotOf(subscriber, "one-shot")

      unsubscribe(subscriber.ref)
      flushMetrics()
      subscriber.expectNoMsg(1.second)
    }

    "watch all subscribers and un-subscribe them if they die" in {
      val subscriber = TestProbe()
      // The subscription is registered for the forwarder, so killing the forwarder is what
      // terminates the subscriber while the probe stays alive to observe (the absence of) messages.
      val forwarderSubscriber = system.actorOf(Props(new ForwarderSubscriber(subscriber.ref)))
      watch(forwarderSubscriber)
      register(TraceMetrics("one-shot"), TraceMetrics.Factory)
      subscribe(TraceMetrics, "one-shot", forwarderSubscriber, permanently = true)

      flushMetrics()
      expectSnapshotOf(subscriber, "one-shot")

      forwarderSubscriber ! PoisonPill
      expectTerminated(forwarderSubscriber)

      for (_ ← 1 to 4) flushMetrics()
      subscriber.expectNoMsg(2.seconds)
    }
  }
}
/**
 * Relay actor that forwards every message it receives to `target`, using `forward` so the
 * original sender is kept.
 *
 * @param target actor that receives every forwarded message
 */
class ForwarderSubscriber(target: ActorRef) extends Actor {
  def receive: Receive = {
    case message ⇒ target forward message
  }
}
|