aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/metric
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-07-28 01:02:02 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-07-28 01:02:02 -0300
commitca65ad5b1bd5156ec487b435c9c015d6111963f0 (patch)
treeff6e7bf309fb20b768de274a69efa893bff6117a /kamon-core/src/main/scala/kamon/metric
parentb07e8303e2e3eaf52638e1d51853729cb4983c73 (diff)
downloadKamon-ca65ad5b1bd5156ec487b435c9c015d6111963f0.tar.gz
Kamon-ca65ad5b1bd5156ec487b435c9c015d6111963f0.tar.bz2
Kamon-ca65ad5b1bd5156ec487b435c9c015d6111963f0.zip
= core: subscription protocol specification, closes #51
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric')
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala10
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Subscriptions.scala93
2 files changed, 73 insertions, 30 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
index abc6e44e..00214f51 100644
--- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
+++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala
@@ -26,7 +26,7 @@ import kamon.util.GlobPathFilter
import kamon.Kamon
import akka.actor
import kamon.metric.Metrics.MetricGroupFilter
-import kamon.metric.Subscriptions.Subscribe
+import kamon.metric.Subscriptions.{ Unsubscribe, Subscribe }
import java.util.concurrent.TimeUnit
class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
@@ -55,9 +55,11 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension {
storage.remove(identity)
}
- def subscribe[C <: MetricGroupCategory](category: C, selection: String, receiver: ActorRef, permanently: Boolean = false): Unit = {
- subscriptions.tell(Subscribe(category, selection, permanently), receiver)
- }
+ def subscribe[C <: MetricGroupCategory](category: C, selection: String, subscriber: ActorRef, permanently: Boolean = false): Unit =
+ subscriptions.tell(Subscribe(category, selection, subscriber, permanently), subscriber)
+
+ def unsubscribe(subscriber: ActorRef): Unit =
+ subscriptions.tell(Unsubscribe(subscriber), subscriber)
def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = {
import scala.concurrent.duration._
diff --git a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala
index eb2168ad..c6571507 100644
--- a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala
+++ b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala
@@ -16,8 +16,8 @@
package kamon.metric
-import akka.actor.{ Props, ActorRef, Actor }
-import kamon.metric.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe }
+import akka.actor._
+import kamon.metric.Subscriptions._
import kamon.util.GlobPathFilter
import scala.concurrent.duration.{ FiniteDuration, Duration }
import java.util.concurrent.TimeUnit
@@ -27,65 +27,106 @@ import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer
class Subscriptions extends Actor {
import context.system
- val config = context.system.settings.config
- val tickInterval = Duration(config.getDuration("kamon.metrics.tick-interval", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)
- val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher)
+ val flushMetricsSchedule = scheduleFlushMessage()
val collectionContext = Kamon(Metrics).buildDefaultCollectionContext
var lastTick: Long = System.currentTimeMillis()
- var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty
- var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty
+ var oneShotSubscriptions: Map[ActorRef, MetricSelectionFilter] = Map.empty
+ var permanentSubscriptions: Map[ActorRef, MetricSelectionFilter] = Map.empty
def receive = {
- case Subscribe(category, selection, permanent) ⇒ subscribe(category, selection, permanent)
- case FlushMetrics ⇒ flush()
+ case Subscribe(category, selection, subscriber, permanent) ⇒ subscribe(category, selection, subscriber, permanent)
+ case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber)
+ case Terminated(subscriber) ⇒ unsubscribe(subscriber)
+ case FlushMetrics ⇒ flush()
}
- def subscribe(category: MetricGroupCategory, selection: String, permanent: Boolean): Unit = {
- val filter = MetricGroupFilter(category, new GlobPathFilter(selection))
- if (permanent) {
- val receivers = subscribedPermanently.get(filter).getOrElse(Nil)
- subscribedPermanently = subscribedPermanently.updated(filter, sender :: receivers)
+ def subscribe(category: MetricGroupCategory, selection: String, subscriber: ActorRef, permanent: Boolean): Unit = {
+ context.watch(subscriber)
+ val newFilter: MetricSelectionFilter = GroupAndPatternFilter(category, new GlobPathFilter(selection))
+ if (permanent) {
+ permanentSubscriptions = permanentSubscriptions.updated(subscriber, newFilter combine {
+ permanentSubscriptions.getOrElse(subscriber, MetricSelectionFilter.empty)
+ })
} else {
- val receivers = subscribedForOneShot.get(filter).getOrElse(Nil)
- subscribedForOneShot = subscribedForOneShot.updated(filter, sender :: receivers)
+ oneShotSubscriptions = oneShotSubscriptions.updated(subscriber, newFilter combine {
+ oneShotSubscriptions.getOrElse(subscriber, MetricSelectionFilter.empty)
+ })
}
+ }
+ def unsubscribe(subscriber: ActorRef): Unit = {
+ if (permanentSubscriptions.contains(subscriber))
+ permanentSubscriptions = permanentSubscriptions - subscriber
+
+ if (oneShotSubscriptions.contains(subscriber))
+ oneShotSubscriptions = oneShotSubscriptions - subscriber
}
def flush(): Unit = {
val currentTick = System.currentTimeMillis()
val snapshots = collectAll()
- dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots)
- dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots)
+ dispatchSelectedMetrics(lastTick, currentTick, permanentSubscriptions, snapshots)
+ dispatchSelectedMetrics(lastTick, currentTick, oneShotSubscriptions, snapshots)
lastTick = currentTick
- subscribedForOneShot = Map.empty
+ oneShotSubscriptions = Map.empty
}
- def collectAll(): Map[MetricGroupIdentity, MetricGroupSnapshot] =
- (for ((identity, recorder) ← Kamon(Metrics).storage) yield (identity, recorder.collect(collectionContext))).toMap
+ def collectAll(): Map[MetricGroupIdentity, MetricGroupSnapshot] = {
+ val allMetrics = Kamon(Metrics).storage
+ val builder = Map.newBuilder[MetricGroupIdentity, MetricGroupSnapshot]
- def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]],
+ allMetrics.foreach {
+ case (identity, recorder) ⇒ builder += ((identity, recorder.collect(collectionContext)))
+ }
+
+ builder.result()
+ }
+
+ def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[ActorRef, MetricSelectionFilter],
snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = {
- for ((filter, receivers) ← subscriptions) yield {
+ for ((subscriber, filter) ← subscriptions) {
val selection = snapshots.filter(group ⇒ filter.accept(group._1))
val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection)
- receivers.foreach(_ ! tickMetrics)
+ subscriber ! tickMetrics
}
}
+
+ def scheduleFlushMessage(): Cancellable = {
+ val config = context.system.settings.config
+ val tickInterval = Duration(config.getDuration("kamon.metrics.tick-interval", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)
+ context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher)
+ }
}
object Subscriptions {
case object FlushMetrics
- case class Subscribe(category: MetricGroupCategory, selection: String, permanently: Boolean = false)
+ case class Unsubscribe(subscriber: ActorRef)
+ case class Subscribe(category: MetricGroupCategory, selection: String, subscriber: ActorRef, permanently: Boolean = false)
case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot])
- case class MetricGroupFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) {
+ trait MetricSelectionFilter {
+ def accept(identity: MetricGroupIdentity): Boolean
+ }
+
+ object MetricSelectionFilter {
+ val empty = new MetricSelectionFilter {
+ def accept(identity: MetricGroupIdentity): Boolean = false
+ }
+
+ implicit class CombinableMetricSelectionFilter(msf: MetricSelectionFilter) {
+ def combine(that: MetricSelectionFilter): MetricSelectionFilter = new MetricSelectionFilter {
+ def accept(identity: MetricGroupIdentity): Boolean = msf.accept(identity) || that.accept(identity)
+ }
+ }
+ }
+
+ case class GroupAndPatternFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) extends MetricSelectionFilter {
def accept(identity: MetricGroupIdentity): Boolean = {
category.equals(identity.category) && globFilter.accept(identity.name)
}