diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric')
3 files changed, 47 insertions, 6 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala b/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala index d6aa9fb9..3901ea75 100644 --- a/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala +++ b/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala @@ -91,7 +91,7 @@ object InstrumentFactory { } private def nonEmptySection(entry: (String, Config)): Boolean = entry match { - case (_, config) => config.firstLevelKeys.nonEmpty + case (_, config) => config.topLevelKeys.nonEmpty } private def readCustomInstrumentSettings(entry: (String, Config)): (String, CustomInstrumentSettings) = { diff --git a/kamon-core/src/main/scala/kamon/metric/Metric.scala b/kamon-core/src/main/scala/kamon/metric/Metric.scala index 1dd1a99c..53b58eb1 100644 --- a/kamon-core/src/main/scala/kamon/metric/Metric.scala +++ b/kamon-core/src/main/scala/kamon/metric/Metric.scala @@ -24,6 +24,11 @@ import kamon.util.MeasurementUnit import scala.collection.concurrent.TrieMap import java.time.Duration +import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit} + +import org.slf4j.LoggerFactory + +import scala.util.Try @@ -100,8 +105,11 @@ private[kamon] final class HistogramMetricImpl(val name: String, val unit: Measu } private[kamon] final class MinMaxCounterMetricImpl(val name: String, val unit: MeasurementUnit, customDynamicRange: Option[DynamicRange], - customSampleInterval: Option[Duration], factory: AtomicReference[InstrumentFactory]) - extends BaseMetric[MinMaxCounter, MetricDistribution](MinMaxCounter) with MinMaxCounterMetric { + customSampleInterval: Option[Duration], factory: AtomicReference[InstrumentFactory], scheduler: ScheduledExecutorService) + extends BaseMetric[MinMaxCounter, MetricDistribution](MinMaxCounter) with MinMaxCounterMetric { + + private val logger = LoggerFactory.getLogger(classOf[MinMaxCounterMetric]) + private val scheduledSamplers = TrieMap.empty[Tags, ScheduledFuture[_]] def dynamicRange: DynamicRange = baseInstrument.dynamicRange @@ -124,11 +132,42 @@ private[kamon] final class MinMaxCounterMetricImpl(val name: String, val unit: M override def sample(): Unit = baseInstrument.sample() - override protected def createInstrument(tags: Tags): MinMaxCounter = - factory.get().buildMinMaxCounter(customDynamicRange, customSampleInterval)(name, tags, unit) + override protected def createInstrument(tags: Tags): MinMaxCounter = { + val mmCounter = factory.get().buildMinMaxCounter(customDynamicRange, customSampleInterval)(name, tags, unit) + val sampleInterval = mmCounter.sampleInterval.toMillis + val scheduledFuture = scheduler.scheduleAtFixedRate(scheduledSampler(mmCounter), sampleInterval, sampleInterval, TimeUnit.MILLISECONDS) + + println("SCHEDULING THE MMCOUNTER " + name + ", tags=" + tags.prettyPrint()) + scheduledSamplers.put(tags, scheduledFuture) + + mmCounter + } + + override def remove(tags: Tags): Boolean = + removeAndStopSampler(tags) + + override def remove(tags: (String, String)*): Boolean = + removeAndStopSampler(tags.toMap) + + override def remove(tag: String, value: String): Boolean = + removeAndStopSampler(Map(tag -> value)) + + private def removeAndStopSampler(tags: Tags): Boolean = { + val removed = super.remove(tags) + if(removed) + scheduledSamplers.get(tags).foreach(sf => { + Try(sf.cancel(false)).failed.foreach(_ => logger.error("Failed to cancel scheduled sampling for MinMaxCounter []", tags.prettyPrint())) + }) + removed + } override protected def createSnapshot(instrument: MinMaxCounter): MetricDistribution = instrument.asInstanceOf[SimpleMinMaxCounter].snapshot(resetState = true) + + + private def scheduledSampler(mmCounter: MinMaxCounter): Runnable = new Runnable { + override def run(): Unit = mmCounter.sample() + } } diff --git a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala index eada120a..cecc2c19 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala @@ -24,11 +24,13 @@ import kamon.util.MeasurementUnit import scala.collection.concurrent.TrieMap import java.time.Duration +import java.util.concurrent.Executors import org.slf4j.LoggerFactory class MetricRegistry(initialConfig: Config) extends MetricsSnapshotGenerator { + private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-min-max-counter-sampler")) private val logger = LoggerFactory.getLogger(classOf[MetricRegistry]) private val instrumentFactory = new AtomicReference[InstrumentFactory]() private val metrics = TrieMap.empty[String, BaseMetric[_, _]] @@ -50,7 +52,7 @@ class MetricRegistry(initialConfig: Config) extends MetricsSnapshotGenerator { lookupMetric(name, unit, InstrumentTypes.Gauge)(new GaugeMetricImpl(name, unit, instrumentFactory)) def minMaxCounter(name: String, unit: MeasurementUnit, dynamicRange: Option[DynamicRange], sampleInterval: Option[Duration]): MinMaxCounterMetric = - lookupMetric(name, unit, InstrumentTypes.MinMaxCounter)(new MinMaxCounterMetricImpl(name, unit, dynamicRange, sampleInterval, instrumentFactory)) + lookupMetric(name, unit, InstrumentTypes.MinMaxCounter)(new MinMaxCounterMetricImpl(name, unit, dynamicRange, sampleInterval, instrumentFactory, registryExecutionContext)) override def snapshot(): MetricsSnapshot = synchronized { |