1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
package kamon.metric
import akka.actor._
import akka.testkit.{ TestProbe, ImplicitSender }
import com.typesafe.config.ConfigFactory
import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
import kamon.testkit.BaseKamonSpec
import scala.concurrent.duration._
/**
 * Exercises the subscription protocol exposed by the Kamon metrics module: one-shot and
 * permanent subscriptions, glob-pattern filters, delivery of a single snapshot per subscriber
 * per flush, and un-subscribing.
 *
 * Every test registers its trace metrics, subscribes a fresh [[akka.testkit.TestProbe]] and then
 * calls `flushSubscriptions()` (not defined in this class; presumably inherited from
 * `BaseKamonSpec`) before asserting on the `TickMetricSnapshot` the probe receives.
 */
class SubscriptionsProtocolSpec extends BaseKamonSpec("subscriptions-protocol-spec") with ImplicitSender {
  // The tick interval is one hour, presumably so that no scheduled tick fires while the tests
  // run and snapshots are only produced by the explicit flushSubscriptions() calls below.
  override lazy val config =
    ConfigFactory.parseString(
      """
        |kamon.metric {
        | tick-interval = 1 hour
        |}
        |""".stripMargin)

  val metricsModule = kamon.metrics
  import metricsModule.{ register, subscribe, unsubscribe }

  "the Subscriptions messaging protocol" should {
    "allow subscribing for a single tick" in {
      val subscriber = TestProbe()
      register(TraceMetrics, "one-shot")
      subscribe("trace", "one-shot", subscriber.ref, permanently = false)

      flushSubscriptions()
      expectTraceSnapshot(subscriber, "one-shot")

      // A non-permanent subscription must stay silent on the following flush.
      flushSubscriptions()
      subscriber.expectNoMsg(1.second)
    }

    "allow subscribing permanently to a metric" in {
      val subscriber = TestProbe()
      register(TraceMetrics, "permanent")
      subscribe("trace", "permanent", subscriber.ref, permanently = true)

      for (_ ← 1 to 5) {
        flushSubscriptions()
        expectTraceSnapshot(subscriber, "permanent")
      }
    }

    "allow subscribing to metrics matching a glob pattern" in {
      val subscriber = TestProbe()
      register(TraceMetrics, "include-one")
      register(TraceMetrics, "exclude-two")
      register(TraceMetrics, "include-three")
      subscribe("trace", "include-*", subscriber.ref, permanently = true)

      for (_ ← 1 to 5) {
        flushSubscriptions()
        // "exclude-two" does not match the pattern, so only two entities are expected.
        expectTraceSnapshot(subscriber, "include-one", "include-three")
      }
    }

    "send a single TickMetricSnapshot to each subscriber, even if subscribed multiple times" in {
      val subscriber = TestProbe()
      register(TraceMetrics, "include-one")
      register(TraceMetrics, "exclude-two")
      register(TraceMetrics, "include-three")
      subscribe("trace", "include-one", subscriber.ref, permanently = true)
      subscribe("trace", "include-three", subscriber.ref, permanently = true)

      for (_ ← 1 to 5) {
        flushSubscriptions()
        // Both subscriptions are expected to be served by one snapshot holding both entities.
        expectTraceSnapshot(subscriber, "include-one", "include-three")
      }
    }

    "allow un-subscribing a subscriber" in {
      val subscriber = TestProbe()
      register(TraceMetrics, "one-shot")
      subscribe("trace", "one-shot", subscriber.ref, permanently = true)

      flushSubscriptions()
      expectTraceSnapshot(subscriber, "one-shot")

      // Once un-subscribed, a further flush must not deliver anything to the probe.
      unsubscribe(subscriber.ref)
      flushSubscriptions()
      subscriber.expectNoMsg(1.second)
    }
  }

  /**
   * Waits for the next `TickMetricSnapshot` on `subscriber` and asserts that it holds exactly
   * as many metrics as names were given, and that each name is present as an entity of
   * category "trace".
   *
   * @param subscriber probe that is expected to receive the snapshot
   * @param traceNames names of the trace entities the snapshot must contain
   */
  private def expectTraceSnapshot(subscriber: TestProbe, traceNames: String*): Unit = {
    val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
    tickSnapshot.metrics.size should be(traceNames.size)
    traceNames.foreach { traceName ⇒
      tickSnapshot.metrics.keys should contain(Entity(traceName, "trace"))
    }
  }

  /**
   * Resolves the actor at `/user/kamon/kamon-metrics` by sending it an `Identify` message and
   * waiting for the `ActorIdentity` reply.
   *
   * @return the reference carried by the `ActorIdentity` reply
   * @throws RuntimeException if the reply carries no actor reference
   */
  def subscriptionsActor: ActorRef = {
    val metricsActorPath = "/user/kamon/kamon-metrics"
    val listener = TestProbe()
    system.actorSelection(metricsActorPath).tell(Identify(1), listener.ref)
    // Fail with a descriptive message instead of a bare NoSuchElementException from Option.get.
    listener.expectMsgType[ActorIdentity].ref.getOrElse(
      sys.error(s"No actor could be identified at $metricsActorPath"))
  }
}
/**
 * Actor that relays every message it receives to `target` using `forward`.
 *
 * @param target actor that receives all forwarded messages
 */
class ForwarderSubscriber(target: ActorRef) extends Actor {
  def receive: Receive = {
    case message ⇒ target forward message
  }
}
|