aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala')
-rw-r--r--kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala112
1 files changed, 47 insertions, 65 deletions
diff --git a/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala b/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala
index 9144725e..40200685 100644
--- a/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala
@@ -1,128 +1,110 @@
package kamon.metric
import akka.actor._
-import akka.testkit.{ TestProbe, ImplicitSender, TestKitBase }
+import akka.testkit.{ TestProbe, ImplicitSender }
import com.typesafe.config.ConfigFactory
-import kamon.Kamon
-import kamon.metric.Subscriptions.TickMetricSnapshot
-import org.scalatest.{ Matchers, WordSpecLike }
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
+import kamon.testkit.BaseKamonSpec
import scala.concurrent.duration._
-class SubscriptionsProtocolSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
- implicit lazy val system: ActorSystem = ActorSystem("subscriptions-protocol-spec", ConfigFactory.parseString(
- """
- |kamon.metrics {
- | tick-interval = 1 hour
- |}
- """.stripMargin))
+class SubscriptionsProtocolSpec extends BaseKamonSpec("subscriptions-protocol-spec") with ImplicitSender {
+ override lazy val config =
+ ConfigFactory.parseString(
+ """
+ |kamon.metric {
+ | tick-interval = 1 hour
+ |}
+ """.stripMargin)
- val metricsExtension = Kamon(Metrics)(system)
- import metricsExtension.{ register, subscribe, unsubscribe }
+ val metricsModule = kamon.metrics
+ import metricsModule.{ register, subscribe, unsubscribe }
"the Subscriptions messaging protocol" should {
"allow subscribing for a single tick" in {
val subscriber = TestProbe()
- register(TraceMetrics("one-shot"), TraceMetrics.Factory)
- subscribe(TraceMetrics, "one-shot", subscriber.ref, permanently = false)
+ register(TraceMetrics, "one-shot")
+ subscribe("trace", "one-shot", subscriber.ref, permanently = false)
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ flushSubscriptions()
val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
tickSnapshot.metrics.size should be(1)
- tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot"))
+ tickSnapshot.metrics.keys should contain(Entity("one-shot", "trace"))
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ flushSubscriptions()
subscriber.expectNoMsg(1 second)
}
"allow subscribing permanently to a metric" in {
val subscriber = TestProbe()
- register(TraceMetrics("permanent"), TraceMetrics.Factory)
- subscribe(TraceMetrics, "permanent", subscriber.ref, permanently = true)
+ register(TraceMetrics, "permanent")
+ subscribe("trace", "permanent", subscriber.ref, permanently = true)
for (repetition ← 1 to 5) {
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ flushSubscriptions()
val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
tickSnapshot.metrics.size should be(1)
- tickSnapshot.metrics.keys should contain(TraceMetrics("permanent"))
- subscriber.expectNoMsg(1 second)
+ tickSnapshot.metrics.keys should contain(Entity("permanent", "trace"))
}
}
"allow subscribing to metrics matching a glob pattern" in {
val subscriber = TestProbe()
- register(TraceMetrics("include-one"), TraceMetrics.Factory)
- register(TraceMetrics("exclude-two"), TraceMetrics.Factory)
- register(TraceMetrics("include-three"), TraceMetrics.Factory)
- subscribe(TraceMetrics, "include-*", subscriber.ref, permanently = true)
+ register(TraceMetrics, "include-one")
+ register(TraceMetrics, "exclude-two")
+ register(TraceMetrics, "include-three")
+ subscribe("trace", "include-*", subscriber.ref, permanently = true)
for (repetition ← 1 to 5) {
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ flushSubscriptions()
val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
tickSnapshot.metrics.size should be(2)
- tickSnapshot.metrics.keys should contain(TraceMetrics("include-one"))
- tickSnapshot.metrics.keys should contain(TraceMetrics("include-three"))
- subscriber.expectNoMsg(1 second)
+ tickSnapshot.metrics.keys should contain(Entity("include-one", "trace"))
+ tickSnapshot.metrics.keys should contain(Entity("include-three", "trace"))
}
}
"send a single TickMetricSnapshot to each subscriber, even if subscribed multiple times" in {
val subscriber = TestProbe()
- register(TraceMetrics("include-one"), TraceMetrics.Factory)
- register(TraceMetrics("exclude-two"), TraceMetrics.Factory)
- register(TraceMetrics("include-three"), TraceMetrics.Factory)
- subscribe(TraceMetrics, "include-one", subscriber.ref, permanently = true)
- subscribe(TraceMetrics, "include-three", subscriber.ref, permanently = true)
+ register(TraceMetrics, "include-one")
+ register(TraceMetrics, "exclude-two")
+ register(TraceMetrics, "include-three")
+ subscribe("trace", "include-one", subscriber.ref, permanently = true)
+ subscribe("trace", "include-three", subscriber.ref, permanently = true)
for (repetition ← 1 to 5) {
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ flushSubscriptions()
val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
tickSnapshot.metrics.size should be(2)
- tickSnapshot.metrics.keys should contain(TraceMetrics("include-one"))
- tickSnapshot.metrics.keys should contain(TraceMetrics("include-three"))
+ tickSnapshot.metrics.keys should contain(Entity("include-one", "trace"))
+ tickSnapshot.metrics.keys should contain(Entity("include-three", "trace"))
}
}
"allow un-subscribing a subscriber" in {
val subscriber = TestProbe()
- register(TraceMetrics("one-shot"), TraceMetrics.Factory)
- subscribe(TraceMetrics, "one-shot", subscriber.ref, permanently = true)
+ register(TraceMetrics, "one-shot")
+ subscribe("trace", "one-shot", subscriber.ref, permanently = true)
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ flushSubscriptions()
val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
tickSnapshot.metrics.size should be(1)
- tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot"))
+ tickSnapshot.metrics.keys should contain(Entity("one-shot", "trace"))
unsubscribe(subscriber.ref)
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ flushSubscriptions()
subscriber.expectNoMsg(1 second)
}
+ }
- "watch all subscribers and un-subscribe them if they die" in {
- val subscriber = TestProbe()
- val forwarderSubscriber = system.actorOf(Props(new ForwarderSubscriber(subscriber.ref)))
- watch(forwarderSubscriber)
- register(TraceMetrics("one-shot"), TraceMetrics.Factory)
- subscribe(TraceMetrics, "one-shot", forwarderSubscriber, permanently = true)
-
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
- val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
- tickSnapshot.metrics.size should be(1)
- tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot"))
-
- forwarderSubscriber ! PoisonPill
- expectTerminated(forwarderSubscriber)
-
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
- metricsExtension.subscriptions ! Subscriptions.FlushMetrics
- subscriber.expectNoMsg(2 seconds)
- }
+ def subscriptionsActor: ActorRef = {
+ val listener = TestProbe()
+ system.actorSelection("/user/kamon/kamon-metrics").tell(Identify(1), listener.ref)
+ listener.expectMsgType[ActorIdentity].ref.get
}
}