/*
 * =========================================================================================
 * Copyright © 2013 the kamon project
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */

package kamon.metrics

import akka.actor.{ ActorRef, Actor }
import kamon.metrics.Subscriptions.{ MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe }
import kamon.util.GlobPathFilter
import scala.concurrent.duration.Duration
import java.util.concurrent.TimeUnit
import kamon.metrics.MetricGroupIdentity.Category
import kamon.Kamon

/**
 *  Actor that keeps track of metric subscriptions and, on every `FlushMetrics` tick, collects the
 *  current metric snapshots and sends each subscriber a [[Subscriptions.TickMetricSnapshot]]
 *  containing only the metric groups accepted by the filter it subscribed with.
 *
 *  Subscriptions are either permanent (served on every tick) or one-shot (served on the next tick
 *  only and then discarded).
 *
 *  NOTE(review): subscribers are never removed from the permanent subscriptions, so an actor that
 *  subscribed permanently and later stopped keeps being messaged on every tick. Consider
 *  death-watching subscribers if that becomes a problem.
 */
class Subscriptions extends Actor {
  import context.system

  val config = context.system.settings.config

  /** Time between two consecutive flushes, read from `kamon.metrics.tick-interval`. */
  val tickInterval = Duration(config.getNanoseconds("kamon.metrics.tick-interval"), TimeUnit.NANOSECONDS)

  /** Recurring schedule that sends `FlushMetrics` to this actor every `tickInterval`. */
  val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher)

  /** Timestamp (milliseconds since the epoch) of the previous flush, or of actor creation before the first flush. */
  var lastTick: Long = System.currentTimeMillis()
  var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty
  var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty

  def receive: Receive = {
    case Subscribe(category, selection, permanent) ⇒ subscribe(category, selection, permanent)
    case FlushMetrics                              ⇒ flush()
  }

  /**
   *  Cancels the recurring flush schedule. A new schedule is created every time this actor is
   *  instantiated, so without cancelling the previous one a restart would leave two schedules
   *  running and subscribers would receive duplicated ticks.
   */
  override def postStop(): Unit = {
    flushMetricsSchedule.cancel()
    super.postStop()
  }

  /**
   *  Registers the sender of the message currently being processed as a receiver of the metric
   *  groups matching the given category and name selection.
   *
   *  @param category  category that a metric group must belong to in order to be selected.
   *  @param selection pattern handed to a [[kamon.util.GlobPathFilter]] and matched against the metric group name.
   *  @param permanent when `true` the sender is served on every tick, otherwise only on the next one.
   */
  def subscribe(category: Category, selection: String, permanent: Boolean): Unit = {
    val filter = MetricGroupFilter(category, new GlobPathFilter(selection))

    if (permanent)
      subscribedPermanently = addReceiver(subscribedPermanently, filter, sender)
    else
      subscribedForOneShot = addReceiver(subscribedForOneShot, filter, sender)
  }

  /** Returns `subscriptions` with `receiver` prepended to the receivers already registered under `filter`. */
  private def addReceiver(subscriptions: Map[MetricGroupFilter, List[ActorRef]], filter: MetricGroupFilter,
                          receiver: ActorRef): Map[MetricGroupFilter, List[ActorRef]] =
    subscriptions.updated(filter, receiver :: subscriptions.getOrElse(filter, Nil))

  /**
   *  Collects the current metric snapshots, dispatches them to permanent and one-shot subscribers,
   *  then moves `lastTick` forward and discards all one-shot subscriptions.
   */
  def flush(): Unit = {
    val currentTick = System.currentTimeMillis()
    val snapshots = Kamon(Metrics).collect

    dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots)
    dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots)

    lastTick = currentTick
    // One-shot subscribers have just been served; they must not be served again on the next tick.
    subscribedForOneShot = Map.empty
  }

  /**
   *  Sends, for every subscription, a [[Subscriptions.TickMetricSnapshot]] holding the snapshots
   *  accepted by the subscription's filter to each of its receivers. A message is sent even when
   *  the filter accepts no snapshot, in which case its `metrics` map is empty.
   *
   *  @param lastTick      start of the reported interval.
   *  @param currentTick   end of the reported interval.
   *  @param subscriptions receivers to notify, grouped by the filter they subscribed with.
   *  @param snapshots     all snapshots collected for this tick.
   */
  def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]],
                              snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = {

    // Plain side-effecting loop: nothing is yielded, so no throw-away collection is built.
    for ((filter, receivers) ← subscriptions) {
      val selection = snapshots.filter { case (identity, _) ⇒ filter.accept(identity) }
      val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection)

      receivers.foreach(_ ! tickMetrics)
    }
  }
}

/** Messages and helper types understood or emitted by the [[Subscriptions]] actor. */
object Subscriptions {

  /** Tells the [[Subscriptions]] actor to collect and dispatch metric snapshots now. */
  case object FlushMetrics

  /**
   *  Subscribes the sender to the metric groups of `category` whose name matches `selection`.
   *  The subscription is one-shot unless `permanently` is set to `true`.
   */
  case class Subscribe(category: Category, selection: String, permanently: Boolean = false)

  /** Snapshots selected for one subscriber, covering the interval between the `from` and `to` timestamps. */
  case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot])

  /** Selects metric groups by exact category and by name pattern. */
  case class MetricGroupFilter(category: Category, globFilter: GlobPathFilter) {

    /** Returns `true` when `identity` belongs to this filter's category and its name is accepted by the glob filter. */
    def accept(identity: MetricGroupIdentity): Boolean =
      category == identity.category && globFilter.accept(identity.name)
  }
}