diff options
Diffstat (limited to 'kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala')
-rw-r--r-- | kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala | 112 |
1 files changed, 47 insertions, 65 deletions
diff --git a/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala b/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala index 9144725e..40200685 100644 --- a/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala @@ -1,128 +1,110 @@ package kamon.metric import akka.actor._ -import akka.testkit.{ TestProbe, ImplicitSender, TestKitBase } +import akka.testkit.{ TestProbe, ImplicitSender } import com.typesafe.config.ConfigFactory -import kamon.Kamon -import kamon.metric.Subscriptions.TickMetricSnapshot -import org.scalatest.{ Matchers, WordSpecLike } +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.testkit.BaseKamonSpec import scala.concurrent.duration._ -class SubscriptionsProtocolSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { - implicit lazy val system: ActorSystem = ActorSystem("subscriptions-protocol-spec", ConfigFactory.parseString( - """ - |kamon.metrics { - | tick-interval = 1 hour - |} - """.stripMargin)) +class SubscriptionsProtocolSpec extends BaseKamonSpec("subscriptions-protocol-spec") with ImplicitSender { + override lazy val config = + ConfigFactory.parseString( + """ + |kamon.metric { + | tick-interval = 1 hour + |} + """.stripMargin) - val metricsExtension = Kamon(Metrics)(system) - import metricsExtension.{ register, subscribe, unsubscribe } + val metricsModule = kamon.metrics + import metricsModule.{ register, subscribe, unsubscribe } "the Subscriptions messaging protocol" should { "allow subscribing for a single tick" in { val subscriber = TestProbe() - register(TraceMetrics("one-shot"), TraceMetrics.Factory) - subscribe(TraceMetrics, "one-shot", subscriber.ref, permanently = false) + register(TraceMetrics, "one-shot") + subscribe("trace", "one-shot", subscriber.ref, permanently = false) - metricsExtension.subscriptions ! Subscriptions.FlushMetrics + flushSubscriptions() val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] tickSnapshot.metrics.size should be(1) - tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot")) + tickSnapshot.metrics.keys should contain(Entity("one-shot", "trace")) - metricsExtension.subscriptions ! Subscriptions.FlushMetrics + flushSubscriptions() subscriber.expectNoMsg(1 second) } "allow subscribing permanently to a metric" in { val subscriber = TestProbe() - register(TraceMetrics("permanent"), TraceMetrics.Factory) - subscribe(TraceMetrics, "permanent", subscriber.ref, permanently = true) + register(TraceMetrics, "permanent") + subscribe("trace", "permanent", subscriber.ref, permanently = true) for (repetition ← 1 to 5) { - metricsExtension.subscriptions ! Subscriptions.FlushMetrics + flushSubscriptions() val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] tickSnapshot.metrics.size should be(1) - tickSnapshot.metrics.keys should contain(TraceMetrics("permanent")) - subscriber.expectNoMsg(1 second) + tickSnapshot.metrics.keys should contain(Entity("permanent", "trace")) } } "allow subscribing to metrics matching a glob pattern" in { val subscriber = TestProbe() - register(TraceMetrics("include-one"), TraceMetrics.Factory) - register(TraceMetrics("exclude-two"), TraceMetrics.Factory) - register(TraceMetrics("include-three"), TraceMetrics.Factory) - subscribe(TraceMetrics, "include-*", subscriber.ref, permanently = true) + register(TraceMetrics, "include-one") + register(TraceMetrics, "exclude-two") + register(TraceMetrics, "include-three") + subscribe("trace", "include-*", subscriber.ref, permanently = true) for (repetition ← 1 to 5) { - metricsExtension.subscriptions ! Subscriptions.FlushMetrics + flushSubscriptions() val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] tickSnapshot.metrics.size should be(2) - tickSnapshot.metrics.keys should contain(TraceMetrics("include-one")) - tickSnapshot.metrics.keys should contain(TraceMetrics("include-three")) - subscriber.expectNoMsg(1 second) + tickSnapshot.metrics.keys should contain(Entity("include-one", "trace")) + tickSnapshot.metrics.keys should contain(Entity("include-three", "trace")) } } "send a single TickMetricSnapshot to each subscriber, even if subscribed multiple times" in { val subscriber = TestProbe() - register(TraceMetrics("include-one"), TraceMetrics.Factory) - register(TraceMetrics("exclude-two"), TraceMetrics.Factory) - register(TraceMetrics("include-three"), TraceMetrics.Factory) - subscribe(TraceMetrics, "include-one", subscriber.ref, permanently = true) - subscribe(TraceMetrics, "include-three", subscriber.ref, permanently = true) + register(TraceMetrics, "include-one") + register(TraceMetrics, "exclude-two") + register(TraceMetrics, "include-three") + subscribe("trace", "include-one", subscriber.ref, permanently = true) + subscribe("trace", "include-three", subscriber.ref, permanently = true) for (repetition ← 1 to 5) { - metricsExtension.subscriptions ! Subscriptions.FlushMetrics + flushSubscriptions() val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] tickSnapshot.metrics.size should be(2) - tickSnapshot.metrics.keys should contain(TraceMetrics("include-one")) - tickSnapshot.metrics.keys should contain(TraceMetrics("include-three")) + tickSnapshot.metrics.keys should contain(Entity("include-one", "trace")) + tickSnapshot.metrics.keys should contain(Entity("include-three", "trace")) } } "allow un-subscribing a subscriber" in { val subscriber = TestProbe() - register(TraceMetrics("one-shot"), TraceMetrics.Factory) - subscribe(TraceMetrics, "one-shot", subscriber.ref, permanently = true) + register(TraceMetrics, "one-shot") + subscribe("trace", "one-shot", subscriber.ref, permanently = true) - metricsExtension.subscriptions ! Subscriptions.FlushMetrics + flushSubscriptions() val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] tickSnapshot.metrics.size should be(1) - tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot")) + tickSnapshot.metrics.keys should contain(Entity("one-shot", "trace")) unsubscribe(subscriber.ref) - metricsExtension.subscriptions ! Subscriptions.FlushMetrics + flushSubscriptions() subscriber.expectNoMsg(1 second) } + } - "watch all subscribers and un-subscribe them if they die" in { - val subscriber = TestProbe() - val forwarderSubscriber = system.actorOf(Props(new ForwarderSubscriber(subscriber.ref))) - watch(forwarderSubscriber) - register(TraceMetrics("one-shot"), TraceMetrics.Factory) - subscribe(TraceMetrics, "one-shot", forwarderSubscriber, permanently = true) - - metricsExtension.subscriptions ! Subscriptions.FlushMetrics - val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] - tickSnapshot.metrics.size should be(1) - tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot")) - - forwarderSubscriber ! PoisonPill - expectTerminated(forwarderSubscriber) - - metricsExtension.subscriptions ! Subscriptions.FlushMetrics - metricsExtension.subscriptions ! Subscriptions.FlushMetrics - metricsExtension.subscriptions ! Subscriptions.FlushMetrics - metricsExtension.subscriptions ! Subscriptions.FlushMetrics - subscriber.expectNoMsg(2 seconds) - } + def subscriptionsActor: ActorRef = { + val listener = TestProbe() + system.actorSelection("/user/kamon/kamon-metrics").tell(Identify(1), listener.ref) + listener.expectMsgType[ActorIdentity].ref.get } } |