From d9cd75db039b31eec8d7271b162ea822d1d4d5e3 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Wed, 13 Dec 2017 00:48:14 +0100 Subject: add PeriodSnapshotAccumulator, successor of 0.6.x TickMetricSnapshotBuffer --- .../metric/PeriodSnapshotAccumulatorSpec.scala | 129 +++++++++++++++++++ .../src/main/scala/kamon/metric/Accumulator.scala | 141 +++++++++++++++++++++ 2 files changed, 270 insertions(+) create mode 100644 kamon-core-tests/src/test/scala/kamon/metric/PeriodSnapshotAccumulatorSpec.scala diff --git a/kamon-core-tests/src/test/scala/kamon/metric/PeriodSnapshotAccumulatorSpec.scala b/kamon-core-tests/src/test/scala/kamon/metric/PeriodSnapshotAccumulatorSpec.scala new file mode 100644 index 00000000..202fe5b9 --- /dev/null +++ b/kamon-core-tests/src/test/scala/kamon/metric/PeriodSnapshotAccumulatorSpec.scala @@ -0,0 +1,129 @@ +package kamon.metric + +import java.time.temporal.ChronoUnit +import java.time.Duration + +import kamon.Kamon +import kamon.testkit.{MetricInspection, Reconfigure} +import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec} + +class PeriodSnapshotAccumulatorSpec extends WordSpec with Reconfigure with MetricInspection with Matchers + with BeforeAndAfterAll with OptionValues { + + "the PeriodSnapshotAccumulator" should { + "allow to peek on an empty accumulator" in { + val accumulator = newAccumulator(10, 1) + val periodSnapshot = accumulator.peek() + periodSnapshot.metrics.histograms shouldBe empty + periodSnapshot.metrics.rangeSamplers shouldBe empty + periodSnapshot.metrics.gauges shouldBe empty + periodSnapshot.metrics.counters shouldBe empty + } + + "bypass accumulation if the configured duration is equal to the metric tick-interval, regardless of the snapshot" in { + val accumulator = newAccumulator(10, 1) + accumulator.add(tenSeconds).value should be theSameInstanceAs(tenSeconds) + accumulator.add(fiveSecondsOne).value should be theSameInstanceAs(fiveSecondsOne) + } + + "bypass accumulation if snapshots have a period longer than duration minus margin" in { + val accumulator = newAccumulator(4, 1) + accumulator.add(almostThreeSeconds) shouldBe empty + accumulator.add(threeSeconds).value should be theSameInstanceAs(threeSeconds) + accumulator.add(fourSeconds).value should be theSameInstanceAs(fourSeconds) + accumulator.add(nineSeconds).value should be theSameInstanceAs(nineSeconds) + } + + "allow to peek into the data that has been accumulated" in { + val accumulator = newAccumulator(20, 1) + accumulator.add(fiveSecondsOne) shouldBe empty + accumulator.add(fiveSecondsTwo) shouldBe empty + + for(_ <- 1 to 10) { + val peekSnapshot = accumulator.peek() + val mergedHistogram = peekSnapshot.metrics.histograms.head + val mergedRangeSampler = peekSnapshot.metrics.rangeSamplers.head + peekSnapshot.metrics.counters.head.value shouldBe (55) + peekSnapshot.metrics.gauges.head.value shouldBe (33) + mergedHistogram.distribution.buckets.map(_.value) should contain allOf(22L, 33L) + mergedRangeSampler.distribution.buckets.map(_.value) should contain allOf(22L, 33L) + } + + accumulator.add(fiveSecondsThree) shouldBe empty + + for(_ <- 1 to 10) { + val peekSnapshot = accumulator.peek() + val mergedHistogram = peekSnapshot.metrics.histograms.head + val mergedRangeSampler = peekSnapshot.metrics.rangeSamplers.head + peekSnapshot.metrics.counters.head.value shouldBe (67) + peekSnapshot.metrics.gauges.head.value shouldBe (12) + mergedHistogram.distribution.buckets.map(_.value) should contain allOf(22L, 33L, 12L) + mergedRangeSampler.distribution.buckets.map(_.value) should contain allOf(22L, 33L, 12L) + } + } + + "produce a snapshot when enough data has been accumulated" in { + val accumulator = newAccumulator(15, 1) + accumulator.add(fiveSecondsOne) shouldBe empty + accumulator.add(fiveSecondsTwo) shouldBe empty + + val snapshotOne = accumulator.add(fiveSecondsThree).value + snapshotOne.from shouldBe fiveSecondsOne.from + snapshotOne.to shouldBe fiveSecondsThree.to + + val mergedHistogram = snapshotOne.metrics.histograms.head + val mergedRangeSampler = snapshotOne.metrics.rangeSamplers.head + snapshotOne.metrics.counters.head.value shouldBe(67) + snapshotOne.metrics.gauges.head.value shouldBe(12) + mergedHistogram.distribution.buckets.map(_.value) should contain allOf(22L, 33L, 12L) + mergedRangeSampler.distribution.buckets.map(_.value) should contain allOf(22L, 33L, 12L) + + val emptySnapshot = accumulator.peek() + emptySnapshot.metrics.histograms shouldBe empty + emptySnapshot.metrics.rangeSamplers shouldBe empty + emptySnapshot.metrics.gauges shouldBe empty + emptySnapshot.metrics.counters shouldBe empty + + accumulator.add(fiveSecondsFour) shouldBe empty + } + } + + val zeroTime = Kamon.clock().instant().truncatedTo(ChronoUnit.SECONDS) + + val fiveSecondsOne = PeriodSnapshot(zeroTime, zeroTime.plusSeconds(5), createMetricsSnapshot(22)) + val fiveSecondsTwo = PeriodSnapshot(zeroTime.plusSeconds(5), zeroTime.plusSeconds(10), createMetricsSnapshot(33)) + val fiveSecondsThree = PeriodSnapshot(zeroTime.plusSeconds(10), zeroTime.plusSeconds(15), createMetricsSnapshot(12)) + val fiveSecondsFour = PeriodSnapshot(zeroTime.plusSeconds(15), zeroTime.plusSeconds(20), createMetricsSnapshot(37)) + + val almostThreeSeconds = PeriodSnapshot(zeroTime, zeroTime.plusSeconds(3).minusMillis(1), createMetricsSnapshot(22)) + val threeSeconds = PeriodSnapshot(zeroTime, zeroTime.plusSeconds(3), createMetricsSnapshot(22)) + val fourSeconds = PeriodSnapshot(zeroTime, zeroTime.plusSeconds(4), createMetricsSnapshot(22)) + val nineSeconds = PeriodSnapshot(zeroTime, zeroTime.plusSeconds(9), createMetricsSnapshot(22)) + val tenSeconds = PeriodSnapshot(zeroTime, zeroTime.plusSeconds(10), createMetricsSnapshot(36)) + + + def newAccumulator(duration: Long, margin: Long) = + new PeriodSnapshotAccumulator(Duration.ofSeconds(duration), Duration.ofSeconds(margin)) + + def createMetricsSnapshot(value: Long) = MetricsSnapshot( + histograms = Seq(createDistributionSnapshot(s"histogram", Map("metric" -> "histogram"), MeasurementUnit.time.microseconds, DynamicRange.Fine)(value)), + rangeSamplers = Seq(createDistributionSnapshot(s"gauge", Map("metric" -> "gauge"), MeasurementUnit.time.microseconds, DynamicRange.Default)(value)), + gauges = Seq(createValueSnapshot(s"gauge", Map("metric" -> "gauge"), MeasurementUnit.information.bytes, value)), + counters = Seq(createValueSnapshot(s"counter", Map("metric" -> "counter"), MeasurementUnit.information.bytes, value)) + ) + + def createValueSnapshot(metric: String, tags: Map[String, String], unit: MeasurementUnit, value: Long): MetricValue = { + MetricValue(metric, tags, unit, value) + } + + def createDistributionSnapshot(metric: String, tags: Map[String, String], unit: MeasurementUnit, dynamicRange: DynamicRange)(values: Long*): MetricDistribution = { + val histogram = Kamon.histogram(metric, unit, dynamicRange).refine(tags) + values.foreach(histogram.record) + val distribution = histogram.distribution(resetState = true) + MetricDistribution(metric, tags, unit, dynamicRange, distribution) + } + + override protected def beforeAll(): Unit = { + applyConfig("kamon.metric.tick-interval = 10 seconds") + } +} diff --git a/kamon-core/src/main/scala/kamon/metric/Accumulator.scala b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala index 9f7c6d70..79713336 100644 --- a/kamon-core/src/main/scala/kamon/metric/Accumulator.scala +++ b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala @@ -15,6 +15,13 @@ package kamon.metric +import java.time.{Duration, Instant} + +import kamon.{Kamon, Tags} +import kamon.metric.PeriodSnapshotAccumulator.{MetricDistributionKey, MetricValueKey} + +import scala.collection.mutable + class DistributionAccumulator(dynamicRange: DynamicRange) { private val accumulatorHistogram = new HdrHistogram("metric-distribution-accumulator", @@ -26,3 +33,137 @@ class DistributionAccumulator(dynamicRange: DynamicRange) { def result(resetState: Boolean): Distribution = accumulatorHistogram.snapshot(resetState).distribution } + +/** + * Merges snapshots over the specified duration and produces a snapshot with all merged metrics provided to it within + * the period. This class is mutable, not thread safe and assumes that all snapshots passed to the `accumulate(...)` + * function are ordered in time. + * + * The typical use of this class would be when writing metric reporters that have to report data at a specific interval + * and wants to protect from users configuring a more frequent metrics tick interval. Example: + * + * {{{ + * class Reporter extends MetricsReporter { + * val accumulator = new PeriodSnapshotAccumulator(Duration.ofSeconds(60), Duration.ofSeconds(1)) + * + * def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = { + * accumulator.add(snapshot).foreach(accumulatedSnapshot => { + * // Process your snapshot here, will only be called when the expected period has passed. + * } + * } + * + * ... + * } + * }}} + * + * The margin time is used to determine how close the current accumulated interval can to be to the expected interval + * and still get reported. In the example above a accumulated period of 59.6 seconds has a margin to 60 seconds of + * 0.4 seconds, thus, getting reported immediately instead of waiting for the next snapshot. + * + * A detail of what has been accumulated by calling the `.peek()` function. + * + * @param duration for how long to accumulate snapshots + * @param margin error margin for expected reporting period + */ +class PeriodSnapshotAccumulator(duration: Duration, margin: Duration) { + private val minimumDuration = duration.minus(margin) + private val counters = mutable.Map[MetricValueKey, Long]() + private val gauges = mutable.Map[MetricValueKey, Long]() + private val histograms = mutable.Map[MetricDistributionKey, DistributionAccumulator]() + private val rangeSamplers = mutable.Map[MetricDistributionKey, DistributionAccumulator]() + + private var accumulatingFrom: Option[Instant] = None + + def add(periodSnapshot: PeriodSnapshot): Option[PeriodSnapshot] = { + // short-circuit if there is no need to accumulate (e.g. when metrics tick-interval is the same as duration or the + // snapshots have a longer period than the duration). + if(isSameDurationAsTickInterval() || isWithinDurationMargin(periodSnapshot.from, periodSnapshot.to)) Some(periodSnapshot) else { + if (accumulatingFrom.isEmpty) + accumulatingFrom = Some(periodSnapshot.from) + + periodSnapshot.metrics.counters.foreach(c => accumulateValue(counters, c)) + periodSnapshot.metrics.gauges.foreach(g => replaceValue(gauges, g)) + periodSnapshot.metrics.histograms.foreach(h => accumulateDistribution(histograms, h)) + periodSnapshot.metrics.rangeSamplers.foreach(rs => accumulateDistribution(rangeSamplers, rs)) + + for(from <- accumulatingFrom if isWithinDurationMargin(from, periodSnapshot.to)) yield { + val accumulatedPeriodSnapshot = buildPeriodSnapshot(from, periodSnapshot.to, resetState = true) + accumulatingFrom = None + clearAccumulatedData() + + accumulatedPeriodSnapshot + } + } + } + + def peek(): PeriodSnapshot = { + val now = Kamon.clock().instant() + buildPeriodSnapshot(accumulatingFrom.getOrElse(now), now, resetState = false) + } + + private def isWithinDurationMargin(from: Instant, to: Instant): Boolean = { + !Duration.between(from, to).minus(minimumDuration).isNegative + } + + private def isSameDurationAsTickInterval(): Boolean = { + Kamon.config().getDuration("kamon.metric.tick-interval").equals(duration) + } + + private def buildPeriodSnapshot(from: Instant, to: Instant, resetState: Boolean): PeriodSnapshot = { + val metrics = MetricsSnapshot( + histograms = histograms.map(createDistributionSnapshot(resetState)).toSeq, + rangeSamplers = rangeSamplers.map(createDistributionSnapshot(resetState)).toSeq, + gauges = gauges.map(createValueSnapshot(resetState)).toSeq, + counters = counters.map(createValueSnapshot(resetState)).toSeq + ) + + PeriodSnapshot(from, to, metrics) + } + + private def accumulateValue(cache: mutable.Map[MetricValueKey, Long], metric: MetricValue): Unit = { + val key = MetricValueKey(metric.name, metric.tags, metric.unit) + cache.get(key).map(previousValue => { + cache.put(key, metric.value + previousValue) + }).orElse { + cache.put(key, metric.value) + } + } + + private def replaceValue(cache: mutable.Map[MetricValueKey, Long], metric: MetricValue): Unit = { + val key = MetricValueKey(metric.name, metric.tags, metric.unit) + cache.put(key, metric.value) + } + + private def createValueSnapshot(reset: Boolean)(pair: (MetricValueKey, Long)): MetricValue = { + val (key, value) = pair + MetricValue(key.name, key.tags, key.unit, value) + } + + private def createDistributionSnapshot(resetState: Boolean)(pair: (MetricDistributionKey, DistributionAccumulator)): MetricDistribution = { + val (key, value) = pair + MetricDistribution(key.name, key.tags, key.unit, key.dynamicRange, value.result(resetState)) + } + + private def accumulateDistribution(cache: mutable.Map[MetricDistributionKey, DistributionAccumulator], metric: MetricDistribution): Unit = { + val key = MetricDistributionKey(metric.name, metric.tags, metric.unit, metric.dynamicRange) + cache.get(key).map(previousValue => { + previousValue.add(metric.distribution) + }).orElse { + val distributionAccumulator = new DistributionAccumulator(key.dynamicRange) + distributionAccumulator.add(metric.distribution) + cache.put(key, distributionAccumulator) + } + } + + private def clearAccumulatedData(): Unit = { + histograms.clear() + rangeSamplers.clear() + counters.clear() + gauges.clear() + } +} + +object PeriodSnapshotAccumulator { + case class MetricValueKey(name: String, tags: Tags, unit: MeasurementUnit) + case class MetricDistributionKey(name: String, tags: Tags, unit: MeasurementUnit, dynamicRange: DynamicRange) +} -- cgit v1.2.3