/* ========================================================================================= * Copyright © 2013-2017 the kamon project * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, * either express or implied. See the License for the specific language governing permissions * and limitations under the License. * ========================================================================================= */ package kamon.metric import java.time.{Duration, Instant} import kamon.{Kamon, Tags} import kamon.metric.PeriodSnapshotAccumulator.{MetricDistributionKey, MetricValueKey} import kamon.util.Clock import scala.collection.mutable class DistributionAccumulator(dynamicRange: DynamicRange) { private val accumulatorHistogram = new HdrHistogram("metric-distribution-accumulator", tags = Map.empty, unit = MeasurementUnit.none, dynamicRange) def add(distribution: Distribution): Unit = distribution.bucketsIterator.foreach(b => accumulatorHistogram.record(b.value, b.frequency)) def result(resetState: Boolean): Distribution = accumulatorHistogram.snapshot(resetState).distribution } /** * Merges snapshots over the specified duration and produces a snapshot with all merged metrics provided to it within * the period. This class is mutable, not thread safe and assumes that all snapshots passed to the `accumulate(...)` * function are ordered in time. * * The typical use of this class would be when writing metric reporters that have to report data at a specific interval * and wants to protect from users configuring a more frequent metrics tick interval. Example: * * {{{ * class Reporter extends MetricsReporter { * val accumulator = new PeriodSnapshotAccumulator(Duration.ofSeconds(60), Duration.ofSeconds(1)) * * def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = { * accumulator.add(snapshot).foreach(accumulatedSnapshot => { * // Process your snapshot here, will only be called when the expected period has passed. * } * } * * ... * } * }}} * * The margin time is used to determine how close the current accumulated interval can to be to the expected interval * and still get reported. In the example above a accumulated period of 59.6 seconds has a margin to 60 seconds of * 0.4 seconds, thus, getting reported immediately instead of waiting for the next snapshot. * * A detail of what has been accumulated by calling the `.peek()` function. * * @param duration for how long to accumulate snapshots * @param margin error margin for expected reporting period */ class PeriodSnapshotAccumulator(duration: Duration, margin: Duration) { private val counters = mutable.Map[MetricValueKey, Long]() private val gauges = mutable.Map[MetricValueKey, Long]() private val histograms = mutable.Map[MetricDistributionKey, DistributionAccumulator]() private val rangeSamplers = mutable.Map[MetricDistributionKey, DistributionAccumulator]() private var nextTick: Instant = Instant.EPOCH private var accumulatingFrom: Option[Instant] = None def add(periodSnapshot: PeriodSnapshot): Option[PeriodSnapshot] = { // Initialize the next tick based on incoming snapshots. if(nextTick == Instant.EPOCH) nextTick = Clock.nextTick(periodSnapshot.to, duration) // short-circuit if there is no need to accumulate (e.g. when metrics tick-interval is the same as duration or the // snapshots have a longer period than the duration). if(isSameDurationAsTickInterval() || (isAroundNextTick(periodSnapshot.to) && accumulatingFrom.isEmpty)) Some(periodSnapshot) else { if (accumulatingFrom.isEmpty) accumulatingFrom = Some(periodSnapshot.from) periodSnapshot.metrics.counters.foreach(c => accumulateValue(counters, c)) periodSnapshot.metrics.gauges.foreach(g => replaceValue(gauges, g)) periodSnapshot.metrics.histograms.foreach(h => accumulateDistribution(histograms, h)) periodSnapshot.metrics.rangeSamplers.foreach(rs => accumulateDistribution(rangeSamplers, rs)) for(from <- accumulatingFrom if isAroundNextTick(periodSnapshot.to)) yield { val accumulatedPeriodSnapshot = buildPeriodSnapshot(from, periodSnapshot.to, resetState = true) nextTick = Clock.nextTick(nextTick, duration) accumulatingFrom = None clearAccumulatedData() accumulatedPeriodSnapshot } } } def peek(): PeriodSnapshot = { buildPeriodSnapshot(accumulatingFrom.getOrElse(nextTick), nextTick, resetState = false) } private def isAroundNextTick(instant: Instant): Boolean = { Duration.between(instant, nextTick.minus(margin)).toMillis() <= 0 } private def isSameDurationAsTickInterval(): Boolean = { Kamon.config().getDuration("kamon.metric.tick-interval").equals(duration) } private def buildPeriodSnapshot(from: Instant, to: Instant, resetState: Boolean): PeriodSnapshot = { val metrics = MetricsSnapshot( histograms = histograms.map(createDistributionSnapshot(resetState)).toSeq, rangeSamplers = rangeSamplers.map(createDistributionSnapshot(resetState)).toSeq, gauges = gauges.map(createValueSnapshot(resetState)).toSeq, counters = counters.map(createValueSnapshot(resetState)).toSeq ) PeriodSnapshot(from, to, metrics) } private def accumulateValue(cache: mutable.Map[MetricValueKey, Long], metric: MetricValue): Unit = { val key = MetricValueKey(metric.name, metric.tags, metric.unit) cache.get(key).map(previousValue => { cache.put(key, metric.value + previousValue) }).orElse { cache.put(key, metric.value) } } private def replaceValue(cache: mutable.Map[MetricValueKey, Long], metric: MetricValue): Unit = { val key = MetricValueKey(metric.name, metric.tags, metric.unit) cache.put(key, metric.value) } private def createValueSnapshot(reset: Boolean)(pair: (MetricValueKey, Long)): MetricValue = { val (key, value) = pair MetricValue(key.name, key.tags, key.unit, value) } private def createDistributionSnapshot(resetState: Boolean)(pair: (MetricDistributionKey, DistributionAccumulator)): MetricDistribution = { val (key, value) = pair MetricDistribution(key.name, key.tags, key.unit, key.dynamicRange, value.result(resetState)) } private def accumulateDistribution(cache: mutable.Map[MetricDistributionKey, DistributionAccumulator], metric: MetricDistribution): Unit = { val key = MetricDistributionKey(metric.name, metric.tags, metric.unit, metric.dynamicRange) cache.get(key).map(previousValue => { previousValue.add(metric.distribution) }).orElse { val distributionAccumulator = new DistributionAccumulator(key.dynamicRange) distributionAccumulator.add(metric.distribution) cache.put(key, distributionAccumulator) } } private def clearAccumulatedData(): Unit = { histograms.clear() rangeSamplers.clear() counters.clear() gauges.clear() } } object PeriodSnapshotAccumulator { case class MetricValueKey(name: String, tags: Tags, unit: MeasurementUnit) case class MetricDistributionKey(name: String, tags: Tags, unit: MeasurementUnit, dynamicRange: DynamicRange) }