diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala | 166 |
1 files changed, 89 insertions, 77 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala index ed55ab06..b738eeb9 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala @@ -16,91 +16,119 @@ package kamon.metric -import akka.event.Logging.Error -import akka.event.EventStream +import akka.actor +import kamon.metric.SubscriptionsDispatcher.{ Unsubscribe, Subscribe } +import kamon.{ ModuleSupervisor, Kamon } +import kamon.metric.instrument.{ InstrumentFactory, CollectionContext } import scala.collection.concurrent.TrieMap import akka.actor._ -import com.typesafe.config.Config -import kamon.util.GlobPathFilter -import kamon.Kamon -import akka.actor -import kamon.metric.Metrics.MetricGroupFilter -import kamon.metric.Subscriptions.{ Unsubscribe, Subscribe } -import java.util.concurrent.TimeUnit +import kamon.util.{ FastDispatch, TriemapAtomicGetOrElseUpdate } -class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { - import Metrics.AtomicGetOrElseUpdateForTriemap +object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { + override def get(system: ActorSystem): MetricsExtension = super.get(system) + def lookup(): ExtensionId[_ <: actor.Extension] = Metrics + def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtensionImpl(system) +} - val metricsExtConfig = system.settings.config.getConfig("kamon.metrics") - printInitializationMessage(system.eventStream, metricsExtConfig.getBoolean("disable-aspectj-weaver-missing-error")) +case class EntityRegistration[T <: EntityRecorder](entity: Entity, recorder: T) - /** Configured Dispatchers */ - val metricSubscriptionsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.metric-subscriptions")) - val gaugeRecordingsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.gauge-recordings")) +trait MetricsExtension extends Kamon.Extension { + def settings: MetricsExtensionSettings + def shouldTrack(entity: Entity): Boolean + def shouldTrack(entityName: String, category: String): Boolean = + shouldTrack(Entity(entityName, category)) - /** Configuration Settings */ - val gaugeRecordingInterval = metricsExtConfig.getDuration("gauge-recording-interval", TimeUnit.MILLISECONDS) + def register[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entityName: String): Option[EntityRegistration[T]] + def register[T <: EntityRecorder](entity: Entity, recorder: T): EntityRegistration[T] + def unregister(entity: Entity): Unit - val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]() - val filters = loadFilters(metricsExtConfig) - lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions") + def find(entity: Entity): Option[EntityRecorder] + def find(name: String, category: String): Option[EntityRecorder] - def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = { - if (shouldTrack(identity)) - Some(storage.atomicGetOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder]) - else - None - } + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef): Unit = + subscribe(filter, subscriber, permanently = false) - def unregister(identity: MetricGroupIdentity): Unit = { - storage.remove(identity).map(_.cleanup) - } + def subscribe(category: String, selection: String, subscriber: ActorRef, permanently: Boolean): Unit = + subscribe(SubscriptionFilter(category, selection), subscriber, permanently) - def subscribe[C <: MetricGroupCategory](category: C, selection: String, subscriber: ActorRef, permanently: Boolean = false): Unit = - subscriptions.tell(Subscribe(category, selection, subscriber, permanently), subscriber) + def subscribe(category: String, selection: String, subscriber: ActorRef): Unit = + subscribe(SubscriptionFilter(category, selection), subscriber, permanently = false) - def unsubscribe(subscriber: ActorRef): Unit = - subscriptions.tell(Unsubscribe(subscriber), subscriber) + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanently: Boolean): Unit - def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = { - import scala.concurrent.duration._ + def unsubscribe(subscriber: ActorRef): Unit + def buildDefaultCollectionContext: CollectionContext + def instrumentFactory(category: String): InstrumentFactory +} - system.scheduler.schedule(gaugeRecordingInterval milliseconds, gaugeRecordingInterval milliseconds) { - body - }(gaugeRecordingsDispatcher) - } +class MetricsExtensionImpl(system: ExtendedActorSystem) extends MetricsExtension { + import FastDispatch.Syntax - private def shouldTrack(identity: MetricGroupIdentity): Boolean = { - filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(true) - } + val settings = MetricsExtensionSettings(system) - def loadFilters(config: Config): Map[String, MetricGroupFilter] = { - import scala.collection.JavaConverters._ + private val _trackedEntities = TrieMap.empty[Entity, EntityRecorder] + private val _collectionContext = buildDefaultCollectionContext + private val _metricsCollectionDispatcher = system.dispatchers.lookup(settings.metricCollectionDispatcher) + private val _subscriptions = ModuleSupervisor.get(system).createModule("subscriptions-dispatcher", + SubscriptionsDispatcher.props(settings.tickInterval, collectSnapshots).withDispatcher(settings.metricCollectionDispatcher)) - val filters = config.getObjectList("filters").asScala + def shouldTrack(entity: Entity): Boolean = + settings.entityFilters.get(entity.category).map { + filter ⇒ filter.accept(entity.name) - val allFilters = - for ( - filter ← filters; - entry ← filter.entrySet().asScala - ) yield { - val key = entry.getKey - val keyBasedConfig = entry.getValue.atKey(key) + } getOrElse (settings.trackUnmatchedEntities) - val includes = keyBasedConfig.getStringList(s"$key.includes").asScala.map(inc ⇒ new GlobPathFilter(inc)).toList - val excludes = keyBasedConfig.getStringList(s"$key.excludes").asScala.map(exc ⇒ new GlobPathFilter(exc)).toList + def register[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entityName: String): Option[EntityRegistration[T]] = { + import TriemapAtomicGetOrElseUpdate.Syntax + val entity = Entity(entityName, recorderFactory.category) - (key, MetricGroupFilter(includes, excludes)) - } + if (shouldTrack(entity)) { + val instrumentFactory = settings.instrumentFactories.get(recorderFactory.category).getOrElse(settings.defaultInstrumentFactory) + val recorder = _trackedEntities.atomicGetOrElseUpdate(entity, recorderFactory.createRecorder(instrumentFactory)).asInstanceOf[T] + Some(EntityRegistration(entity, recorder)) + } else None + } - allFilters.toMap + def register[T <: EntityRecorder](entity: Entity, recorder: T): EntityRegistration[T] = { + import TriemapAtomicGetOrElseUpdate.Syntax + EntityRegistration(entity, _trackedEntities.atomicGetOrElseUpdate(entity, recorder).asInstanceOf[T]) } + def unregister(entity: Entity): Unit = + _trackedEntities.remove(entity).map(_.cleanup) + + def find(entity: Entity): Option[EntityRecorder] = + _trackedEntities.get(entity) + + def find(name: String, category: String): Option[EntityRecorder] = + find(Entity(name, category)) + + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit = + _subscriptions.fastDispatch(Subscribe(filter, subscriber, permanent))(_metricsCollectionDispatcher) + + def unsubscribe(subscriber: ActorRef): Unit = + _subscriptions.fastDispatch(Unsubscribe(subscriber))(_metricsCollectionDispatcher) + def buildDefaultCollectionContext: CollectionContext = - CollectionContext(metricsExtConfig.getInt("default-collection-context-buffer-size")) + CollectionContext(settings.defaultCollectionContextBufferSize) + + def instrumentFactory(category: String): InstrumentFactory = + settings.instrumentFactories.getOrElse(category, settings.defaultInstrumentFactory) + + /** + * Collect and dispatch. + */ + private def collectSnapshots(): Map[Entity, EntitySnapshot] = { + val builder = Map.newBuilder[Entity, EntitySnapshot] + _trackedEntities.foreach { + case (identity, recorder) ⇒ builder += ((identity, recorder.collect(_collectionContext))) + } - def printInitializationMessage(eventStream: EventStream, disableWeaverMissingError: Boolean): Unit = { + builder.result() + } + + /* def printInitializationMessage(eventStream: EventStream, disableWeaverMissingError: Boolean): Unit = { if (!disableWeaverMissingError) { val weaverMissingMessage = """ @@ -123,22 +151,6 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { eventStream.publish(Error("MetricsExtension", classOf[MetricsExtension], weaverMissingMessage)) } - } + }*/ } -object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = Metrics - def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtension(system) - - case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { - def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) - } - - implicit class AtomicGetOrElseUpdateForTriemap[K, V](trieMap: TrieMap[K, V]) { - def atomicGetOrElseUpdate(key: K, op: ⇒ V): V = - trieMap.get(key) match { - case Some(v) ⇒ v - case None ⇒ val d = op; trieMap.putIfAbsent(key, d).getOrElse(d) - } - } -} |