diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala | 199 |
1 files changed, 98 insertions, 101 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala index f491cc57..87911352 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala @@ -16,129 +16,126 @@ package kamon.metric -import akka.event.Logging.Error -import akka.event.EventStream +import com.typesafe.config.Config +import kamon.metric.SubscriptionsDispatcher.{ Unsubscribe, Subscribe } +import kamon.metric.instrument.{ DefaultRefreshScheduler, InstrumentFactory, CollectionContext } import scala.collection.concurrent.TrieMap import akka.actor._ -import com.typesafe.config.Config -import kamon.util.GlobPathFilter -import kamon.Kamon -import akka.actor -import kamon.metric.Metrics.MetricGroupFilter -import kamon.metric.Subscriptions.{ Unsubscribe, Subscribe } -import java.util.concurrent.TimeUnit - -class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { - import Metrics.AtomicGetOrElseUpdateForTriemap - - val metricsExtConfig = system.settings.config.getConfig("kamon.metrics") - printInitializationMessage(system.eventStream, metricsExtConfig.getBoolean("disable-aspectj-weaver-missing-error")) - - /** Configured Dispatchers */ - val metricSubscriptionsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.metric-subscriptions")) - val gaugeRecordingsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.gauge-recordings")) - - /** Configuration Settings */ - val gaugeRecordingInterval: Long = metricsExtConfig.getMilliseconds("gauge-recording-interval") - - val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]() - val filters = loadFilters(metricsExtConfig) - lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions") - - def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = { - if (shouldTrack(identity)) - Some(storage.atomicGetOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder]) - else - None - } +import kamon.util.{ LazyActorRef, TriemapAtomicGetOrElseUpdate } - def unregister(identity: MetricGroupIdentity): Unit = { - storage.remove(identity).map(_.cleanup) - } +case class EntityRegistration[T <: EntityRecorder](entity: Entity, recorder: T) - def subscribe[C <: MetricGroupCategory](category: C, selection: String, subscriber: ActorRef, permanently: Boolean = false): Unit = - subscriptions.tell(Subscribe(category, selection, subscriber, permanently), subscriber) +trait MetricsExtension { + def settings: MetricsExtensionSettings + def shouldTrack(entity: Entity): Boolean + def shouldTrack(entityName: String, category: String): Boolean = + shouldTrack(Entity(entityName, category)) - def unsubscribe(subscriber: ActorRef): Unit = - subscriptions.tell(Unsubscribe(subscriber), subscriber) + def register[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entityName: String): Option[EntityRegistration[T]] + def register[T <: EntityRecorder](entity: Entity, recorder: T): EntityRegistration[T] + def unregister(entity: Entity): Unit - def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = { - import scala.concurrent.duration._ + def find(entity: Entity): Option[EntityRecorder] + def find(name: String, category: String): Option[EntityRecorder] - system.scheduler.schedule(gaugeRecordingInterval milliseconds, gaugeRecordingInterval milliseconds) { - body - }(gaugeRecordingsDispatcher) - } + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef): Unit = + subscribe(filter, subscriber, permanently = false) - private def shouldTrack(identity: MetricGroupIdentity): Boolean = { - filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(true) - } + def subscribe(category: String, selection: String, subscriber: ActorRef, permanently: Boolean): Unit = + subscribe(SubscriptionFilter(category, selection), subscriber, permanently) + + def subscribe(category: String, selection: String, subscriber: ActorRef): Unit = + subscribe(SubscriptionFilter(category, selection), subscriber, permanently = false) - def loadFilters(config: Config): Map[String, MetricGroupFilter] = { - import scala.collection.JavaConverters._ + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanently: Boolean): Unit - val filters = config.getObjectList("filters").asScala + def unsubscribe(subscriber: ActorRef): Unit + def buildDefaultCollectionContext: CollectionContext + def instrumentFactory(category: String): InstrumentFactory +} + +private[kamon] class MetricsExtensionImpl(config: Config) extends MetricsExtension { + private val _trackedEntities = TrieMap.empty[Entity, EntityRecorder] + private val _subscriptions = new LazyActorRef + + val settings = MetricsExtensionSettings(config) + + def shouldTrack(entity: Entity): Boolean = + settings.entityFilters.get(entity.category).map { + filter ⇒ filter.accept(entity.name) - val allFilters = - for ( - filter ← filters; - entry ← filter.entrySet().asScala - ) yield { - val key = entry.getKey - val keyBasedConfig = entry.getValue.atKey(key) + } getOrElse (settings.trackUnmatchedEntities) - val includes = keyBasedConfig.getStringList(s"$key.includes").asScala.map(inc ⇒ new GlobPathFilter(inc)).toList - val excludes = keyBasedConfig.getStringList(s"$key.excludes").asScala.map(exc ⇒ new GlobPathFilter(exc)).toList + def register[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entityName: String): Option[EntityRegistration[T]] = { + import TriemapAtomicGetOrElseUpdate.Syntax + val entity = Entity(entityName, recorderFactory.category) - (key, MetricGroupFilter(includes, excludes)) - } + if (shouldTrack(entity)) { + val instrumentFactory = settings.instrumentFactories.get(recorderFactory.category).getOrElse(settings.defaultInstrumentFactory) + val recorder = _trackedEntities.atomicGetOrElseUpdate(entity, recorderFactory.createRecorder(instrumentFactory), _.cleanup).asInstanceOf[T] - allFilters.toMap + Some(EntityRegistration(entity, recorder)) + } else None } - def buildDefaultCollectionContext: CollectionContext = - CollectionContext(metricsExtConfig.getInt("default-collection-context-buffer-size")) - - def printInitializationMessage(eventStream: EventStream, disableWeaverMissingError: Boolean): Unit = { - if (!disableWeaverMissingError) { - val weaverMissingMessage = - """ - | - | ___ _ ___ _ _ ___ ___ _ _ - | / _ \ | | |_ | | | | | | \/ |(_) (_) - |/ /_\ \ ___ _ __ ___ ___ | |_ | | | | | | ___ __ _ __ __ ___ _ __ | . . | _ ___ ___ _ _ __ __ _ - || _ |/ __|| '_ \ / _ \ / __|| __| | | | |/\| | / _ \ / _` |\ \ / // _ \| '__| | |\/| || |/ __|/ __|| || '_ \ / _` | - || | | |\__ \| |_) || __/| (__ | |_ /\__/ / \ /\ /| __/| (_| | \ V /| __/| | | | | || |\__ \\__ \| || | | || (_| | - |\_| |_/|___/| .__/ \___| \___| \__|\____/ \/ \/ \___| \__,_| \_/ \___||_| \_| |_/|_||___/|___/|_||_| |_| \__, | - | | | __/ | - | |_| |___/ - | - | It seems like your application wasn't started with the -javaagent:/path-to-aspectj-weaver.jar option. Without that Kamon might - | not work properly, if you need help on setting up the weaver go to http://kamon.io/introduction/get-started/ for more info. If - | you are sure that you don't need the weaver (e.g. you are only using KamonStandalone) then you can disable this error message - | by changing the kamon.metrics.disable-aspectj-weaver-missing-error setting in your configuration file. - | - """.stripMargin - - eventStream.publish(Error("MetricsExtension", classOf[MetricsExtension], weaverMissingMessage)) + def register[T <: EntityRecorder](entity: Entity, recorder: T): EntityRegistration[T] = { + _trackedEntities.put(entity, recorder).map { oldRecorder ⇒ + oldRecorder.cleanup } + + EntityRegistration(entity, recorder) } -} -object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = Metrics - def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtension(system) + def unregister(entity: Entity): Unit = + _trackedEntities.remove(entity).map(_.cleanup) + + def find(entity: Entity): Option[EntityRecorder] = + _trackedEntities.get(entity) - case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { - def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) + def find(name: String, category: String): Option[EntityRecorder] = + find(Entity(name, category)) + + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit = + _subscriptions.tell(Subscribe(filter, subscriber, permanent)) + + def unsubscribe(subscriber: ActorRef): Unit = + _subscriptions.tell(Unsubscribe(subscriber)) + + def buildDefaultCollectionContext: CollectionContext = + CollectionContext(settings.defaultCollectionContextBufferSize) + + def instrumentFactory(category: String): InstrumentFactory = + settings.instrumentFactories.getOrElse(category, settings.defaultInstrumentFactory) + + private[kamon] def collectSnapshots(collectionContext: CollectionContext): Map[Entity, EntitySnapshot] = { + val builder = Map.newBuilder[Entity, EntitySnapshot] + _trackedEntities.foreach { + case (identity, recorder) ⇒ builder += ((identity, recorder.collect(collectionContext))) + } + + builder.result() + } + + /** + * Metrics Extension initialization. + */ + private var _system: ActorSystem = null + private lazy val _start = { + _subscriptions.point(_system.actorOf(SubscriptionsDispatcher.props(settings.tickInterval, this), "metrics")) + settings.pointScheduler(DefaultRefreshScheduler(_system.scheduler, _system.dispatcher)) } - implicit class AtomicGetOrElseUpdateForTriemap[K, V](trieMap: TrieMap[K, V]) { - def atomicGetOrElseUpdate(key: K, op: ⇒ V): V = - trieMap.get(key) match { - case Some(v) ⇒ v - case None ⇒ val d = op; trieMap.putIfAbsent(key, d).getOrElse(d) - } + def start(system: ActorSystem): Unit = synchronized { + _system = system + _start + _system = null } } + +private[kamon] object MetricsExtensionImpl { + + def apply(config: Config) = + new MetricsExtensionImpl(config) +} + |