From c3964e95c09914a6a1b4039e10e630074744f4e9 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 18 Dec 2017 12:55:16 +0100 Subject: try to align accumulated ticks when possible --- .../src/main/scala/kamon/metric/Accumulator.scala | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) (limited to 'kamon-core/src/main/scala') diff --git a/kamon-core/src/main/scala/kamon/metric/Accumulator.scala b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala index 79713336..bf412980 100644 --- a/kamon-core/src/main/scala/kamon/metric/Accumulator.scala +++ b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala @@ -19,6 +19,7 @@ import java.time.{Duration, Instant} import kamon.{Kamon, Tags} import kamon.metric.PeriodSnapshotAccumulator.{MetricDistributionKey, MetricValueKey} +import kamon.util.Clock import scala.collection.mutable @@ -66,18 +67,22 @@ class DistributionAccumulator(dynamicRange: DynamicRange) { * @param margin error margin for expected reporting period */ class PeriodSnapshotAccumulator(duration: Duration, margin: Duration) { - private val minimumDuration = duration.minus(margin) private val counters = mutable.Map[MetricValueKey, Long]() private val gauges = mutable.Map[MetricValueKey, Long]() private val histograms = mutable.Map[MetricDistributionKey, DistributionAccumulator]() private val rangeSamplers = mutable.Map[MetricDistributionKey, DistributionAccumulator]() + private var nextTick: Instant = Instant.EPOCH private var accumulatingFrom: Option[Instant] = None def add(periodSnapshot: PeriodSnapshot): Option[PeriodSnapshot] = { + // Initialize the next tick based on incoming snapshots. + if(nextTick == Instant.EPOCH) + nextTick = Clock.nextTick(periodSnapshot.to, duration) + // short-circuit if there is no need to accumulate (e.g. when metrics tick-interval is the same as duration or the // snapshots have a longer period than the duration). - if(isSameDurationAsTickInterval() || isWithinDurationMargin(periodSnapshot.from, periodSnapshot.to)) Some(periodSnapshot) else { + if(isSameDurationAsTickInterval() || (isAroundNextTick(periodSnapshot.to) && accumulatingFrom.isEmpty)) Some(periodSnapshot) else { if (accumulatingFrom.isEmpty) accumulatingFrom = Some(periodSnapshot.from) @@ -86,8 +91,9 @@ class PeriodSnapshotAccumulator(duration: Duration, margin: Duration) { periodSnapshot.metrics.histograms.foreach(h => accumulateDistribution(histograms, h)) periodSnapshot.metrics.rangeSamplers.foreach(rs => accumulateDistribution(rangeSamplers, rs)) - for(from <- accumulatingFrom if isWithinDurationMargin(from, periodSnapshot.to)) yield { + for(from <- accumulatingFrom if isAroundNextTick(periodSnapshot.to)) yield { val accumulatedPeriodSnapshot = buildPeriodSnapshot(from, periodSnapshot.to, resetState = true) + nextTick = Clock.nextTick(nextTick, duration) accumulatingFrom = None clearAccumulatedData() @@ -97,12 +103,11 @@ class PeriodSnapshotAccumulator(duration: Duration, margin: Duration) { } def peek(): PeriodSnapshot = { - val now = Kamon.clock().instant() - buildPeriodSnapshot(accumulatingFrom.getOrElse(now), now, resetState = false) + buildPeriodSnapshot(accumulatingFrom.getOrElse(nextTick), nextTick, resetState = false) } - private def isWithinDurationMargin(from: Instant, to: Instant): Boolean = { - !Duration.between(from, to).minus(minimumDuration).isNegative + private def isAroundNextTick(instant: Instant): Boolean = { + Duration.between(instant, nextTick.minus(margin)).toMillis() <= 0 } private def isSameDurationAsTickInterval(): Boolean = { -- cgit v1.2.3