diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-01-28 01:25:51 -0300 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-01-28 01:25:51 -0300 |
commit | 01450abea84a4c0f9f4efe73201a8ca041acea2b (patch) | |
tree | 601b76380bd5c3ba405d6b480ddd1090b574ea98 /kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala | |
parent | b6ea0a93e6be8e1f355f1bc993618d178d0c9372 (diff) | |
download | Kamon-01450abea84a4c0f9f4efe73201a8ca041acea2b.tar.gz Kamon-01450abea84a4c0f9f4efe73201a8ca041acea2b.tar.bz2 Kamon-01450abea84a4c0f9f4efe73201a8ca041acea2b.zip |
store actor metrics in the new metrics extension
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala | 90 |
1 files changed, 90 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala new file mode 100644 index 00000000..5b2a902d --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala @@ -0,0 +1,90 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +import akka.actor.{ActorRef, Actor} +import kamon.metrics.Subscriptions.{MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe} +import kamon.util.GlobPathFilter +import scala.concurrent.duration.Duration +import java.util.concurrent.TimeUnit +import kamon.metrics.MetricGroupIdentity.Category +import kamon.Kamon + +class Subscriptions extends Actor { + import context.system + + val config = context.system.settings.config + val tickInterval = Duration(config.getNanoseconds("kamon.metrics.tick-interval"), TimeUnit.NANOSECONDS) + val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) + + var lastTick: Long = System.currentTimeMillis() + var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty + var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty + + def receive = { + case Subscribe(category, selection, permanent) => subscribe(category, selection, permanent) + case FlushMetrics => flush() + } + + def subscribe(category: Category, selection: String, permanent: Boolean): Unit = { + val filter = MetricGroupFilter(category, new GlobPathFilter(selection)) + if(permanent) { + val receivers = subscribedPermanently.get(filter).getOrElse(Nil) + subscribedPermanently = subscribedPermanently.updated(filter, sender :: receivers) + + } else { + val receivers = subscribedForOneShot.get(filter).getOrElse(Nil) + subscribedForOneShot = subscribedForOneShot.updated(filter, sender :: receivers) + } + + } + + def flush(): Unit = { + val currentTick = System.currentTimeMillis() + val snapshots = Kamon(Metrics).collect + + dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots) + dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots) + + lastTick = currentTick + subscribedForOneShot = Map.empty + } + + def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]], + snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = { + + for((filter, receivers) <- subscriptions) yield { + val selection = snapshots.filter(group => filter.accept(group._1)) + val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) + + receivers.foreach(_ ! tickMetrics) + } + } +} + +object Subscriptions { + case object FlushMetrics + case class Subscribe(category: Category, selection: String, permanently: Boolean = false) + case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]) + + case class MetricGroupFilter(category: Category, globFilter: GlobPathFilter) { + def accept(identity: MetricGroupIdentity): Boolean = { + category.equals(identity.category) && globFilter.accept(identity.name) + } + } + +} |