From b94ca0cbb931799ccd2ad2d245660c4d48032c83 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Thu, 8 Jun 2017 15:35:42 +0200 Subject: get a basic threading model for the span reporters --- .../src/main/scala/kamon/metric/Accumulator.scala | 17 +++++++ .../src/main/scala/kamon/metric/Scaler.scala | 58 ++++++++++++++++++++++ .../src/main/scala/kamon/metric/TickSnapshot.scala | 13 +++-- 3 files changed, 83 insertions(+), 5 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/metric/Accumulator.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/Scaler.scala (limited to 'kamon-core/src/main/scala/kamon/metric') diff --git a/kamon-core/src/main/scala/kamon/metric/Accumulator.scala b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala new file mode 100644 index 00000000..b87f5530 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala @@ -0,0 +1,17 @@ +package kamon.metric + +import kamon.util.MeasurementUnit + + +class DistributionAccumulator(dynamicRange: DynamicRange) { + private val accumulatorHistogram = new HdrHistogram("metric-distribution-accumulator", + tags = Map.empty, measurementUnit = MeasurementUnit.none, dynamicRange) + + + def add(distribution: Distribution): Unit = { + distribution.bucketsIterator.foreach(b => accumulatorHistogram.record(b.value, b.frequency)) + } + + def result(): Distribution = + accumulatorHistogram.snapshot().distribution +} diff --git a/kamon-core/src/main/scala/kamon/metric/Scaler.scala b/kamon-core/src/main/scala/kamon/metric/Scaler.scala new file mode 100644 index 00000000..f8f51c00 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Scaler.scala @@ -0,0 +1,58 @@ +package kamon.metric + +import kamon.util.MeasurementUnit +import kamon.util.MeasurementUnit.Dimension + +class Scaler(targetTimeUnit: MeasurementUnit, targetInformationUnit: MeasurementUnit, dynamicRange: DynamicRange) { + require(targetTimeUnit.dimension == Dimension.Time, "timeUnit must be in the time dimension.") + require(targetInformationUnit.dimension == Dimension.Information, "informationUnit must be in the information dimension.") + + val scaleHistogram = new HdrHistogram("scaler", Map.empty, MeasurementUnit.none, dynamicRange) + + def scaleDistribution(metric: MetricDistribution): MetricDistribution = { + metric.measurementUnit match { + case MeasurementUnit(Dimension.Time, magnitude) if(magnitude != targetTimeUnit.magnitude) => + scaleMetricDistributionToTarget(metric, targetTimeUnit) + + case MeasurementUnit(Dimension.Information, magnitude) if(magnitude != targetTimeUnit.magnitude) => + scaleMetricDistributionToTarget(metric, targetInformationUnit) + + case _ => metric + } + } + + def scaleMetricValue(metric: MetricValue): MetricValue = { + metric.measurementUnit match { + case MeasurementUnit(Dimension.Time, magnitude) if(magnitude != targetTimeUnit.magnitude) => + scaleMetricValueToTarget(metric, targetTimeUnit) + + case MeasurementUnit(Dimension.Information, magnitude) if(magnitude != targetTimeUnit.magnitude) => + scaleMetricValueToTarget(metric, targetInformationUnit) + + case _ => metric + } + } + + private def scaleMetricDistributionToTarget(metric: MetricDistribution, targetUnit: MeasurementUnit): MetricDistribution = { + metric.distribution.bucketsIterator.foreach(b => { + val scaledValue = MeasurementUnit.scale(b.value, metric.measurementUnit, targetUnit) + scaleHistogram.record(Math.ceil(scaledValue).toLong, b.frequency) + }) + + scaleHistogram.snapshot().copy( + name = metric.name, + tags = metric.tags, + measurementUnit = targetUnit, + dynamicRange = dynamicRange + ) + } + + private def scaleMetricValueToTarget(metric: MetricValue, targetUnit: MeasurementUnit): MetricValue = { + val scaledValue = MeasurementUnit.scale(metric.value, metric.measurementUnit, targetUnit) + + metric.copy( + value = Math.ceil(scaledValue).toLong, + measurementUnit = targetUnit + ) + } +} diff --git a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala index e8587ffe..6aed1ab3 100644 --- a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala +++ b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala @@ -1,8 +1,15 @@ package kamon.metric - import kamon.util.MeasurementUnit + +/** + * + * @param interval + * @param metrics + */ +case class TickSnapshot(interval: Interval, metrics: MetricsSnapshot) + case class Interval(from: Long, to: Long) case class MetricsSnapshot( @@ -12,10 +19,6 @@ case class MetricsSnapshot( counters: Seq[MetricValue] ) -case class TickSnapshot(interval: Interval, metrics: MetricsSnapshot) - - - /** * Snapshot for instruments that internally track a single value. Meant to be used for counters and gauges. * -- cgit v1.2.3