From 105ed9cb264eb3569b5ae0d65ac2fd8cb636f8e8 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sun, 21 May 2017 14:05:05 +0200 Subject: wip, trying to get something that could be tested --- .../src/main/scala/kamon/metric/EntityFilter.scala | 10 +++---- .../main/scala/kamon/metric/EntityRecorder.scala | 35 +++++++++++++++++----- .../main/scala/kamon/metric/RecorderRegistry.scala | 11 ++++++- 3 files changed, 43 insertions(+), 13 deletions(-) (limited to 'kamon-core/src/main/scala/kamon/metric') diff --git a/kamon-core/src/main/scala/kamon/metric/EntityFilter.scala b/kamon-core/src/main/scala/kamon/metric/EntityFilter.scala index cf203609..77fbfc4b 100644 --- a/kamon-core/src/main/scala/kamon/metric/EntityFilter.scala +++ b/kamon-core/src/main/scala/kamon/metric/EntityFilter.scala @@ -7,13 +7,13 @@ import com.typesafe.config.Config object EntityFilter { def fromConfig(config: Config): EntityFilter = { val filtersConfig = config.getConfig("kamon.metric.filters") - val acceptUnmatched = filtersConfig.getBoolean("accept-unmatched") + val acceptUnmatched = filtersConfig.getBoolean("accept-unmatched-categories") - val perCategoryFilters = filtersConfig.firstLevelKeys.filter(_ != "accept-unmatched") map { category: String ⇒ + val perCategoryFilters = filtersConfig.firstLevelKeys.filter(_ != "accept-unmatched-categories") map { category: String ⇒ val includes = readFilters(filtersConfig, s"$category.includes") val excludes = readFilters(filtersConfig, s"$category.excludes") - (category, new IncludeExcludeNameFilter(includes, excludes, acceptUnmatched)) + (category, new IncludeExcludeNameFilter(includes, excludes)) } toMap new EntityFilter(perCategoryFilters, acceptUnmatched) @@ -49,9 +49,9 @@ trait NameFilter { def accept(name: String): Boolean } -class IncludeExcludeNameFilter(includes: Seq[NameFilter], excludes: Seq[NameFilter], acceptUnmatched: Boolean) extends NameFilter { +class IncludeExcludeNameFilter(includes: Seq[NameFilter], excludes: Seq[NameFilter]) extends NameFilter { override def accept(name: String): Boolean = - (includes.exists(_.accept(name)) || acceptUnmatched) && !excludes.exists(_.accept(name)) + includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) } class RegexNameFilter(path: String) extends NameFilter { diff --git a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala index 8ce37082..ccdb463e 100644 --- a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala +++ b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala @@ -1,11 +1,13 @@ package kamon.metric import java.time.Duration +import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit} import kamon.metric.instrument._ import kamon.util.MeasurementUnit import scala.collection.concurrent.TrieMap +import scala.util.Try trait EntityRecorder { def histogram(name: String): Histogram @@ -25,12 +27,11 @@ trait EntitySnapshotProducer { def snapshot(): EntitySnapshot } +class DefaultEntityRecorder(entity: Entity, instrumentFactory: InstrumentFactory, scheduler: ScheduledExecutorService) + extends EntityRecorder with EntitySnapshotProducer { - - -class DefaultEntityRecorder(entity: Entity, instrumentFactory: InstrumentFactory) extends EntityRecorder with EntitySnapshotProducer { private val histograms = TrieMap.empty[String, Histogram with DistributionSnapshotInstrument] - private val minMaxCounters = TrieMap.empty[String, MinMaxCounter with DistributionSnapshotInstrument] + private val minMaxCounters = TrieMap.empty[String, MinMaxCounterEntry] private val counters = TrieMap.empty[String, Counter with SingleValueSnapshotInstrument] private val gauges = TrieMap.empty[String, Gauge with SingleValueSnapshotInstrument] @@ -41,10 +42,14 @@ class DefaultEntityRecorder(entity: Entity, instrumentFactory: InstrumentFactory histograms.atomicGetOrElseUpdate(name, instrumentFactory.buildHistogram(entity, name, dynamicRange, measurementUnit)) def minMaxCounter(name: String): MinMaxCounter = - minMaxCounters.atomicGetOrElseUpdate(name, instrumentFactory.buildMinMaxCounter(entity, name)) + minMaxCounters.atomicGetOrElseUpdate(name, + createMMCounterEntry(instrumentFactory.buildMinMaxCounter(entity, name)) + ).mmCounter def minMaxCounter(name: String, measurementUnit: MeasurementUnit, dynamicRange: DynamicRange, sampleFrequency: Duration): MinMaxCounter = - minMaxCounters.atomicGetOrElseUpdate(name, instrumentFactory.buildMinMaxCounter(entity, name, dynamicRange, sampleFrequency, measurementUnit)) + minMaxCounters.atomicGetOrElseUpdate(name, + createMMCounterEntry(instrumentFactory.buildMinMaxCounter(entity, name, dynamicRange, sampleFrequency, measurementUnit)) + ).mmCounter def gauge(name: String): Gauge = gauges.atomicGetOrElseUpdate(name, instrumentFactory.buildGauge(entity, name)) @@ -62,8 +67,24 @@ class DefaultEntityRecorder(entity: Entity, instrumentFactory: InstrumentFactory new EntitySnapshot( entity, histograms = histograms.values.map(_.snapshot()).toSeq, - minMaxCounters = minMaxCounters.values.map(_.snapshot()).toSeq, + minMaxCounters = minMaxCounters.values.map(_.mmCounter.snapshot()).toSeq, gauges = gauges.values.map(_.snapshot()).toSeq, counters = counters.values.map(_.snapshot()).toSeq ) + + def cleanup(): Unit = { + minMaxCounters.values.foreach { mmCounter => + Try(mmCounter.refreshFuture.cancel(true)) + } + } + + private case class MinMaxCounterEntry(mmCounter: MinMaxCounter with DistributionSnapshotInstrument, refreshFuture: ScheduledFuture[_]) + + private def createMMCounterEntry(mmCounter: MinMaxCounter with DistributionSnapshotInstrument): MinMaxCounterEntry = { + val refreshFuture = scheduler.schedule(new Runnable { + override def run(): Unit = mmCounter.sample() + }, mmCounter.sampleInterval.toMillis, TimeUnit.MILLISECONDS) + + MinMaxCounterEntry(mmCounter, refreshFuture) + } } \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala b/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala index 53081760..fd728b1d 100644 --- a/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala +++ b/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala @@ -1,6 +1,7 @@ package kamon package metric +import java.util.concurrent.ScheduledThreadPoolExecutor import java.util.concurrent.atomic.AtomicReference import com.typesafe.config.Config @@ -16,6 +17,7 @@ trait RecorderRegistry { } class RecorderRegistryImpl(initialConfig: Config) extends RecorderRegistry { + private val scheduler = new ScheduledThreadPoolExecutor(1, numberedThreadFactory("kamon.metric.refresh-scheduler")) private val instrumentFactory = new AtomicReference[InstrumentFactory]() private val entityFilter = new AtomicReference[EntityFilter]() private val entities = TrieMap.empty[Entity, EntityRecorder with EntitySnapshotProducer] @@ -27,7 +29,7 @@ class RecorderRegistryImpl(initialConfig: Config) extends RecorderRegistry { entityFilter.get().accept(entity) override def getRecorder(entity: Entity): EntityRecorder = - entities.atomicGetOrElseUpdate(entity, new DefaultEntityRecorder(entity, instrumentFactory.get())) + entities.atomicGetOrElseUpdate(entity, new DefaultEntityRecorder(entity, instrumentFactory.get(), scheduler)) override def removeRecorder(entity: Entity): Boolean = entities.remove(entity).nonEmpty @@ -35,13 +37,20 @@ class RecorderRegistryImpl(initialConfig: Config) extends RecorderRegistry { private[kamon] def reconfigure(config: Config): Unit = synchronized { instrumentFactory.set(InstrumentFactory.fromConfig(config)) entityFilter.set(EntityFilter.fromConfig(config)) + + val refreshSchedulerPoolSize = config.getInt("kamon.metric.refresh-scheduler-pool-size") + scheduler.setCorePoolSize(refreshSchedulerPoolSize) } private[kamon] def snapshot(): Seq[EntitySnapshot] = { entities.values.map(_.snapshot()).toSeq } + + //private[kamon] def diagnosticData } +case class RecorderRegistryDiagnostic(entities: Seq[Entity]) + -- cgit v1.2.3