aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala
blob: 40200685a7f66f2e8cbad17bfa0d044b5c32e812 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package kamon.metric

import akka.actor._
import akka.testkit.{ TestProbe, ImplicitSender }
import com.typesafe.config.ConfigFactory
import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
import kamon.testkit.BaseKamonSpec
import scala.concurrent.duration._

/**
 * Exercises the subscribe/unsubscribe API exposed by `kamon.metrics`, using Akka
 * `TestProbe`s as subscribers and asserting on the `TickMetricSnapshot` messages
 * they receive.
 *
 * Every test drives delivery explicitly through `flushSubscriptions()` (defined
 * outside this class, presumably in `BaseKamonSpec` -- confirm).
 */
class SubscriptionsProtocolSpec extends BaseKamonSpec("subscriptions-protocol-spec") with ImplicitSender {
  // The tick interval is set to one hour; presumably this keeps the scheduled tick
  // from firing during the test run so that snapshots only arrive when a test calls
  // flushSubscriptions() -- NOTE(review): confirm against BaseKamonSpec.
  override lazy val config =
    ConfigFactory.parseString(
      """
        |kamon.metric {
        |  tick-interval = 1 hour
        |}
      """.stripMargin)

  val metricsModule = kamon.metrics
  import metricsModule.{ register, subscribe, unsubscribe }

  "the Subscriptions messaging protocol" should {
    "allow subscribing for a single tick" in {
      val subscriber = TestProbe()
      register(TraceMetrics, "one-shot")
      subscribe("trace", "one-shot", subscriber.ref, permanently = false)

      flushSubscriptions()
      val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]

      tickSnapshot.metrics.size should be(1)
      tickSnapshot.metrics.keys should contain(Entity("one-shot", "trace"))

      // A non-permanent subscription must not produce a second snapshot.
      flushSubscriptions()
      subscriber.expectNoMsg(1.second)
    }

    "allow subscribing permanently to a metric" in {
      val subscriber = TestProbe()
      register(TraceMetrics, "permanent")
      subscribe("trace", "permanent", subscriber.ref, permanently = true)

      // A permanent subscription must keep delivering on every flush.
      for (repetition <- 1 to 5) {
        flushSubscriptions()
        val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]

        tickSnapshot.metrics.size should be(1)
        tickSnapshot.metrics.keys should contain(Entity("permanent", "trace"))
      }
    }

    "allow subscribing to metrics matching a glob pattern" in {
      val subscriber = TestProbe()
      register(TraceMetrics, "include-one")
      register(TraceMetrics, "exclude-two")
      register(TraceMetrics, "include-three")
      subscribe("trace", "include-*", subscriber.ref, permanently = true)

      // Only the two "include-*" entities are expected; "exclude-two" must be absent.
      for (repetition <- 1 to 5) {
        flushSubscriptions()
        val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]

        tickSnapshot.metrics.size should be(2)
        tickSnapshot.metrics.keys should contain(Entity("include-one", "trace"))
        tickSnapshot.metrics.keys should contain(Entity("include-three", "trace"))
      }
    }

    "send a single TickMetricSnapshot to each subscriber, even if subscribed multiple times" in {
      val subscriber = TestProbe()
      register(TraceMetrics, "include-one")
      register(TraceMetrics, "exclude-two")
      register(TraceMetrics, "include-three")
      subscribe("trace", "include-one", subscriber.ref, permanently = true)
      subscribe("trace", "include-three", subscriber.ref, permanently = true)

      // Two subscriptions for the same ref are expected to be merged into one
      // snapshot per flush that carries both entities.
      for (repetition <- 1 to 5) {
        flushSubscriptions()
        val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]

        tickSnapshot.metrics.size should be(2)
        tickSnapshot.metrics.keys should contain(Entity("include-one", "trace"))
        tickSnapshot.metrics.keys should contain(Entity("include-three", "trace"))
      }
    }

    "allow un-subscribing a subscriber" in {
      val subscriber = TestProbe()
      register(TraceMetrics, "one-shot")
      subscribe("trace", "one-shot", subscriber.ref, permanently = true)

      flushSubscriptions()
      val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
      tickSnapshot.metrics.size should be(1)
      tickSnapshot.metrics.keys should contain(Entity("one-shot", "trace"))

      unsubscribe(subscriber.ref)

      // After unsubscribing, even a permanent subscriber must stop receiving snapshots.
      flushSubscriptions()
      subscriber.expectNoMsg(1.second)
    }
  }

  /**
   * Looks up the actor at `/user/kamon/kamon-metrics` by sending it `Identify(1)`
   * and waiting for the `ActorIdentity` reply on a fresh probe.
   *
   * Not referenced by the tests visible in this class.
   *
   * @return the resolved actor reference; the test fails if the reply carries no ref
   */
  def subscriptionsActor: ActorRef = {
    val metricsActorPath = "/user/kamon/kamon-metrics"
    val listener = TestProbe()
    system.actorSelection(metricsActorPath).tell(Identify(1), listener.ref)
    listener.expectMsgType[ActorIdentity].ref
      .getOrElse(fail(s"no actor could be identified at $metricsActorPath"))
  }
}

/**
 * Actor that hands every message it receives to `target` via `ActorRef.forward`.
 *
 * Not referenced by the spec visible in this file.
 *
 * @param target the actor that receives all forwarded messages
 */
class ForwarderSubscriber(target: ActorRef) extends Actor {
  def receive: Receive = {
    case anything => target.forward(anything)
  }
}