From 97d299613bd725540e5279708ecfcade59dcda70 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 28 Jul 2014 01:02:02 -0300 Subject: = core: subscription protocol specification, closes #51 --- .../main/scala/kamon/metric/MetricsExtension.scala | 10 +- .../main/scala/kamon/metric/Subscriptions.scala | 93 ++++++++++---- .../kamon/metric/SubscriptionsProtocolSpec.scala | 133 +++++++++++++++++++++ .../test/scala/kamon/metric/UserMetricsSpec.scala | 2 +- 4 files changed, 207 insertions(+), 31 deletions(-) create mode 100644 kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala (limited to 'kamon-core') diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala index abc6e44e..00214f51 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala @@ -26,7 +26,7 @@ import kamon.util.GlobPathFilter import kamon.Kamon import akka.actor import kamon.metric.Metrics.MetricGroupFilter -import kamon.metric.Subscriptions.Subscribe +import kamon.metric.Subscriptions.{ Unsubscribe, Subscribe } import java.util.concurrent.TimeUnit class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { @@ -55,9 +55,11 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { storage.remove(identity) } - def subscribe[C <: MetricGroupCategory](category: C, selection: String, receiver: ActorRef, permanently: Boolean = false): Unit = { - subscriptions.tell(Subscribe(category, selection, permanently), receiver) - } + def subscribe[C <: MetricGroupCategory](category: C, selection: String, subscriber: ActorRef, permanently: Boolean = false): Unit = + subscriptions.tell(Subscribe(category, selection, subscriber, permanently), subscriber) + + def unsubscribe(subscriber: ActorRef): Unit = + subscriptions.tell(Unsubscribe(subscriber), subscriber) def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = { import scala.concurrent.duration._ diff --git a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala index eb2168ad..c6571507 100644 --- a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala +++ b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala @@ -16,8 +16,8 @@ package kamon.metric -import akka.actor.{ Props, ActorRef, Actor } -import kamon.metric.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe } +import akka.actor._ +import kamon.metric.Subscriptions._ import kamon.util.GlobPathFilter import scala.concurrent.duration.{ FiniteDuration, Duration } import java.util.concurrent.TimeUnit @@ -27,65 +27,106 @@ import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer class Subscriptions extends Actor { import context.system - val config = context.system.settings.config - val tickInterval = Duration(config.getDuration("kamon.metrics.tick-interval", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS) - val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) + val flushMetricsSchedule = scheduleFlushMessage() val collectionContext = Kamon(Metrics).buildDefaultCollectionContext var lastTick: Long = System.currentTimeMillis() - var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty - var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty + var oneShotSubscriptions: Map[ActorRef, MetricSelectionFilter] = Map.empty + var permanentSubscriptions: Map[ActorRef, MetricSelectionFilter] = Map.empty def receive = { - case Subscribe(category, selection, permanent) ⇒ subscribe(category, selection, permanent) - case FlushMetrics ⇒ flush() + case Subscribe(category, selection, subscriber, permanent) ⇒ subscribe(category, selection, subscriber, permanent) + case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber) + case Terminated(subscriber) ⇒ unsubscribe(subscriber) + case FlushMetrics ⇒ flush() } - def subscribe(category: MetricGroupCategory, selection: String, permanent: Boolean): Unit = { - val filter = MetricGroupFilter(category, new GlobPathFilter(selection)) - if (permanent) { - val receivers = subscribedPermanently.get(filter).getOrElse(Nil) - subscribedPermanently = subscribedPermanently.updated(filter, sender :: receivers) + def subscribe(category: MetricGroupCategory, selection: String, subscriber: ActorRef, permanent: Boolean): Unit = { + context.watch(subscriber) + val newFilter: MetricSelectionFilter = GroupAndPatternFilter(category, new GlobPathFilter(selection)) + if (permanent) { + permanentSubscriptions = permanentSubscriptions.updated(subscriber, newFilter combine { + permanentSubscriptions.getOrElse(subscriber, MetricSelectionFilter.empty) + }) } else { - val receivers = subscribedForOneShot.get(filter).getOrElse(Nil) - subscribedForOneShot = subscribedForOneShot.updated(filter, sender :: receivers) + oneShotSubscriptions = oneShotSubscriptions.updated(subscriber, newFilter combine { + oneShotSubscriptions.getOrElse(subscriber, MetricSelectionFilter.empty) + }) } + } + def unsubscribe(subscriber: ActorRef): Unit = { + if (permanentSubscriptions.contains(subscriber)) + permanentSubscriptions = permanentSubscriptions - subscriber + + if (oneShotSubscriptions.contains(subscriber)) + oneShotSubscriptions = oneShotSubscriptions - subscriber } def flush(): Unit = { val currentTick = System.currentTimeMillis() val snapshots = collectAll() - dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots) - dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots) + dispatchSelectedMetrics(lastTick, currentTick, permanentSubscriptions, snapshots) + dispatchSelectedMetrics(lastTick, currentTick, oneShotSubscriptions, snapshots) lastTick = currentTick - subscribedForOneShot = Map.empty + oneShotSubscriptions = Map.empty } - def collectAll(): Map[MetricGroupIdentity, MetricGroupSnapshot] = - (for ((identity, recorder) ← Kamon(Metrics).storage) yield (identity, recorder.collect(collectionContext))).toMap + def collectAll(): Map[MetricGroupIdentity, MetricGroupSnapshot] = { + val allMetrics = Kamon(Metrics).storage + val builder = Map.newBuilder[MetricGroupIdentity, MetricGroupSnapshot] - def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]], + allMetrics.foreach { + case (identity, recorder) ⇒ builder += ((identity, recorder.collect(collectionContext))) + } + + builder.result() + } + + def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[ActorRef, MetricSelectionFilter], snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = { - for ((filter, receivers) ← subscriptions) yield { + for ((subscriber, filter) ← subscriptions) { val selection = snapshots.filter(group ⇒ filter.accept(group._1)) val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) - receivers.foreach(_ ! tickMetrics) + subscriber ! tickMetrics } } + + def scheduleFlushMessage(): Cancellable = { + val config = context.system.settings.config + val tickInterval = Duration(config.getDuration("kamon.metrics.tick-interval", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS) + context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) + } } object Subscriptions { case object FlushMetrics - case class Subscribe(category: MetricGroupCategory, selection: String, permanently: Boolean = false) + case class Unsubscribe(subscriber: ActorRef) + case class Subscribe(category: MetricGroupCategory, selection: String, subscriber: ActorRef, permanently: Boolean = false) case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]) - case class MetricGroupFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) { + trait MetricSelectionFilter { + def accept(identity: MetricGroupIdentity): Boolean + } + + object MetricSelectionFilter { + val empty = new MetricSelectionFilter { + def accept(identity: MetricGroupIdentity): Boolean = false + } + + implicit class CombinableMetricSelectionFilter(msf: MetricSelectionFilter) { + def combine(that: MetricSelectionFilter): MetricSelectionFilter = new MetricSelectionFilter { + def accept(identity: MetricGroupIdentity): Boolean = msf.accept(identity) || that.accept(identity) + } + } + } + + case class GroupAndPatternFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) extends MetricSelectionFilter { def accept(identity: MetricGroupIdentity): Boolean = { category.equals(identity.category) && globFilter.accept(identity.name) } diff --git a/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala b/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala new file mode 100644 index 00000000..9144725e --- /dev/null +++ b/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala @@ -0,0 +1,133 @@ +package kamon.metric + +import akka.actor._ +import akka.testkit.{ TestProbe, ImplicitSender, TestKitBase } +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.metric.Subscriptions.TickMetricSnapshot +import org.scalatest.{ Matchers, WordSpecLike } +import scala.concurrent.duration._ + +class SubscriptionsProtocolSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender { + implicit lazy val system: ActorSystem = ActorSystem("subscriptions-protocol-spec", ConfigFactory.parseString( + """ + |kamon.metrics { + | tick-interval = 1 hour + |} + """.stripMargin)) + + val metricsExtension = Kamon(Metrics)(system) + import metricsExtension.{ register, subscribe, unsubscribe } + + "the Subscriptions messaging protocol" should { + "allow subscribing for a single tick" in { + val subscriber = TestProbe() + register(TraceMetrics("one-shot"), TraceMetrics.Factory) + subscribe(TraceMetrics, "one-shot", subscriber.ref, permanently = false) + + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] + + tickSnapshot.metrics.size should be(1) + tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot")) + + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + subscriber.expectNoMsg(1 second) + } + + "allow subscribing permanently to a metric" in { + val subscriber = TestProbe() + register(TraceMetrics("permanent"), TraceMetrics.Factory) + subscribe(TraceMetrics, "permanent", subscriber.ref, permanently = true) + + for (repetition ← 1 to 5) { + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] + + tickSnapshot.metrics.size should be(1) + tickSnapshot.metrics.keys should contain(TraceMetrics("permanent")) + subscriber.expectNoMsg(1 second) + } + } + + "allow subscribing to metrics matching a glob pattern" in { + val subscriber = TestProbe() + register(TraceMetrics("include-one"), TraceMetrics.Factory) + register(TraceMetrics("exclude-two"), TraceMetrics.Factory) + register(TraceMetrics("include-three"), TraceMetrics.Factory) + subscribe(TraceMetrics, "include-*", subscriber.ref, permanently = true) + + for (repetition ← 1 to 5) { + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] + + tickSnapshot.metrics.size should be(2) + tickSnapshot.metrics.keys should contain(TraceMetrics("include-one")) + tickSnapshot.metrics.keys should contain(TraceMetrics("include-three")) + subscriber.expectNoMsg(1 second) + } + } + + "send a single TickMetricSnapshot to each subscriber, even if subscribed multiple times" in { + val subscriber = TestProbe() + register(TraceMetrics("include-one"), TraceMetrics.Factory) + register(TraceMetrics("exclude-two"), TraceMetrics.Factory) + register(TraceMetrics("include-three"), TraceMetrics.Factory) + subscribe(TraceMetrics, "include-one", subscriber.ref, permanently = true) + subscribe(TraceMetrics, "include-three", subscriber.ref, permanently = true) + + for (repetition ← 1 to 5) { + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] + + tickSnapshot.metrics.size should be(2) + tickSnapshot.metrics.keys should contain(TraceMetrics("include-one")) + tickSnapshot.metrics.keys should contain(TraceMetrics("include-three")) + } + } + + "allow un-subscribing a subscriber" in { + val subscriber = TestProbe() + register(TraceMetrics("one-shot"), TraceMetrics.Factory) + subscribe(TraceMetrics, "one-shot", subscriber.ref, permanently = true) + + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] + tickSnapshot.metrics.size should be(1) + tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot")) + + unsubscribe(subscriber.ref) + + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + subscriber.expectNoMsg(1 second) + } + + "watch all subscribers and un-subscribe them if they die" in { + val subscriber = TestProbe() + val forwarderSubscriber = system.actorOf(Props(new ForwarderSubscriber(subscriber.ref))) + watch(forwarderSubscriber) + register(TraceMetrics("one-shot"), TraceMetrics.Factory) + subscribe(TraceMetrics, "one-shot", forwarderSubscriber, permanently = true) + + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot] + tickSnapshot.metrics.size should be(1) + tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot")) + + forwarderSubscriber ! PoisonPill + expectTerminated(forwarderSubscriber) + + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + metricsExtension.subscriptions ! Subscriptions.FlushMetrics + subscriber.expectNoMsg(2 seconds) + } + } +} + +class ForwarderSubscriber(target: ActorRef) extends Actor { + def receive = { + case anything ⇒ target.forward(anything) + } +} diff --git a/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala index 385c8404..8f1d06d8 100644 --- a/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala @@ -14,7 +14,7 @@ class UserMetricsSpec extends TestKitBase with WordSpecLike with Matchers with I implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString( """ |kamon.metrics { - | flush-interval = 1 hour + | tick-interval = 1 hour | default-collection-context-buffer-size = 10 | | precision { -- cgit v1.2.3