diff options
Diffstat (limited to 'kamon-core/src/legacy-main/scala/kamon/metric/SubscriptionsDispatcher.scala')
-rw-r--r-- | kamon-core/src/legacy-main/scala/kamon/metric/SubscriptionsDispatcher.scala | 116 |
1 files changed, 0 insertions, 116 deletions
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/SubscriptionsDispatcher.scala b/kamon-core/src/legacy-main/scala/kamon/metric/SubscriptionsDispatcher.scala deleted file mode 100644 index 28bf77db..00000000 --- a/kamon-core/src/legacy-main/scala/kamon/metric/SubscriptionsDispatcher.scala +++ /dev/null @@ -1,116 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import akka.actor._ -import kamon.metric.SubscriptionsDispatcher._ -import kamon.util.MilliTimestamp -import scala.concurrent.duration.FiniteDuration - -/** - * Manages subscriptions to metrics and dispatch snapshots on every tick to all subscribers. - */ -private[kamon] class SubscriptionsDispatcher(interval: FiniteDuration, metricsExtension: MetricsModuleImpl) extends Actor { - var lastTick = MilliTimestamp.now - var oneShotSubscriptions = Map.empty[ActorRef, SubscriptionFilter] - var permanentSubscriptions = Map.empty[ActorRef, SubscriptionFilter] - val tickSchedule = context.system.scheduler.schedule(interval, interval, self, Tick)(context.dispatcher) - val collectionContext = metricsExtension.buildDefaultCollectionContext - - def receive = { - case Tick ⇒ processTick() - case Subscribe(filter, subscriber, permanently) ⇒ subscribe(filter, subscriber, permanently) - case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber) - case Terminated(subscriber) ⇒ unsubscribe(subscriber) - } - - def processTick(): Unit = - dispatch(metricsExtension.collectSnapshots(collectionContext)) - - def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit = { - def addSubscription(storage: Map[ActorRef, SubscriptionFilter]): Map[ActorRef, SubscriptionFilter] = - storage.updated(subscriber, storage.getOrElse(subscriber, SubscriptionFilter.Empty).combine(filter)) - - context.watch(subscriber) - - if (permanent) - permanentSubscriptions = addSubscription(permanentSubscriptions) - else - oneShotSubscriptions = addSubscription(oneShotSubscriptions) - } - - def unsubscribe(subscriber: ActorRef): Unit = { - permanentSubscriptions = permanentSubscriptions - subscriber - oneShotSubscriptions = oneShotSubscriptions - subscriber - } - - def dispatch(snapshots: Map[Entity, EntitySnapshot]): Unit = { - val currentTick = MilliTimestamp.now - - dispatchSelections(lastTick, currentTick, permanentSubscriptions, snapshots) - dispatchSelections(lastTick, currentTick, oneShotSubscriptions, snapshots) - - lastTick = currentTick - oneShotSubscriptions = Map.empty[ActorRef, SubscriptionFilter] - } - - def dispatchSelections(lastTick: MilliTimestamp, currentTick: MilliTimestamp, subscriptions: Map[ActorRef, SubscriptionFilter], - snapshots: Map[Entity, EntitySnapshot]): Unit = { - - for ((subscriber, filter) ← subscriptions) { - val selection = snapshots.filter(group ⇒ filter.accept(group._1)) - val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) - - subscriber ! tickMetrics - } - } -} - -object SubscriptionsDispatcher { - def props(interval: FiniteDuration, metricsExtension: MetricsModuleImpl): Props = - Props(new SubscriptionsDispatcher(interval, metricsExtension)) - - case object Tick - case class Unsubscribe(subscriber: ActorRef) - case class Subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanently: Boolean = false) - case class TickMetricSnapshot(from: MilliTimestamp, to: MilliTimestamp, metrics: Map[Entity, EntitySnapshot]) - -} - -trait SubscriptionFilter { self ⇒ - - def accept(entity: Entity): Boolean - - final def combine(that: SubscriptionFilter): SubscriptionFilter = new SubscriptionFilter { - override def accept(entity: Entity): Boolean = self.accept(entity) || that.accept(entity) - } -} - -object SubscriptionFilter { - val Empty = new SubscriptionFilter { - def accept(entity: Entity): Boolean = false - } - - def apply(category: String, name: String): SubscriptionFilter = new SubscriptionFilter { - val categoryPattern = new GlobPathFilter(category) - val namePattern = new GlobPathFilter(name) - - def accept(entity: Entity): Boolean = { - categoryPattern.accept(entity.category) && namePattern.accept(entity.name) - } - } -}
\ No newline at end of file |