diff options
Diffstat (limited to 'kamon-core/src/main/scala')
5 files changed, 54 insertions, 17 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala b/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala index d6aa9fb9..3901ea75 100644 --- a/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala +++ b/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala @@ -91,7 +91,7 @@ object InstrumentFactory { } private def nonEmptySection(entry: (String, Config)): Boolean = entry match { - case (_, config) => config.firstLevelKeys.nonEmpty + case (_, config) => config.topLevelKeys.nonEmpty } private def readCustomInstrumentSettings(entry: (String, Config)): (String, CustomInstrumentSettings) = { diff --git a/kamon-core/src/main/scala/kamon/metric/Metric.scala b/kamon-core/src/main/scala/kamon/metric/Metric.scala index 1dd1a99c..53b58eb1 100644 --- a/kamon-core/src/main/scala/kamon/metric/Metric.scala +++ b/kamon-core/src/main/scala/kamon/metric/Metric.scala @@ -24,6 +24,11 @@ import kamon.util.MeasurementUnit import scala.collection.concurrent.TrieMap import java.time.Duration +import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit} + +import org.slf4j.LoggerFactory + +import scala.util.Try @@ -100,8 +105,11 @@ private[kamon] final class HistogramMetricImpl(val name: String, val unit: Measu } private[kamon] final class MinMaxCounterMetricImpl(val name: String, val unit: MeasurementUnit, customDynamicRange: Option[DynamicRange], - customSampleInterval: Option[Duration], factory: AtomicReference[InstrumentFactory]) - extends BaseMetric[MinMaxCounter, MetricDistribution](MinMaxCounter) with MinMaxCounterMetric { + customSampleInterval: Option[Duration], factory: AtomicReference[InstrumentFactory], scheduler: ScheduledExecutorService) + extends BaseMetric[MinMaxCounter, MetricDistribution](MinMaxCounter) with MinMaxCounterMetric { + + private val logger = LoggerFactory.getLogger(classOf[MinMaxCounterMetric]) + private val scheduledSamplers = TrieMap.empty[Tags, ScheduledFuture[_]] def dynamicRange: DynamicRange = baseInstrument.dynamicRange @@ -124,11 +132,42 @@ private[kamon] final class MinMaxCounterMetricImpl(val name: String, val unit: M override def sample(): Unit = baseInstrument.sample() - override protected def createInstrument(tags: Tags): MinMaxCounter = - factory.get().buildMinMaxCounter(customDynamicRange, customSampleInterval)(name, tags, unit) + override protected def createInstrument(tags: Tags): MinMaxCounter = { + val mmCounter = factory.get().buildMinMaxCounter(customDynamicRange, customSampleInterval)(name, tags, unit) + val sampleInterval = mmCounter.sampleInterval.toMillis + val scheduledFuture = scheduler.scheduleAtFixedRate(scheduledSampler(mmCounter), sampleInterval, sampleInterval, TimeUnit.MILLISECONDS) + + println("SCHEDULING THE MMCOUNTER " + name + ", tags=" + tags.prettyPrint()) + scheduledSamplers.put(tags, scheduledFuture) + + mmCounter + } + + override def remove(tags: Tags): Boolean = + removeAndStopSampler(tags) + + override def remove(tags: (String, String)*): Boolean = + removeAndStopSampler(tags.toMap) + + override def remove(tag: String, value: String): Boolean = + removeAndStopSampler(Map(tag -> value)) + + private def removeAndStopSampler(tags: Tags): Boolean = { + val removed = super.remove(tags) + if(removed) + scheduledSamplers.get(tags).foreach(sf => { + Try(sf.cancel(false)).failed.foreach(_ => logger.error("Failed to cancel scheduled sampling for MinMaxCounter []", tags.prettyPrint())) + }) + removed + } override protected def createSnapshot(instrument: MinMaxCounter): MetricDistribution = instrument.asInstanceOf[SimpleMinMaxCounter].snapshot(resetState = true) + + + private def scheduledSampler(mmCounter: MinMaxCounter): Runnable = new Runnable { + override def run(): Unit = mmCounter.sample() + } } diff --git a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala index eada120a..cecc2c19 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala @@ -24,11 +24,13 @@ import kamon.util.MeasurementUnit import scala.collection.concurrent.TrieMap import java.time.Duration +import java.util.concurrent.Executors import org.slf4j.LoggerFactory class MetricRegistry(initialConfig: Config) extends MetricsSnapshotGenerator { + private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-min-max-counter-sampler")) private val logger = LoggerFactory.getLogger(classOf[MetricRegistry]) private val instrumentFactory = new AtomicReference[InstrumentFactory]() private val metrics = TrieMap.empty[String, BaseMetric[_, _]] @@ -50,7 +52,7 @@ class MetricRegistry(initialConfig: Config) extends MetricsSnapshotGenerator { lookupMetric(name, unit, InstrumentTypes.Gauge)(new GaugeMetricImpl(name, unit, instrumentFactory)) def minMaxCounter(name: String, unit: MeasurementUnit, dynamicRange: Option[DynamicRange], sampleInterval: Option[Duration]): MinMaxCounterMetric = - lookupMetric(name, unit, InstrumentTypes.MinMaxCounter)(new MinMaxCounterMetricImpl(name, unit, dynamicRange, sampleInterval, instrumentFactory)) + lookupMetric(name, unit, InstrumentTypes.MinMaxCounter)(new MinMaxCounterMetricImpl(name, unit, dynamicRange, sampleInterval, instrumentFactory, registryExecutionContext)) override def snapshot(): MetricsSnapshot = synchronized { diff --git a/kamon-core/src/main/scala/kamon/package.scala b/kamon-core/src/main/scala/kamon/package.scala index e764d7f4..274f43f8 100644 --- a/kamon-core/src/main/scala/kamon/package.scala +++ b/kamon-core/src/main/scala/kamon/package.scala @@ -17,7 +17,7 @@ import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.{Executors, ThreadFactory} -import com.typesafe.config.Config +import com.typesafe.config.{Config, ConfigUtil} import scala.collection.concurrent.TrieMap @@ -81,20 +81,16 @@ package object kamon { } } - implicit class UtilsOnConfig(val config: Config) extends AnyVal { import scala.collection.JavaConverters._ - def firstLevelKeys: Set[String] = { - config.entrySet().asScala.map { - case entry ⇒ entry.getKey.takeWhile(_ != '.') - } toSet - } + def topLevelKeys: Set[String] = + config.root().entrySet().asScala.map(_.getKey).toSet def configurations: Map[String, Config] = { - firstLevelKeys - .map(entry => (entry, config.getConfig(entry))) - .toMap + topLevelKeys + .map(entry => (entry, config.getConfig(ConfigUtil.joinPath(entry)))) + .toMap } } } diff --git a/kamon-core/src/main/scala/kamon/util/Filters.scala b/kamon-core/src/main/scala/kamon/util/Filters.scala index 78553309..ab505ca7 100644 --- a/kamon-core/src/main/scala/kamon/util/Filters.scala +++ b/kamon-core/src/main/scala/kamon/util/Filters.scala @@ -25,7 +25,7 @@ object Filters { val filtersConfig = config.getConfig("kamon.util.filters") val acceptUnmatched = filtersConfig.getBoolean("accept-unmatched") - val perMetricFilter = filtersConfig.firstLevelKeys.filter(_ != "accept-unmatched") map { filterName: String ⇒ + val perMetricFilter = filtersConfig.topLevelKeys.filter(_ != "accept-unmatched") map { filterName: String ⇒ val includes = readFilters(filtersConfig, s"$filterName.includes") val excludes = readFilters(filtersConfig, s"$filterName.excludes") |