diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2017-12-18 12:55:16 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2017-12-18 13:27:37 +0100 |
commit | c3964e95c09914a6a1b4039e10e630074744f4e9 (patch) | |
tree | e08f659f0c2a5295ebd4833312a3ff552852300f /kamon-core/src/main | |
parent | a97c7dac0748732700d3a98ee44bd2fdf847ffbc (diff) | |
download | Kamon-c3964e95c09914a6a1b4039e10e630074744f4e9.tar.gz Kamon-c3964e95c09914a6a1b4039e10e630074744f4e9.tar.bz2 Kamon-c3964e95c09914a6a1b4039e10e630074744f4e9.zip |
try to align accumulated ticks when possible
Diffstat (limited to 'kamon-core/src/main')
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/Accumulator.scala | 19 |
1 files changed, 12 insertions, 7 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/Accumulator.scala b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala index 79713336..bf412980 100644 --- a/kamon-core/src/main/scala/kamon/metric/Accumulator.scala +++ b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala @@ -19,6 +19,7 @@ import java.time.{Duration, Instant} import kamon.{Kamon, Tags} import kamon.metric.PeriodSnapshotAccumulator.{MetricDistributionKey, MetricValueKey} +import kamon.util.Clock import scala.collection.mutable @@ -66,18 +67,22 @@ class DistributionAccumulator(dynamicRange: DynamicRange) { * @param margin error margin for expected reporting period */ class PeriodSnapshotAccumulator(duration: Duration, margin: Duration) { - private val minimumDuration = duration.minus(margin) private val counters = mutable.Map[MetricValueKey, Long]() private val gauges = mutable.Map[MetricValueKey, Long]() private val histograms = mutable.Map[MetricDistributionKey, DistributionAccumulator]() private val rangeSamplers = mutable.Map[MetricDistributionKey, DistributionAccumulator]() + private var nextTick: Instant = Instant.EPOCH private var accumulatingFrom: Option[Instant] = None def add(periodSnapshot: PeriodSnapshot): Option[PeriodSnapshot] = { + // Initialize the next tick based on incoming snapshots. + if(nextTick == Instant.EPOCH) + nextTick = Clock.nextTick(periodSnapshot.to, duration) + // short-circuit if there is no need to accumulate (e.g. when metrics tick-interval is the same as duration or the // snapshots have a longer period than the duration). - if(isSameDurationAsTickInterval() || isWithinDurationMargin(periodSnapshot.from, periodSnapshot.to)) Some(periodSnapshot) else { + if(isSameDurationAsTickInterval() || (isAroundNextTick(periodSnapshot.to) && accumulatingFrom.isEmpty)) Some(periodSnapshot) else { if (accumulatingFrom.isEmpty) accumulatingFrom = Some(periodSnapshot.from) @@ -86,8 +91,9 @@ class PeriodSnapshotAccumulator(duration: Duration, margin: Duration) { periodSnapshot.metrics.histograms.foreach(h => accumulateDistribution(histograms, h)) periodSnapshot.metrics.rangeSamplers.foreach(rs => accumulateDistribution(rangeSamplers, rs)) - for(from <- accumulatingFrom if isWithinDurationMargin(from, periodSnapshot.to)) yield { + for(from <- accumulatingFrom if isAroundNextTick(periodSnapshot.to)) yield { val accumulatedPeriodSnapshot = buildPeriodSnapshot(from, periodSnapshot.to, resetState = true) + nextTick = Clock.nextTick(nextTick, duration) accumulatingFrom = None clearAccumulatedData() @@ -97,12 +103,11 @@ class PeriodSnapshotAccumulator(duration: Duration, margin: Duration) { } def peek(): PeriodSnapshot = { - val now = Kamon.clock().instant() - buildPeriodSnapshot(accumulatingFrom.getOrElse(now), now, resetState = false) + buildPeriodSnapshot(accumulatingFrom.getOrElse(nextTick), nextTick, resetState = false) } - private def isWithinDurationMargin(from: Instant, to: Instant): Boolean = { - !Duration.between(from, to).minus(minimumDuration).isNegative + private def isAroundNextTick(instant: Instant): Boolean = { + Duration.between(instant, nextTick.minus(margin)).toMillis() <= 0 } private def isSameDurationAsTickInterval(): Boolean = { |