aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-12-13 00:48:14 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2017-12-13 00:48:42 +0100
commitd9cd75db039b31eec8d7271b162ea822d1d4d5e3 (patch)
tree5c7319823da604af4093a7c1cc2a9df09b718832
parentc10d93c4594b25b82fd2acc89fea18035ffcbc6f (diff)
downloadKamon-d9cd75db039b31eec8d7271b162ea822d1d4d5e3.tar.gz
Kamon-d9cd75db039b31eec8d7271b162ea822d1d4d5e3.tar.bz2
Kamon-d9cd75db039b31eec8d7271b162ea822d1d4d5e3.zip
add PeriodSnapshotAccumulator, successor of 0.6.x TickMetricSnapshotBuffer
-rw-r--r--kamon-core-tests/src/test/scala/kamon/metric/PeriodSnapshotAccumulatorSpec.scala129
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Accumulator.scala141
2 files changed, 270 insertions, 0 deletions
diff --git a/kamon-core-tests/src/test/scala/kamon/metric/PeriodSnapshotAccumulatorSpec.scala b/kamon-core-tests/src/test/scala/kamon/metric/PeriodSnapshotAccumulatorSpec.scala
new file mode 100644
index 00000000..202fe5b9
--- /dev/null
+++ b/kamon-core-tests/src/test/scala/kamon/metric/PeriodSnapshotAccumulatorSpec.scala
@@ -0,0 +1,129 @@
+package kamon.metric
+
+import java.time.temporal.ChronoUnit
+import java.time.Duration
+
+import kamon.Kamon
+import kamon.testkit.{MetricInspection, Reconfigure}
+import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec}
+
+class PeriodSnapshotAccumulatorSpec extends WordSpec with Reconfigure with MetricInspection with Matchers
+ with BeforeAndAfterAll with OptionValues {
+
+ "the PeriodSnapshotAccumulator" should {
+ "allow to peek on an empty accumulator" in {
+ val accumulator = newAccumulator(10, 1)
+ val periodSnapshot = accumulator.peek()
+ periodSnapshot.metrics.histograms shouldBe empty
+ periodSnapshot.metrics.rangeSamplers shouldBe empty
+ periodSnapshot.metrics.gauges shouldBe empty
+ periodSnapshot.metrics.counters shouldBe empty
+ }
+
+ "bypass accumulation if the configured duration is equal to the metric tick-interval, regardless of the snapshot" in {
+ val accumulator = newAccumulator(10, 1)
+ accumulator.add(tenSeconds).value should be theSameInstanceAs(tenSeconds)
+ accumulator.add(fiveSecondsOne).value should be theSameInstanceAs(fiveSecondsOne)
+ }
+
+ "bypass accumulation if snapshots have a period longer than duration minus margin" in {
+ val accumulator = newAccumulator(4, 1)
+ accumulator.add(almostThreeSeconds) shouldBe empty
+ accumulator.add(threeSeconds).value should be theSameInstanceAs(threeSeconds)
+ accumulator.add(fourSeconds).value should be theSameInstanceAs(fourSeconds)
+ accumulator.add(nineSeconds).value should be theSameInstanceAs(nineSeconds)
+ }
+
+ "allow to peek into the data that has been accumulated" in {
+ val accumulator = newAccumulator(20, 1)
+ accumulator.add(fiveSecondsOne) shouldBe empty
+ accumulator.add(fiveSecondsTwo) shouldBe empty
+
+ for(_ <- 1 to 10) {
+ val peekSnapshot = accumulator.peek()
+ val mergedHistogram = peekSnapshot.metrics.histograms.head
+ val mergedRangeSampler = peekSnapshot.metrics.rangeSamplers.head
+ peekSnapshot.metrics.counters.head.value shouldBe (55)
+ peekSnapshot.metrics.gauges.head.value shouldBe (33)
+ mergedHistogram.distribution.buckets.map(_.value) should contain allOf(22L, 33L)
+ mergedRangeSampler.distribution.buckets.map(_.value) should contain allOf(22L, 33L)
+ }
+
+ accumulator.add(fiveSecondsThree) shouldBe empty
+
+ for(_ <- 1 to 10) {
+ val peekSnapshot = accumulator.peek()
+ val mergedHistogram = peekSnapshot.metrics.histograms.head
+ val mergedRangeSampler = peekSnapshot.metrics.rangeSamplers.head
+ peekSnapshot.metrics.counters.head.value shouldBe (67)
+ peekSnapshot.metrics.gauges.head.value shouldBe (12)
+ mergedHistogram.distribution.buckets.map(_.value) should contain allOf(22L, 33L, 12L)
+ mergedRangeSampler.distribution.buckets.map(_.value) should contain allOf(22L, 33L, 12L)
+ }
+ }
+
+ "produce a snapshot when enough data has been accumulated" in {
+ val accumulator = newAccumulator(15, 1)
+ accumulator.add(fiveSecondsOne) shouldBe empty
+ accumulator.add(fiveSecondsTwo) shouldBe empty
+
+ val snapshotOne = accumulator.add(fiveSecondsThree).value
+ snapshotOne.from shouldBe fiveSecondsOne.from
+ snapshotOne.to shouldBe fiveSecondsThree.to
+
+ val mergedHistogram = snapshotOne.metrics.histograms.head
+ val mergedRangeSampler = snapshotOne.metrics.rangeSamplers.head
+ snapshotOne.metrics.counters.head.value shouldBe(67)
+ snapshotOne.metrics.gauges.head.value shouldBe(12)
+ mergedHistogram.distribution.buckets.map(_.value) should contain allOf(22L, 33L, 12L)
+ mergedRangeSampler.distribution.buckets.map(_.value) should contain allOf(22L, 33L, 12L)
+
+ val emptySnapshot = accumulator.peek()
+ emptySnapshot.metrics.histograms shouldBe empty
+ emptySnapshot.metrics.rangeSamplers shouldBe empty
+ emptySnapshot.metrics.gauges shouldBe empty
+ emptySnapshot.metrics.counters shouldBe empty
+
+ accumulator.add(fiveSecondsFour) shouldBe empty
+ }
+ }
+
+ val zeroTime = Kamon.clock().instant().truncatedTo(ChronoUnit.SECONDS)
+
+ val fiveSecondsOne = PeriodSnapshot(zeroTime, zeroTime.plusSeconds(5), createMetricsSnapshot(22))
+ val fiveSecondsTwo = PeriodSnapshot(zeroTime.plusSeconds(5), zeroTime.plusSeconds(10), createMetricsSnapshot(33))
+ val fiveSecondsThree = PeriodSnapshot(zeroTime.plusSeconds(10), zeroTime.plusSeconds(15), createMetricsSnapshot(12))
+ val fiveSecondsFour = PeriodSnapshot(zeroTime.plusSeconds(15), zeroTime.plusSeconds(20), createMetricsSnapshot(37))
+
+ val almostThreeSeconds = PeriodSnapshot(zeroTime, zeroTime.plusSeconds(3).minusMillis(1), createMetricsSnapshot(22))
+ val threeSeconds = PeriodSnapshot(zeroTime, zeroTime.plusSeconds(3), createMetricsSnapshot(22))
+ val fourSeconds = PeriodSnapshot(zeroTime, zeroTime.plusSeconds(4), createMetricsSnapshot(22))
+ val nineSeconds = PeriodSnapshot(zeroTime, zeroTime.plusSeconds(9), createMetricsSnapshot(22))
+ val tenSeconds = PeriodSnapshot(zeroTime, zeroTime.plusSeconds(10), createMetricsSnapshot(36))
+
+
+ def newAccumulator(duration: Long, margin: Long) =
+ new PeriodSnapshotAccumulator(Duration.ofSeconds(duration), Duration.ofSeconds(margin))
+
+ def createMetricsSnapshot(value: Long) = MetricsSnapshot(
+ histograms = Seq(createDistributionSnapshot(s"histogram", Map("metric" -> "histogram"), MeasurementUnit.time.microseconds, DynamicRange.Fine)(value)),
+ rangeSamplers = Seq(createDistributionSnapshot(s"gauge", Map("metric" -> "gauge"), MeasurementUnit.time.microseconds, DynamicRange.Default)(value)),
+ gauges = Seq(createValueSnapshot(s"gauge", Map("metric" -> "gauge"), MeasurementUnit.information.bytes, value)),
+ counters = Seq(createValueSnapshot(s"counter", Map("metric" -> "counter"), MeasurementUnit.information.bytes, value))
+ )
+
+ def createValueSnapshot(metric: String, tags: Map[String, String], unit: MeasurementUnit, value: Long): MetricValue = {
+ MetricValue(metric, tags, unit, value)
+ }
+
+ def createDistributionSnapshot(metric: String, tags: Map[String, String], unit: MeasurementUnit, dynamicRange: DynamicRange)(values: Long*): MetricDistribution = {
+ val histogram = Kamon.histogram(metric, unit, dynamicRange).refine(tags)
+ values.foreach(histogram.record)
+ val distribution = histogram.distribution(resetState = true)
+ MetricDistribution(metric, tags, unit, dynamicRange, distribution)
+ }
+
+ override protected def beforeAll(): Unit = {
+ applyConfig("kamon.metric.tick-interval = 10 seconds")
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/metric/Accumulator.scala b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala
index 9f7c6d70..79713336 100644
--- a/kamon-core/src/main/scala/kamon/metric/Accumulator.scala
+++ b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala
@@ -15,6 +15,13 @@
package kamon.metric
+import java.time.{Duration, Instant}
+
+import kamon.{Kamon, Tags}
+import kamon.metric.PeriodSnapshotAccumulator.{MetricDistributionKey, MetricValueKey}
+
+import scala.collection.mutable
+
class DistributionAccumulator(dynamicRange: DynamicRange) {
private val accumulatorHistogram = new HdrHistogram("metric-distribution-accumulator",
@@ -26,3 +33,137 @@ class DistributionAccumulator(dynamicRange: DynamicRange) {
def result(resetState: Boolean): Distribution =
accumulatorHistogram.snapshot(resetState).distribution
}
+
+/**
+ * Merges snapshots over the specified duration and produces a snapshot with all merged metrics provided to it within
+ * the period. This class is mutable, not thread safe and assumes that all snapshots passed to the `accumulate(...)`
+ * function are ordered in time.
+ *
+ * The typical use of this class would be when writing metric reporters that have to report data at a specific interval
+ * and wants to protect from users configuring a more frequent metrics tick interval. Example:
+ *
+ * {{{
+ * class Reporter extends MetricsReporter {
+ * val accumulator = new PeriodSnapshotAccumulator(Duration.ofSeconds(60), Duration.ofSeconds(1))
+ *
+ * def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {
+ * accumulator.add(snapshot).foreach(accumulatedSnapshot => {
+ * // Process your snapshot here, will only be called when the expected period has passed.
+ * }
+ * }
+ *
+ * ...
+ * }
+ * }}}
+ *
+ * The margin time is used to determine how close the current accumulated interval can to be to the expected interval
+ * and still get reported. In the example above a accumulated period of 59.6 seconds has a margin to 60 seconds of
+ * 0.4 seconds, thus, getting reported immediately instead of waiting for the next snapshot.
+ *
+ * A detail of what has been accumulated by calling the `.peek()` function.
+ *
+ * @param duration for how long to accumulate snapshots
+ * @param margin error margin for expected reporting period
+ */
+class PeriodSnapshotAccumulator(duration: Duration, margin: Duration) {
+ private val minimumDuration = duration.minus(margin)
+ private val counters = mutable.Map[MetricValueKey, Long]()
+ private val gauges = mutable.Map[MetricValueKey, Long]()
+ private val histograms = mutable.Map[MetricDistributionKey, DistributionAccumulator]()
+ private val rangeSamplers = mutable.Map[MetricDistributionKey, DistributionAccumulator]()
+
+ private var accumulatingFrom: Option[Instant] = None
+
+ def add(periodSnapshot: PeriodSnapshot): Option[PeriodSnapshot] = {
+ // short-circuit if there is no need to accumulate (e.g. when metrics tick-interval is the same as duration or the
+ // snapshots have a longer period than the duration).
+ if(isSameDurationAsTickInterval() || isWithinDurationMargin(periodSnapshot.from, periodSnapshot.to)) Some(periodSnapshot) else {
+ if (accumulatingFrom.isEmpty)
+ accumulatingFrom = Some(periodSnapshot.from)
+
+ periodSnapshot.metrics.counters.foreach(c => accumulateValue(counters, c))
+ periodSnapshot.metrics.gauges.foreach(g => replaceValue(gauges, g))
+ periodSnapshot.metrics.histograms.foreach(h => accumulateDistribution(histograms, h))
+ periodSnapshot.metrics.rangeSamplers.foreach(rs => accumulateDistribution(rangeSamplers, rs))
+
+ for(from <- accumulatingFrom if isWithinDurationMargin(from, periodSnapshot.to)) yield {
+ val accumulatedPeriodSnapshot = buildPeriodSnapshot(from, periodSnapshot.to, resetState = true)
+ accumulatingFrom = None
+ clearAccumulatedData()
+
+ accumulatedPeriodSnapshot
+ }
+ }
+ }
+
+ def peek(): PeriodSnapshot = {
+ val now = Kamon.clock().instant()
+ buildPeriodSnapshot(accumulatingFrom.getOrElse(now), now, resetState = false)
+ }
+
+ private def isWithinDurationMargin(from: Instant, to: Instant): Boolean = {
+ !Duration.between(from, to).minus(minimumDuration).isNegative
+ }
+
+ private def isSameDurationAsTickInterval(): Boolean = {
+ Kamon.config().getDuration("kamon.metric.tick-interval").equals(duration)
+ }
+
+ private def buildPeriodSnapshot(from: Instant, to: Instant, resetState: Boolean): PeriodSnapshot = {
+ val metrics = MetricsSnapshot(
+ histograms = histograms.map(createDistributionSnapshot(resetState)).toSeq,
+ rangeSamplers = rangeSamplers.map(createDistributionSnapshot(resetState)).toSeq,
+ gauges = gauges.map(createValueSnapshot(resetState)).toSeq,
+ counters = counters.map(createValueSnapshot(resetState)).toSeq
+ )
+
+ PeriodSnapshot(from, to, metrics)
+ }
+
+ private def accumulateValue(cache: mutable.Map[MetricValueKey, Long], metric: MetricValue): Unit = {
+ val key = MetricValueKey(metric.name, metric.tags, metric.unit)
+ cache.get(key).map(previousValue => {
+ cache.put(key, metric.value + previousValue)
+ }).orElse {
+ cache.put(key, metric.value)
+ }
+ }
+
+ private def replaceValue(cache: mutable.Map[MetricValueKey, Long], metric: MetricValue): Unit = {
+ val key = MetricValueKey(metric.name, metric.tags, metric.unit)
+ cache.put(key, metric.value)
+ }
+
+ private def createValueSnapshot(reset: Boolean)(pair: (MetricValueKey, Long)): MetricValue = {
+ val (key, value) = pair
+ MetricValue(key.name, key.tags, key.unit, value)
+ }
+
+ private def createDistributionSnapshot(resetState: Boolean)(pair: (MetricDistributionKey, DistributionAccumulator)): MetricDistribution = {
+ val (key, value) = pair
+ MetricDistribution(key.name, key.tags, key.unit, key.dynamicRange, value.result(resetState))
+ }
+
+ private def accumulateDistribution(cache: mutable.Map[MetricDistributionKey, DistributionAccumulator], metric: MetricDistribution): Unit = {
+ val key = MetricDistributionKey(metric.name, metric.tags, metric.unit, metric.dynamicRange)
+ cache.get(key).map(previousValue => {
+ previousValue.add(metric.distribution)
+ }).orElse {
+ val distributionAccumulator = new DistributionAccumulator(key.dynamicRange)
+ distributionAccumulator.add(metric.distribution)
+ cache.put(key, distributionAccumulator)
+ }
+ }
+
+ private def clearAccumulatedData(): Unit = {
+ histograms.clear()
+ rangeSamplers.clear()
+ counters.clear()
+ gauges.clear()
+ }
+}
+
+object PeriodSnapshotAccumulator {
+ case class MetricValueKey(name: String, tags: Tags, unit: MeasurementUnit)
+ case class MetricDistributionKey(name: String, tags: Tags, unit: MeasurementUnit, dynamicRange: DynamicRange)
+}