aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2014-01-28 01:25:51 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-01-28 01:25:51 -0300
commite31ed96edbdf61ea0e20e879ec013400f4ef17ec (patch)
tree735f8c132ba7187b20e11856f9bc97b570970972 /kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala
parent966ba65ee12bdc60b231d421ab9d31b7c050b630 (diff)
downloadKamon-e31ed96edbdf61ea0e20e879ec013400f4ef17ec.tar.gz
Kamon-e31ed96edbdf61ea0e20e879ec013400f4ef17ec.tar.bz2
Kamon-e31ed96edbdf61ea0e20e879ec013400f4ef17ec.zip
store actor metrics in the new metrics extension
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala90
1 files changed, 90 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala
new file mode 100644
index 00000000..5b2a902d
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala
@@ -0,0 +1,90 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metrics
+
+import akka.actor.{ActorRef, Actor}
+import kamon.metrics.Subscriptions.{MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe}
+import kamon.util.GlobPathFilter
+import scala.concurrent.duration.Duration
+import java.util.concurrent.TimeUnit
+import kamon.metrics.MetricGroupIdentity.Category
+import kamon.Kamon
+
+class Subscriptions extends Actor {
+ import context.system
+
+ val config = context.system.settings.config
+ val tickInterval = Duration(config.getNanoseconds("kamon.metrics.tick-interval"), TimeUnit.NANOSECONDS)
+ val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher)
+
+ var lastTick: Long = System.currentTimeMillis()
+ var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty
+ var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty
+
+ def receive = {
+ case Subscribe(category, selection, permanent) => subscribe(category, selection, permanent)
+ case FlushMetrics => flush()
+ }
+
+ def subscribe(category: Category, selection: String, permanent: Boolean): Unit = {
+ val filter = MetricGroupFilter(category, new GlobPathFilter(selection))
+ if(permanent) {
+ val receivers = subscribedPermanently.get(filter).getOrElse(Nil)
+ subscribedPermanently = subscribedPermanently.updated(filter, sender :: receivers)
+
+ } else {
+ val receivers = subscribedForOneShot.get(filter).getOrElse(Nil)
+ subscribedForOneShot = subscribedForOneShot.updated(filter, sender :: receivers)
+ }
+
+ }
+
+ def flush(): Unit = {
+ val currentTick = System.currentTimeMillis()
+ val snapshots = Kamon(Metrics).collect
+
+ dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots)
+ dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots)
+
+ lastTick = currentTick
+ subscribedForOneShot = Map.empty
+ }
+
+ def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]],
+ snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = {
+
+ for((filter, receivers) <- subscriptions) yield {
+ val selection = snapshots.filter(group => filter.accept(group._1))
+ val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection)
+
+ receivers.foreach(_ ! tickMetrics)
+ }
+ }
+}
+
+object Subscriptions {
+ case object FlushMetrics
+ case class Subscribe(category: Category, selection: String, permanently: Boolean = false)
+ case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot])
+
+ case class MetricGroupFilter(category: Category, globFilter: GlobPathFilter) {
+ def accept(identity: MetricGroupIdentity): Boolean = {
+ category.equals(identity.category) && globFilter.accept(identity.name)
+ }
+ }
+
+}