package kamon.metric import akka.actor._ import akka.testkit.{ TestProbe, ImplicitSender, TestKitBase } import com.typesafe.config.ConfigFactory import kamon.Kamon import kamon.metric.Subscriptions.TickMetricSnapshot import org.scalatest.{ Matchers, WordSpecLike } import scala.concurrent.duration._ class SubscriptionsProtocolSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { implicit lazy val system: ActorSystem = ActorSystem("subscriptions-protocol-spec", ConfigFactory.parseString( """ |kamon.metrics { | tick-interval = 1 hour |} """.stripMargin)) val metricsExtension = Kamon(Metrics)(system) import metricsExtension.{ register, subscribe, unsubscribe } "the Subscriptions messaging protocol" should { "allow subscribing for a single tick" in { val subscriber = TestProbe() register(TraceMetrics("one-shot"), TraceMetrics.Factory) subscribe(TraceMetrics, "one-shot", subscriber.ref, permanently = false) metricsExtension.subscriptions ! Subscriptions.FlushMetrics val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] tickSnapshot.metrics.size should be(1) tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot")) metricsExtension.subscriptions ! Subscriptions.FlushMetrics subscriber.expectNoMsg(1 second) } "allow subscribing permanently to a metric" in { val subscriber = TestProbe() register(TraceMetrics("permanent"), TraceMetrics.Factory) subscribe(TraceMetrics, "permanent", subscriber.ref, permanently = true) for (repetition ← 1 to 5) { metricsExtension.subscriptions ! Subscriptions.FlushMetrics val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] tickSnapshot.metrics.size should be(1) tickSnapshot.metrics.keys should contain(TraceMetrics("permanent")) subscriber.expectNoMsg(1 second) } } "allow subscribing to metrics matching a glob pattern" in { val subscriber = TestProbe() register(TraceMetrics("include-one"), TraceMetrics.Factory) register(TraceMetrics("exclude-two"), TraceMetrics.Factory) register(TraceMetrics("include-three"), TraceMetrics.Factory) subscribe(TraceMetrics, "include-*", subscriber.ref, permanently = true) for (repetition ← 1 to 5) { metricsExtension.subscriptions ! Subscriptions.FlushMetrics val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] tickSnapshot.metrics.size should be(2) tickSnapshot.metrics.keys should contain(TraceMetrics("include-one")) tickSnapshot.metrics.keys should contain(TraceMetrics("include-three")) subscriber.expectNoMsg(1 second) } } "send a single TickMetricSnapshot to each subscriber, even if subscribed multiple times" in { val subscriber = TestProbe() register(TraceMetrics("include-one"), TraceMetrics.Factory) register(TraceMetrics("exclude-two"), TraceMetrics.Factory) register(TraceMetrics("include-three"), TraceMetrics.Factory) subscribe(TraceMetrics, "include-one", subscriber.ref, permanently = true) subscribe(TraceMetrics, "include-three", subscriber.ref, permanently = true) for (repetition ← 1 to 5) { metricsExtension.subscriptions ! Subscriptions.FlushMetrics val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] tickSnapshot.metrics.size should be(2) tickSnapshot.metrics.keys should contain(TraceMetrics("include-one")) tickSnapshot.metrics.keys should contain(TraceMetrics("include-three")) } } "allow un-subscribing a subscriber" in { val subscriber = TestProbe() register(TraceMetrics("one-shot"), TraceMetrics.Factory) subscribe(TraceMetrics, "one-shot", subscriber.ref, permanently = true) metricsExtension.subscriptions ! Subscriptions.FlushMetrics val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] tickSnapshot.metrics.size should be(1) tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot")) unsubscribe(subscriber.ref) metricsExtension.subscriptions ! Subscriptions.FlushMetrics subscriber.expectNoMsg(1 second) } "watch all subscribers and un-subscribe them if they die" in { val subscriber = TestProbe() val forwarderSubscriber = system.actorOf(Props(new ForwarderSubscriber(subscriber.ref))) watch(forwarderSubscriber) register(TraceMetrics("one-shot"), TraceMetrics.Factory) subscribe(TraceMetrics, "one-shot", forwarderSubscriber, permanently = true) metricsExtension.subscriptions ! Subscriptions.FlushMetrics val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] tickSnapshot.metrics.size should be(1) tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot")) forwarderSubscriber ! PoisonPill expectTerminated(forwarderSubscriber) metricsExtension.subscriptions ! Subscriptions.FlushMetrics metricsExtension.subscriptions ! Subscriptions.FlushMetrics metricsExtension.subscriptions ! Subscriptions.FlushMetrics metricsExtension.subscriptions ! Subscriptions.FlushMetrics subscriber.expectNoMsg(2 seconds) } } } class ForwarderSubscriber(target: ActorRef) extends Actor { def receive = { case anything ⇒ target.forward(anything) } }