diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala | 90 |
1 files changed, 90 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala new file mode 100644 index 00000000..5b2a902d --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala @@ -0,0 +1,90 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +import akka.actor.{ActorRef, Actor} +import kamon.metrics.Subscriptions.{MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe} +import kamon.util.GlobPathFilter +import scala.concurrent.duration.Duration +import java.util.concurrent.TimeUnit +import kamon.metrics.MetricGroupIdentity.Category +import kamon.Kamon + +class Subscriptions extends Actor { + import context.system + + val config = context.system.settings.config + val tickInterval = Duration(config.getNanoseconds("kamon.metrics.tick-interval"), TimeUnit.NANOSECONDS) + val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) + + var lastTick: Long = System.currentTimeMillis() + var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty + var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty + + def receive = { + case Subscribe(category, selection, permanent) => subscribe(category, selection, permanent) + case FlushMetrics => flush() + } + + def subscribe(category: Category, selection: String, permanent: Boolean): Unit = { + val filter = MetricGroupFilter(category, new GlobPathFilter(selection)) + if(permanent) { + val receivers = subscribedPermanently.get(filter).getOrElse(Nil) + subscribedPermanently = subscribedPermanently.updated(filter, sender :: receivers) + + } else { + val receivers = subscribedForOneShot.get(filter).getOrElse(Nil) + subscribedForOneShot = subscribedForOneShot.updated(filter, sender :: receivers) + } + + } + + def flush(): Unit = { + val currentTick = System.currentTimeMillis() + val snapshots = Kamon(Metrics).collect + + dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots) + dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots) + + lastTick = currentTick + subscribedForOneShot = Map.empty + } + + def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]], + snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = { + + for((filter, receivers) <- subscriptions) yield { + val selection = snapshots.filter(group => filter.accept(group._1)) + val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) + + receivers.foreach(_ ! tickMetrics) + } + } +} + +object Subscriptions { + case object FlushMetrics + case class Subscribe(category: Category, selection: String, permanently: Boolean = false) + case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]) + + case class MetricGroupFilter(category: Category, globFilter: GlobPathFilter) { + def accept(identity: MetricGroupIdentity): Boolean = { + category.equals(identity.category) && globFilter.accept(identity.name) + } + } + +} |