aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/metric
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-12-18 12:55:16 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2017-12-18 13:27:37 +0100
commitc3964e95c09914a6a1b4039e10e630074744f4e9 (patch)
treee08f659f0c2a5295ebd4833312a3ff552852300f /kamon-core/src/main/scala/kamon/metric
parenta97c7dac0748732700d3a98ee44bd2fdf847ffbc (diff)
downloadKamon-c3964e95c09914a6a1b4039e10e630074744f4e9.tar.gz
Kamon-c3964e95c09914a6a1b4039e10e630074744f4e9.tar.bz2
Kamon-c3964e95c09914a6a1b4039e10e630074744f4e9.zip
try to align accumulated ticks when possible
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric')
-rw-r--r--kamon-core/src/main/scala/kamon/metric/Accumulator.scala19
1 files changed, 12 insertions, 7 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/Accumulator.scala b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala
index 79713336..bf412980 100644
--- a/kamon-core/src/main/scala/kamon/metric/Accumulator.scala
+++ b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala
@@ -19,6 +19,7 @@ import java.time.{Duration, Instant}
import kamon.{Kamon, Tags}
import kamon.metric.PeriodSnapshotAccumulator.{MetricDistributionKey, MetricValueKey}
+import kamon.util.Clock
import scala.collection.mutable
@@ -66,18 +67,22 @@ class DistributionAccumulator(dynamicRange: DynamicRange) {
* @param margin error margin for expected reporting period
*/
class PeriodSnapshotAccumulator(duration: Duration, margin: Duration) {
- private val minimumDuration = duration.minus(margin)
private val counters = mutable.Map[MetricValueKey, Long]()
private val gauges = mutable.Map[MetricValueKey, Long]()
private val histograms = mutable.Map[MetricDistributionKey, DistributionAccumulator]()
private val rangeSamplers = mutable.Map[MetricDistributionKey, DistributionAccumulator]()
+ private var nextTick: Instant = Instant.EPOCH
private var accumulatingFrom: Option[Instant] = None
def add(periodSnapshot: PeriodSnapshot): Option[PeriodSnapshot] = {
+ // Initialize the next tick based on incoming snapshots.
+ if(nextTick == Instant.EPOCH)
+ nextTick = Clock.nextTick(periodSnapshot.to, duration)
+
// short-circuit if there is no need to accumulate (e.g. when metrics tick-interval is the same as duration or the
// snapshots have a longer period than the duration).
- if(isSameDurationAsTickInterval() || isWithinDurationMargin(periodSnapshot.from, periodSnapshot.to)) Some(periodSnapshot) else {
+ if(isSameDurationAsTickInterval() || (isAroundNextTick(periodSnapshot.to) && accumulatingFrom.isEmpty)) Some(periodSnapshot) else {
if (accumulatingFrom.isEmpty)
accumulatingFrom = Some(periodSnapshot.from)
@@ -86,8 +91,9 @@ class PeriodSnapshotAccumulator(duration: Duration, margin: Duration) {
periodSnapshot.metrics.histograms.foreach(h => accumulateDistribution(histograms, h))
periodSnapshot.metrics.rangeSamplers.foreach(rs => accumulateDistribution(rangeSamplers, rs))
- for(from <- accumulatingFrom if isWithinDurationMargin(from, periodSnapshot.to)) yield {
+ for(from <- accumulatingFrom if isAroundNextTick(periodSnapshot.to)) yield {
val accumulatedPeriodSnapshot = buildPeriodSnapshot(from, periodSnapshot.to, resetState = true)
+ nextTick = Clock.nextTick(nextTick, duration)
accumulatingFrom = None
clearAccumulatedData()
@@ -97,12 +103,11 @@ class PeriodSnapshotAccumulator(duration: Duration, margin: Duration) {
}
def peek(): PeriodSnapshot = {
- val now = Kamon.clock().instant()
- buildPeriodSnapshot(accumulatingFrom.getOrElse(now), now, resetState = false)
+ buildPeriodSnapshot(accumulatingFrom.getOrElse(nextTick), nextTick, resetState = false)
}
- private def isWithinDurationMargin(from: Instant, to: Instant): Boolean = {
- !Duration.between(from, to).minus(minimumDuration).isNegative
+ private def isAroundNextTick(instant: Instant): Boolean = {
+ Duration.between(instant, nextTick.minus(margin)).toMillis() <= 0
}
private def isSameDurationAsTickInterval(): Boolean = {