diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-01-28 01:25:51 -0300 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-01-28 01:25:51 -0300 |
commit | e31ed96edbdf61ea0e20e879ec013400f4ef17ec (patch) | |
tree | 735f8c132ba7187b20e11856f9bc97b570970972 /kamon-core/src/main/scala/kamon/metrics | |
parent | 966ba65ee12bdc60b231d421ab9d31b7c050b630 (diff) | |
download | Kamon-e31ed96edbdf61ea0e20e879ec013400f4ef17ec.tar.gz Kamon-e31ed96edbdf61ea0e20e879ec013400f4ef17ec.tar.bz2 Kamon-e31ed96edbdf61ea0e20e879ec013400f4ef17ec.zip |
store actor metrics in the new metrics extension
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metrics')
-rw-r--r-- | kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala | 140 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala | 18 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala | 90 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala (renamed from kamon-core/src/main/scala/kamon/metrics/HighDynamicRangeRecorder.scala) | 0 |
4 files changed, 107 insertions, 141 deletions
diff --git a/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala b/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala deleted file mode 100644 index 0e3af5fd..00000000 --- a/kamon-core/src/main/scala/kamon/metrics/ActorMetricsOps.scala +++ /dev/null @@ -1,140 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metrics - -import org.HdrHistogram.{ HighDynamicRangeRecorder, AbstractHistogram, AtomicHistogram } -import kamon.util.GlobPathFilter -import scala.collection.concurrent.TrieMap -import scala.collection.JavaConversions.iterableAsScalaIterable -import akka.actor._ -import kamon.metrics.ActorMetricsDispatcher.{ ActorMetricsSnapshot, FlushMetrics } -import kamon.Kamon -import scala.concurrent.duration._ -import java.util.concurrent.TimeUnit -import kamon.metrics.ActorMetricsDispatcher.Subscribe - -trait ActorMetricsOps { - self: MetricsExtension ⇒ - - val config = system.settings.config.getConfig("kamon.metrics.actors") - val actorMetrics = TrieMap[String, HdrActorMetricsRecorder]() - - val trackedActors: Vector[GlobPathFilter] = config.getStringList("tracked").map(glob ⇒ new GlobPathFilter(glob)).toVector - val excludedActors: Vector[GlobPathFilter] = config.getStringList("excluded").map(glob ⇒ new GlobPathFilter(glob)).toVector - - val actorMetricsFactory: () ⇒ HdrActorMetricsRecorder = { - val settings = config.getConfig("hdr-settings") - val processingTimeHdrConfig = HighDynamicRangeRecorder.Configuration.fromConfig(settings.getConfig("processing-time")) - val timeInMailboxHdrConfig = HighDynamicRangeRecorder.Configuration.fromConfig(settings.getConfig("time-in-mailbox")) - val mailboxSizeHdrConfig = HighDynamicRangeRecorder.Configuration.fromConfig(settings.getConfig("mailbox-size")) - - () ⇒ new HdrActorMetricsRecorder(processingTimeHdrConfig, timeInMailboxHdrConfig, mailboxSizeHdrConfig) - } - - def shouldTrackActor(path: String): Boolean = - trackedActors.exists(glob ⇒ glob.accept(path)) && !excludedActors.exists(glob ⇒ glob.accept(path)) - - def registerActor(path: String): HdrActorMetricsRecorder = actorMetrics.getOrElseUpdate(path, actorMetricsFactory()) - - def unregisterActor(path: String): Unit = actorMetrics.remove(path) -} - -class HdrActorMetricsRecorder(processingTimeHdrConfig: HighDynamicRangeRecorder.Configuration, timeInMailboxHdrConfig: HighDynamicRangeRecorder.Configuration, - mailboxSizeHdrConfig: HighDynamicRangeRecorder.Configuration) { - - val processingTimeHistogram = new AtomicHistogram(processingTimeHdrConfig.highestTrackableValue, processingTimeHdrConfig.significantValueDigits) - val timeInMailboxHistogram = new AtomicHistogram(timeInMailboxHdrConfig.highestTrackableValue, timeInMailboxHdrConfig.significantValueDigits) - val mailboxSizeHistogram = new AtomicHistogram(mailboxSizeHdrConfig.highestTrackableValue, mailboxSizeHdrConfig.significantValueDigits) - - def recordTimeInMailbox(waitTime: Long): Unit = timeInMailboxHistogram.recordValue(waitTime) - - def recordProcessingTime(processingTime: Long): Unit = processingTimeHistogram.recordValue(processingTime) - - def snapshot(): HdrActorMetricsSnapshot = { - HdrActorMetricsSnapshot(processingTimeHistogram.copy(), timeInMailboxHistogram.copy(), mailboxSizeHistogram.copy()) - } - - def reset(): Unit = { - processingTimeHistogram.reset() - timeInMailboxHistogram.reset() - mailboxSizeHistogram.reset() - } -} - -case class HdrActorMetricsSnapshot(processingTimeHistogram: AbstractHistogram, timeInMailboxHistogram: AbstractHistogram, - mailboxSizeHistogram: AbstractHistogram) - -class ActorMetricsDispatcher extends Actor { - val tickInterval = Duration(context.system.settings.config.getNanoseconds("kamon.metrics.tick-interval"), TimeUnit.NANOSECONDS) - val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) - - var subscribedForever: Map[GlobPathFilter, List[ActorRef]] = Map.empty - var subscribedForOne: Map[GlobPathFilter, List[ActorRef]] = Map.empty - var lastTick = System.currentTimeMillis() - - def receive = { - case Subscribe(path, true) ⇒ subscribeForever(path, sender) - case Subscribe(path, false) ⇒ subscribeOneOff(path, sender) - case FlushMetrics ⇒ flushMetrics() - } - - def subscribeForever(path: String, receiver: ActorRef): Unit = subscribedForever = subscribe(receiver, path, subscribedForever) - - def subscribeOneOff(path: String, receiver: ActorRef): Unit = subscribedForOne = subscribe(receiver, path, subscribedForOne) - - def subscribe(receiver: ActorRef, path: String, target: Map[GlobPathFilter, List[ActorRef]]): Map[GlobPathFilter, List[ActorRef]] = { - val pathFilter = new GlobPathFilter(path) - val oldReceivers = target.get(pathFilter).getOrElse(Nil) - target.updated(pathFilter, receiver :: oldReceivers) - } - - def flushMetrics(): Unit = { - /* val currentTick = System.currentTimeMillis() - val snapshots = Kamon(Metrics)(context.system).actorMetrics.map { - case (path, metrics) ⇒ - val snapshot = metrics.snapshot() - metrics.reset() - - (path, snapshot) - }.toMap - - dispatchMetricsTo(subscribedForOne, snapshots, currentTick) - dispatchMetricsTo(subscribedForever, snapshots, currentTick) - - subscribedForOne = Map.empty - lastTick = currentTick*/ - } - - def dispatchMetricsTo(subscribers: Map[GlobPathFilter, List[ActorRef]], snapshots: Map[String, HdrActorMetricsSnapshot], - currentTick: Long): Unit = { - - for ((subscribedPath, receivers) ← subscribers) { - val metrics = snapshots.filterKeys(snapshotPath ⇒ subscribedPath.accept(snapshotPath)) - val actorMetrics = ActorMetricsSnapshot(lastTick, currentTick, metrics) - - receivers.foreach(ref ⇒ ref ! actorMetrics) - } - } -} - -object ActorMetricsDispatcher { - case class Subscribe(path: String, forever: Boolean = false) - case class UnSubscribe(path: String) - - case class ActorMetricsSnapshot(fromMillis: Long, toMillis: Long, metrics: Map[String, HdrActorMetricsSnapshot]) - case object FlushMetrics -} diff --git a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala index 4bac3519..11e3ebfc 100644 --- a/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metrics/MetricsExtension.scala @@ -17,12 +17,16 @@ package kamon.metrics import scala.collection.concurrent.TrieMap -import akka.actor.{ ExtensionIdProvider, ExtensionId, ExtendedActorSystem } +import akka.actor._ import com.typesafe.config.Config import kamon.util.GlobPathFilter import kamon.Kamon import akka.actor import kamon.metrics.Metrics.MetricGroupFilter +import kamon.metrics.MetricGroupIdentity.Category +import kamon.metrics.Metrics.MetricGroupFilter +import scala.Some +import kamon.metrics.Subscriptions.Subscribe case class MetricGroupIdentity(name: String, category: MetricGroupIdentity.Category) @@ -59,6 +63,11 @@ object MetricGroupIdentity { trait Category { def name: String } + + val AnyCategory = new Category { + def name: String = "match-all" + override def equals(that: Any): Boolean = that.isInstanceOf[Category] + } } trait MetricGroupFactory { @@ -66,10 +75,13 @@ trait MetricGroupFactory { def create(config: Config): Group } + + class MetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension { val config = system.settings.config val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]() val filters = loadFilters(config) + lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions") def register(name: String, category: MetricGroupIdentity.Category with MetricGroupFactory): Option[category.Group] = { if (shouldTrack(name, category)) @@ -82,6 +94,10 @@ class MetricsExtension(val system: ExtendedActorSystem) extends Kamon.Extension storage.remove(MetricGroupIdentity(name, category)) } + def subscribe(category: Category, selection: String, receiver: ActorRef, permanently: Boolean = false): Unit = { + subscriptions.tell(Subscribe(category, selection, permanently), receiver) + } + def collect: Map[MetricGroupIdentity, MetricGroupSnapshot] = { (for ((identity, recorder) ← storage) yield (identity, recorder.collect)).toMap } diff --git a/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala new file mode 100644 index 00000000..5b2a902d --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metrics/Subscriptions.scala @@ -0,0 +1,90 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metrics + +import akka.actor.{ActorRef, Actor} +import kamon.metrics.Subscriptions.{MetricGroupFilter, FlushMetrics, TickMetricSnapshot, Subscribe} +import kamon.util.GlobPathFilter +import scala.concurrent.duration.Duration +import java.util.concurrent.TimeUnit +import kamon.metrics.MetricGroupIdentity.Category +import kamon.Kamon + +class Subscriptions extends Actor { + import context.system + + val config = context.system.settings.config + val tickInterval = Duration(config.getNanoseconds("kamon.metrics.tick-interval"), TimeUnit.NANOSECONDS) + val flushMetricsSchedule = context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) + + var lastTick: Long = System.currentTimeMillis() + var subscribedPermanently: Map[MetricGroupFilter, List[ActorRef]] = Map.empty + var subscribedForOneShot: Map[MetricGroupFilter, List[ActorRef]] = Map.empty + + def receive = { + case Subscribe(category, selection, permanent) => subscribe(category, selection, permanent) + case FlushMetrics => flush() + } + + def subscribe(category: Category, selection: String, permanent: Boolean): Unit = { + val filter = MetricGroupFilter(category, new GlobPathFilter(selection)) + if(permanent) { + val receivers = subscribedPermanently.get(filter).getOrElse(Nil) + subscribedPermanently = subscribedPermanently.updated(filter, sender :: receivers) + + } else { + val receivers = subscribedForOneShot.get(filter).getOrElse(Nil) + subscribedForOneShot = subscribedForOneShot.updated(filter, sender :: receivers) + } + + } + + def flush(): Unit = { + val currentTick = System.currentTimeMillis() + val snapshots = Kamon(Metrics).collect + + dispatchSelectedMetrics(lastTick, currentTick, subscribedPermanently, snapshots) + dispatchSelectedMetrics(lastTick, currentTick, subscribedForOneShot, snapshots) + + lastTick = currentTick + subscribedForOneShot = Map.empty + } + + def dispatchSelectedMetrics(lastTick: Long, currentTick: Long, subscriptions: Map[MetricGroupFilter, List[ActorRef]], + snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = { + + for((filter, receivers) <- subscriptions) yield { + val selection = snapshots.filter(group => filter.accept(group._1)) + val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) + + receivers.foreach(_ ! tickMetrics) + } + } +} + +object Subscriptions { + case object FlushMetrics + case class Subscribe(category: Category, selection: String, permanently: Boolean = false) + case class TickMetricSnapshot(from: Long, to: Long, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]) + + case class MetricGroupFilter(category: Category, globFilter: GlobPathFilter) { + def accept(identity: MetricGroupIdentity): Boolean = { + category.equals(identity.category) && globFilter.accept(identity.name) + } + } + +} diff --git a/kamon-core/src/main/scala/kamon/metrics/HighDynamicRangeRecorder.scala b/kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala index e31d0e11..e31d0e11 100644 --- a/kamon-core/src/main/scala/kamon/metrics/HighDynamicRangeRecorder.scala +++ b/kamon-core/src/main/scala/kamon/metrics/instruments/HighDynamicRangeRecorder.scala |