aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-07-28 01:02:02 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-07-28 01:02:02 -0300
commitca65ad5b1bd5156ec487b435c9c015d6111963f0 (patch)
treeff6e7bf309fb20b768de274a69efa893bff6117a
parentb07e8303e2e3eaf52638e1d51853729cb4983c73 (diff)
downloadKamon-ca65ad5b1bd5156ec487b435c9c015d6111963f0.tar.gz
Kamon-ca65ad5b1bd5156ec487b435c9c015d6111963f0.tar.bz2
Kamon-ca65ad5b1bd5156ec487b435c9c015d6111963f0.zip
= core: subscription protocol specification, closes #51
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala10
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Subscriptions.scala93
-rw-r--r--kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala133
-rw-r--r--kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala2
4 files changed, 207 insertions, 31 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
index abc6e44e..00214f51 100644
--- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
+++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
@@ -26,7 +26,7 @@ import kamon.util.GlobPathFilter
import kamon.Kamon
import akka.actor
import kamon.metric.Metrics.MetricGroupFilter
-import kamon.metric.Subscriptions.Subscribe
+import kamon.metric.Subscriptions.{ Unsubscribe, Subscribe }
import java.util.concurrent.TimeUnit
class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
@@ -55,9 +55,11 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
storage.remove(identity)
}
- def subscribe[C <: MetricGroupCategory](category: C, selection: String, receiver: ActorRef, permanently: Boolean = false): Unit = {
- subscriptions.tell(Subscribe(category, selection, permanently), receiver)
- }
+ def subscribe[C <: MetricGroupCategory](category: C, selection: String, subscriber: ActorRef, permanently: Boolean = false): Unit =
+ subscriptions.tell(Subscribe(category, selection, subscriber, permanently), subscriber)
+
+ def unsubscribe(subscriber: ActorRef): Unit =
+ subscriptions.tell(Unsubscribe(subscriber), subscriber)
def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = {
import scala.concurrent.duration._
diff --git a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala
index eb2168ad..c6571507 100644
--- a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala
+++ b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala
@@ -16,8 +16,8 @@
package kamon.metric
-import akka.actor.{ Props, ActorRef, Actor }
-import kamon.metric.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe }
+import akka.actor._
+import kamon.metric.Subscriptions._
import kamon.util.GlobPathFilter
import scala.concurrent.duration.{ FiniteDuration, Duration }
import java.util.concurrent.TimeUnit
@@ -27,65 +27,106 @@ import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer
class Subscriptions extends Actor {
import context.system
- val config = context.system.settings.config
- val tickInterval = Duration(config.getDuration("kamon.metrics.tick-interval", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)
- val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher)
+ val flushMetricsSchedule = scheduleFlushMessage()
val collectionContext = Kamon(Metrics).buildDefaultCollectionContext
var lastTick: Long = System.currentTimeMillis()
- var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty
- var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty
+ var oneShotSubscriptions: Map[ActorRef, MetricSelectionFilter] = Map.empty
+ var permanentSubscriptions: Map[ActorRef, MetricSelectionFilter] = Map.empty
def receive = {
- case Subscribe(category, selection, permanent) ⇒ subscribe(category, selection, permanent)
- case FlushMetrics ⇒ flush()
+ case Subscribe(category, selection, subscriber, permanent) ⇒ subscribe(category, selection, subscriber, permanent)
+ case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber)
+ case Terminated(subscriber) ⇒ unsubscribe(subscriber)
+ case FlushMetrics ⇒ flush()
}
- def subscribe(category: MetricGroupCategory, selection: String, permanent: Boolean): Unit = {
- val filter = MetricGroupFilter(category, new GlobPathFilter(selection))
- if (permanent) {
- val receivers = subscribedPermanently.get(filter).getOrElse(Nil)
- subscribedPermanently = subscribedPermanently.updated(filter, sender :: receivers)
+ def subscribe(category: MetricGroupCategory, selection: String, subscriber: ActorRef, permanent: Boolean): Unit = {
+ context.watch(subscriber)
+ val newFilter: MetricSelectionFilter = GroupAndPatternFilter(category, new GlobPathFilter(selection))
+ if (permanent) {
+ permanentSubscriptions = permanentSubscriptions.updated(subscriber, newFilter combine {
+ permanentSubscriptions.getOrElse(subscriber, MetricSelectionFilter.empty)
+ })
} else {
- val receivers = subscribedForOneShot.get(filter).getOrElse(Nil)
- subscribedForOneShot = subscribedForOneShot.updated(filter, sender :: receivers)
+ oneShotSubscriptions = oneShotSubscriptions.updated(subscriber, newFilter combine {
+ oneShotSubscriptions.getOrElse(subscriber, MetricSelectionFilter.empty)
+ })
}
+ }
+ def unsubscribe(subscriber: ActorRef): Unit = {
+ if (permanentSubscriptions.contains(subscriber))
+ permanentSubscriptions = permanentSubscriptions - subscriber
+
+ if (oneShotSubscriptions.contains(subscriber))
+ oneShotSubscriptions = oneShotSubscriptions - subscriber
}
def flush(): Unit = {
val currentTick = System.currentTimeMillis()
val snapshots = collectAll()
- dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots)
- dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots)
+ dispatchSelectedMetrics(lastTick, currentTick, permanentSubscriptions, snapshots)
+ dispatchSelectedMetrics(lastTick, currentTick, oneShotSubscriptions, snapshots)
lastTick = currentTick
- subscribedForOneShot = Map.empty
+ oneShotSubscriptions = Map.empty
}
- def collectAll(): Map[MetricGroupIdentity, MetricGroupSnapshot] =
- (for ((identity, recorder) ← Kamon(Metrics).storage) yield (identity, recorder.collect(collectionContext))).toMap
+ def collectAll(): Map[MetricGroupIdentity, MetricGroupSnapshot] = {
+ val allMetrics = Kamon(Metrics).storage
+ val builder = Map.newBuilder[MetricGroupIdentity, MetricGroupSnapshot]
- def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]],
+ allMetrics.foreach {
+ case (identity, recorder) ⇒ builder += ((identity, recorder.collect(collectionContext)))
+ }
+
+ builder.result()
+ }
+
+ def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[ActorRef, MetricSelectionFilter],
snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = {
- for ((filter, receivers) ← subscriptions) yield {
+ for ((subscriber, filter) ← subscriptions) {
val selection = snapshots.filter(group ⇒ filter.accept(group._1))
val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection)
- receivers.foreach(_ ! tickMetrics)
+ subscriber ! tickMetrics
}
}
+
+ def scheduleFlushMessage(): Cancellable = {
+ val config = context.system.settings.config
+ val tickInterval = Duration(config.getDuration("kamon.metrics.tick-interval", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)
+ context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher)
+ }
}
object Subscriptions {
case object FlushMetrics
- case class Subscribe(category: MetricGroupCategory, selection: String, permanently: Boolean = false)
+ case class Unsubscribe(subscriber: ActorRef)
+ case class Subscribe(category: MetricGroupCategory, selection: String, subscriber: ActorRef, permanently: Boolean = false)
case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot])
- case class MetricGroupFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) {
+ trait MetricSelectionFilter {
+ def accept(identity: MetricGroupIdentity): Boolean
+ }
+
+ object MetricSelectionFilter {
+ val empty = new MetricSelectionFilter {
+ def accept(identity: MetricGroupIdentity): Boolean = false
+ }
+
+ implicit class CombinableMetricSelectionFilter(msf: MetricSelectionFilter) {
+ def combine(that: MetricSelectionFilter): MetricSelectionFilter = new MetricSelectionFilter {
+ def accept(identity: MetricGroupIdentity): Boolean = msf.accept(identity) || that.accept(identity)
+ }
+ }
+ }
+
+ case class GroupAndPatternFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) extends MetricSelectionFilter {
def accept(identity: MetricGroupIdentity): Boolean = {
category.equals(identity.category) && globFilter.accept(identity.name)
}
diff --git a/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala b/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala
new file mode 100644
index 00000000..9144725e
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/metric/SubscriptionsProtocolSpec.scala
@@ -0,0 +1,133 @@
+package kamon.metric
+
+import akka.actor._
+import akka.testkit.{ TestProbe, ImplicitSender, TestKitBase }
+import com.typesafe.config.ConfigFactory
+import kamon.Kamon
+import kamon.metric.Subscriptions.TickMetricSnapshot
+import org.scalatest.{ Matchers, WordSpecLike }
+import scala.concurrent.duration._
+
+class SubscriptionsProtocolSpec extends TestKitBase with WordSpecLike with Matchers with ImplicitSender {
+ implicit lazy val system: ActorSystem = ActorSystem("subscriptions-protocol-spec", ConfigFactory.parseString(
+ """
+ |kamon.metrics {
+ | tick-interval = 1 hour
+ |}
+ """.stripMargin))
+
+ val metricsExtension = Kamon(Metrics)(system)
+ import metricsExtension.{ register, subscribe, unsubscribe }
+
+ "the Subscriptions messaging protocol" should {
+ "allow subscribing for a single tick" in {
+ val subscriber = TestProbe()
+ register(TraceMetrics("one-shot"), TraceMetrics.Factory)
+ subscribe(TraceMetrics, "one-shot", subscriber.ref, permanently = false)
+
+ metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
+
+ tickSnapshot.metrics.size should be(1)
+ tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot"))
+
+ metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ subscriber.expectNoMsg(1 second)
+ }
+
+ "allow subscribing permanently to a metric" in {
+ val subscriber = TestProbe()
+ register(TraceMetrics("permanent"), TraceMetrics.Factory)
+ subscribe(TraceMetrics, "permanent", subscriber.ref, permanently = true)
+
+ for (repetition ← 1 to 5) {
+ metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
+
+ tickSnapshot.metrics.size should be(1)
+ tickSnapshot.metrics.keys should contain(TraceMetrics("permanent"))
+ subscriber.expectNoMsg(1 second)
+ }
+ }
+
+ "allow subscribing to metrics matching a glob pattern" in {
+ val subscriber = TestProbe()
+ register(TraceMetrics("include-one"), TraceMetrics.Factory)
+ register(TraceMetrics("exclude-two"), TraceMetrics.Factory)
+ register(TraceMetrics("include-three"), TraceMetrics.Factory)
+ subscribe(TraceMetrics, "include-*", subscriber.ref, permanently = true)
+
+ for (repetition ← 1 to 5) {
+ metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
+
+ tickSnapshot.metrics.size should be(2)
+ tickSnapshot.metrics.keys should contain(TraceMetrics("include-one"))
+ tickSnapshot.metrics.keys should contain(TraceMetrics("include-three"))
+ subscriber.expectNoMsg(1 second)
+ }
+ }
+
+ "send a single TickMetricSnapshot to each subscriber, even if subscribed multiple times" in {
+ val subscriber = TestProbe()
+ register(TraceMetrics("include-one"), TraceMetrics.Factory)
+ register(TraceMetrics("exclude-two"), TraceMetrics.Factory)
+ register(TraceMetrics("include-three"), TraceMetrics.Factory)
+ subscribe(TraceMetrics, "include-one", subscriber.ref, permanently = true)
+ subscribe(TraceMetrics, "include-three", subscriber.ref, permanently = true)
+
+ for (repetition ← 1 to 5) {
+ metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
+
+ tickSnapshot.metrics.size should be(2)
+ tickSnapshot.metrics.keys should contain(TraceMetrics("include-one"))
+ tickSnapshot.metrics.keys should contain(TraceMetrics("include-three"))
+ }
+ }
+
+ "allow un-subscribing a subscriber" in {
+ val subscriber = TestProbe()
+ register(TraceMetrics("one-shot"), TraceMetrics.Factory)
+ subscribe(TraceMetrics, "one-shot", subscriber.ref, permanently = true)
+
+ metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
+ tickSnapshot.metrics.size should be(1)
+ tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot"))
+
+ unsubscribe(subscriber.ref)
+
+ metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ subscriber.expectNoMsg(1 second)
+ }
+
+ "watch all subscribers and un-subscribe them if they die" in {
+ val subscriber = TestProbe()
+ val forwarderSubscriber = system.actorOf(Props(new ForwarderSubscriber(subscriber.ref)))
+ watch(forwarderSubscriber)
+ register(TraceMetrics("one-shot"), TraceMetrics.Factory)
+ subscribe(TraceMetrics, "one-shot", forwarderSubscriber, permanently = true)
+
+ metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ val tickSnapshot = subscriber.expectMsgType[TickMetricSnapshot]
+ tickSnapshot.metrics.size should be(1)
+ tickSnapshot.metrics.keys should contain(TraceMetrics("one-shot"))
+
+ forwarderSubscriber ! PoisonPill
+ expectTerminated(forwarderSubscriber)
+
+ metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ metricsExtension.subscriptions ! Subscriptions.FlushMetrics
+ subscriber.expectNoMsg(2 seconds)
+ }
+ }
+}
+
+class ForwarderSubscriber(target: ActorRef) extends Actor {
+ def receive = {
+ case anything ⇒ target.forward(anything)
+ }
+}
diff --git a/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala
index 385c8404..8f1d06d8 100644
--- a/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/UserMetricsSpec.scala
@@ -14,7 +14,7 @@ class UserMetricsSpec extends TestKitBase with WordSpecLike with Matchers with I
implicit lazy val system: ActorSystem = ActorSystem("actor-metrics-spec", ConfigFactory.parseString(
"""
|kamon.metrics {
- | flush-interval = 1 hour
+ | tick-interval = 1 hour
| default-collection-context-buffer-size = 10
|
| precision {