From 97d299613bd725540e5279708ecfcade59dcda70 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 28 Jul 2014 01:02:02 -0300 Subject: = core: subscription protocol specification, closes #51 --- .../kamon/metric/SubscriptionsProtocolSpec.scala | 133 +++++++++++++++++++++ .../test/scala/kamon/metric/UserMetricsSpec.scala | 2 +- 2 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala (limited to 'kamon-core/src/test/scala/kamon') diff --git a/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala b/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala new file mode 100644 index 00000000..9144725e --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala @@ -0,0 +1,133 @@ +package kamon.metric + +import akka.actor._ +import akka.testkit.{ TestProbe, ImplicitSender, TestKitBase } +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.metric.Subscriptions.TickMetricSnapshot +import org.scalatest.{ Matchers, WordSpecLike } +import scala.concurrent.duration._ + +class SubscriptionsProtocolSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { + implicit lazy val system: ActorSystem = ActorSystem("subscriptions-protocol-spec", ConfigFactory.parseString( + """ + |kamon.metrics { + | tick-interval = 1 hour + |} + """.stripMargin)) + + val metricsExtension = Kamon(Metrics)(system) + import metricsExtension.{ register, subscribe, unsubscribe } + + "the Subscriptions messaging protocol" should { + "allow subscribing for a single tick" in { + val subscriber = TestProbe() + register(TraceMetrics("one-shot"), TraceMetrics.Factory) + subscribe(TraceMetrics, "one-shot", subscriber.ref, permanently = false) + + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] + + tickSnapshot.metrics.size should be(1) + tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot")) + + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + subscriber.expectNoMsg(1 second) + } + + "allow subscribing permanently to a metric" in { + val subscriber = TestProbe() + register(TraceMetrics("permanent"), TraceMetrics.Factory) + subscribe(TraceMetrics, "permanent", subscriber.ref, permanently = true) + + for (repetition ← 1 to 5) { + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] + + tickSnapshot.metrics.size should be(1) + tickSnapshot.metrics.keys should contain(TraceMetrics("permanent")) + subscriber.expectNoMsg(1 second) + } + } + + "allow subscribing to metrics matching a glob pattern" in { + val subscriber = TestProbe() + register(TraceMetrics("include-one"), TraceMetrics.Factory) + register(TraceMetrics("exclude-two"), TraceMetrics.Factory) + register(TraceMetrics("include-three"), TraceMetrics.Factory) + subscribe(TraceMetrics, "include-*", subscriber.ref, permanently = true) + + for (repetition ← 1 to 5) { + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] + + tickSnapshot.metrics.size should be(2) + tickSnapshot.metrics.keys should contain(TraceMetrics("include-one")) + tickSnapshot.metrics.keys should contain(TraceMetrics("include-three")) + subscriber.expectNoMsg(1 second) + } + } + + "send a single TickMetricSnapshot to each subscriber, even if subscribed multiple times" in { + val subscriber = TestProbe() + register(TraceMetrics("include-one"), TraceMetrics.Factory) + register(TraceMetrics("exclude-two"), TraceMetrics.Factory) + register(TraceMetrics("include-three"), TraceMetrics.Factory) + subscribe(TraceMetrics, "include-one", subscriber.ref, permanently = true) + subscribe(TraceMetrics, "include-three", subscriber.ref, permanently = true) + + for (repetition ← 1 to 5) { + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] + + tickSnapshot.metrics.size should be(2) + tickSnapshot.metrics.keys should contain(TraceMetrics("include-one")) + tickSnapshot.metrics.keys should contain(TraceMetrics("include-three")) + } + } + + "allow un-subscribing a subscriber" in { + val subscriber = TestProbe() + register(TraceMetrics("one-shot"), TraceMetrics.Factory) + subscribe(TraceMetrics, "one-shot", subscriber.ref, permanently = true) + + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] + tickSnapshot.metrics.size should be(1) + tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot")) + + unsubscribe(subscriber.ref) + + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + subscriber.expectNoMsg(1 second) + } + + "watch all subscribers and un-subscribe them if they die" in { + val subscriber = TestProbe() + val forwarderSubscriber = system.actorOf(Props(new ForwarderSubscriber(subscriber.ref))) + watch(forwarderSubscriber) + register(TraceMetrics("one-shot"), TraceMetrics.Factory) + subscribe(TraceMetrics, "one-shot", forwarderSubscriber, permanently = true) + + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] + tickSnapshot.metrics.size should be(1) + tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot")) + + forwarderSubscriber ! PoisonPill + expectTerminated(forwarderSubscriber) + + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + subscriber.expectNoMsg(2 seconds) + } + } +} + +class ForwarderSubscriber(target: ActorRef) extends Actor { + def receive = { + case anything ⇒ target.forward(anything) + } +} diff --git a/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala index 385c8404..8f1d06d8 100644 --- a/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala @@ -14,7 +14,7 @@ class UserMetricsSpec extends TestKitBase with WordSpecLike with Matchers with I implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( """ |kamon.metrics { - | flush-interval = 1 hour + | tick-interval = 1 hour | default-collection-context-buffer-size = 10 | | precision { -- cgit v1.2.3