From 942562d452bc1aa64ea6787702c47286c597a186 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sun, 11 Jun 2017 13:24:44 +0200 Subject: add internal snapshot API that allows keeping state intact --- .../src/main/scala/kamon/metric/Accumulator.scala | 8 ++- .../src/main/scala/kamon/metric/Counter.scala | 17 +++++-- kamon-core/src/main/scala/kamon/metric/Gauge.scala | 5 +- .../src/main/scala/kamon/metric/Histogram.scala | 59 ++++++++++++++++------ .../scala/kamon/metric/HistogramExtension.scala | 51 ++++++++++++++++--- .../scala/kamon/metric/InstrumentFactory.scala | 8 +-- .../src/main/scala/kamon/metric/Metric.scala | 8 +-- .../main/scala/kamon/metric/MinMaxCounter.scala | 8 +-- .../src/main/scala/kamon/metric/Scaler.scala | 4 +- .../src/main/scala/kamon/metric/TickSnapshot.scala | 26 +++++----- 10 files changed, 132 insertions(+), 62 deletions(-) (limited to 'kamon-core/src/main/scala/kamon/metric') diff --git a/kamon-core/src/main/scala/kamon/metric/Accumulator.scala b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala index d960565f..9017c14e 100644 --- a/kamon-core/src/main/scala/kamon/metric/Accumulator.scala +++ b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala @@ -22,11 +22,9 @@ class DistributionAccumulator(dynamicRange: DynamicRange) { private val accumulatorHistogram = new HdrHistogram("metric-distribution-accumulator", tags = Map.empty, unit = MeasurementUnit.none, dynamicRange) - - def add(distribution: Distribution): Unit = { + def add(distribution: Distribution): Unit = distribution.bucketsIterator.foreach(b => accumulatorHistogram.record(b.value, b.frequency)) - } - def result(): Distribution = - accumulatorHistogram.snapshot().distribution + def result(resetState: Boolean): Distribution = + accumulatorHistogram.snapshot(resetState).distribution } diff --git a/kamon-core/src/main/scala/kamon/metric/Counter.scala b/kamon-core/src/main/scala/kamon/metric/Counter.scala index e6585021..d2481fa5 100644 --- a/kamon-core/src/main/scala/kamon/metric/Counter.scala +++ b/kamon-core/src/main/scala/kamon/metric/Counter.scala @@ -15,9 +15,9 @@ package kamon.metric -import com.typesafe.scalalogging.StrictLogging import kamon.jsr166.LongAdder import kamon.util.MeasurementUnit +import org.slf4j.LoggerFactory trait Counter { def unit: MeasurementUnit @@ -26,8 +26,8 @@ trait Counter { def increment(times: Long): Unit } -class LongAdderCounter(name: String, tags: Map[String, String], val unit: MeasurementUnit) - extends SnapshotableCounter with StrictLogging { +class LongAdderCounter(name: String, tags: Map[String, String], val unit: MeasurementUnit) extends Counter { + import LongAdderCounter.logger private val adder = new LongAdder() @@ -39,5 +39,14 @@ class LongAdderCounter(name: String, tags: Map[String, String], val unit: Measur else logger.warn(s"Ignored attempt to decrement counter [$name]") } - def snapshot(): MetricValue = MetricValue(name, tags, unit, adder.sumAndReset()) + def snapshot(resetState: Boolean): MetricValue = + if(resetState) + MetricValue(name, tags, unit, adder.sumAndReset()) + else + MetricValue(name, tags, unit, adder.sum()) } + +object LongAdderCounter { + private val logger = LoggerFactory.getLogger(classOf[LongAdderCounter]) +} + diff --git a/kamon-core/src/main/scala/kamon/metric/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/Gauge.scala index 6797dbfd..f109b885 100644 --- a/kamon-core/src/main/scala/kamon/metric/Gauge.scala +++ b/kamon-core/src/main/scala/kamon/metric/Gauge.scala @@ -30,9 +30,7 @@ trait Gauge { } -class AtomicLongGauge(name: String, tags: Map[String, String], val unit: MeasurementUnit) - extends SnapshotableGauge { - +class AtomicLongGauge(name: String, tags: Map[String, String], val unit: MeasurementUnit) extends Gauge { private val currentValue = new AtomicLong(0L) def increment(): Unit = @@ -50,6 +48,7 @@ class AtomicLongGauge(name: String, tags: Map[String, String], val unit: Measure def set(value: Long): Unit = currentValue.set(value) + // Gauges never reset internal state, the resetSate parameter is always ignored. def snapshot(): MetricValue = MetricValue(name, tags, unit, currentValue.get()) } diff --git a/kamon-core/src/main/scala/kamon/metric/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/Histogram.scala index 1af55479..94172ff5 100644 --- a/kamon-core/src/main/scala/kamon/metric/Histogram.scala +++ b/kamon-core/src/main/scala/kamon/metric/Histogram.scala @@ -19,7 +19,7 @@ package metric import java.nio.ByteBuffer import kamon.util.MeasurementUnit -import org.HdrHistogram.{AtomicHistogramExtension, ZigZag} +import org.HdrHistogram.{AtomicHistogramExtension, HdrHistogramOps, SimpleHistogramExtension, ZigZag} import org.slf4j.LoggerFactory trait Histogram { @@ -31,8 +31,8 @@ trait Histogram { } -private[kamon] class HdrHistogram(name: String, tags: Map[String, String], val unit: MeasurementUnit, val dynamicRange: DynamicRange) - extends AtomicHistogramExtension(dynamicRange) with Histogram { +private[kamon] class AtomicHdrHistogram(name: String, tags: Map[String, String], val unit: MeasurementUnit, val dynamicRange: DynamicRange) + extends AtomicHistogramExtension(dynamicRange) with Histogram with SnapshotCreation { def record(value: Long): Unit = tryRecord(value, 1) @@ -40,20 +40,44 @@ private[kamon] class HdrHistogram(name: String, tags: Map[String, String], val u def record(value: Long, count: Long): Unit = tryRecord(value, count) + private[kamon] def snapshot(resetState: Boolean): MetricDistribution = + snapshot(resetState, name, tags) + private def tryRecord(value: Long, count: Long): Unit = { try { recordValueWithCount(value, count) } catch { case anyException: Throwable ⇒ - HdrHistogram.logger.warn(s"Failed to store value [$value] in histogram [$name]. You might need to change " + - "your dynamic range configuration for this instrument.", anyException) + AtomicHdrHistogram.logger.warn( + s"Failed to record value [$value] in histogram [$name]. You might need to change your dynamic range " + + s"configuration for this instrument.", anyException) } } +} + +private[kamon] class HdrHistogram(name: String, tags: Map[String, String], val unit: MeasurementUnit, val dynamicRange: DynamicRange) + extends SimpleHistogramExtension(dynamicRange) with Histogram with SnapshotCreation { + + def record(value: Long): Unit = + tryRecord(value, 1) + + def record(value: Long, count: Long): Unit = + tryRecord(value, count) + + private[kamon] def snapshot(resetState: Boolean): MetricDistribution = + snapshot(resetState, name, tags) + + private def tryRecord(value: Long, count: Long): Unit = + recordValueWithCount(value, count) +} + - def snapshot(): MetricDistribution = { - val buffer = HdrHistogram.tempSnapshotBuffer.get() - val counts = countsArray() - val countsLimit = counts.length() +trait SnapshotCreation { + self: HdrHistogramOps with Histogram => + + private[kamon] def snapshot(resetState: Boolean, name: String, tags: Map[String, String]): MetricDistribution = { + val buffer = SnapshotCreation.tempSnapshotBuffer.get() + val countsLimit = getCountsArraySize() var index = 0 buffer.clear() @@ -62,13 +86,13 @@ private[kamon] class HdrHistogram(name: String, tags: Map[String, String], val u var totalCount = 0L while(index < countsLimit) { - val countAtIndex = counts.getAndSet(index, 0L) + val countAtIndex = if(resetState) getAndSetFromCountsArray(index, 0L) else getFromCountsArray(index) var zerosCount = 0L if(countAtIndex == 0L) { index += 1 zerosCount = 1 - while(index < countsLimit && counts.get(index) == 0L) { + while(index < countsLimit && getFromCountsArray(index) == 0L) { index += 1 zerosCount += 1 } @@ -100,7 +124,7 @@ private[kamon] class HdrHistogram(name: String, tags: Map[String, String], val u } private class ZigZagCountsDistribution(val count: Long, minIndex: Int, maxIndex: Int, zigZagCounts: ByteBuffer, - unitMagnitude: Int, subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Distribution { + unitMagnitude: Int, subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Distribution { val min: Long = if(count == 0) 0 else bucketValueAtIndex(minIndex) val max: Long = bucketValueAtIndex(maxIndex) @@ -197,13 +221,16 @@ private[kamon] class HdrHistogram(name: String, tags: Map[String, String], val u case class DefaultPercentile(quantile: Double, value: Long, countUnderQuantile: Long) extends Percentile case class MutablePercentile(var quantile: Double, var value: Long, var countUnderQuantile: Long) extends Percentile -} -object HdrHistogram { - private val logger = LoggerFactory.getLogger(classOf[HdrHistogram]) +} - // TODO: move this to some object pool might be better, or at +object SnapshotCreation { + // TODO: maybe make the buffer size configurable or make it auto-expanding. private val tempSnapshotBuffer = new ThreadLocal[ByteBuffer] { override def initialValue(): ByteBuffer = ByteBuffer.allocate(33792) } +} + +object AtomicHdrHistogram { + private val logger = LoggerFactory.getLogger(classOf[AtomicHdrHistogram]) } \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/HistogramExtension.scala b/kamon-core/src/main/scala/kamon/metric/HistogramExtension.scala index c77dc426..3bc862e0 100644 --- a/kamon-core/src/main/scala/kamon/metric/HistogramExtension.scala +++ b/kamon-core/src/main/scala/kamon/metric/HistogramExtension.scala @@ -16,25 +16,62 @@ package org.HdrHistogram import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicLongArray - import kamon.metric.DynamicRange + +trait HdrHistogramOps { + def getCountsArraySize(): Int + def getFromCountsArray(index: Int): Long + def getAndSetFromCountsArray(index: Int, newValue: Long): Long + def protectedUnitMagnitude(): Int + def protectedSubBucketHalfCount(): Int + def protectedSubBucketHalfCountMagnitude(): Int +} + /** * Exposes package-private members of org.HdrHistogram.AtomicHistogram. */ abstract class AtomicHistogramExtension(dr: DynamicRange) - extends AtomicHistogram(dr.lowestDiscernibleValue, dr.highestTrackableValue, dr.significantValueDigits) { + extends AtomicHistogram(dr.lowestDiscernibleValue, dr.highestTrackableValue, dr.significantValueDigits) with HdrHistogramOps { override def incrementTotalCount(): Unit = {} override def addToTotalCount(value: Long): Unit = {} - def countsArray(): AtomicLongArray = counts - def protectedUnitMagnitude(): Int = unitMagnitude - def protectedSubBucketHalfCount(): Int = subBucketHalfCount - def protectedSubBucketHalfCountMagnitude(): Int = subBucketHalfCountMagnitude + override def getCountsArraySize(): Int = counts.length() + override def getFromCountsArray(index: Int): Long = counts.get(index) + override def getAndSetFromCountsArray(index: Int, newValue: Long): Long = counts.getAndSet(index, newValue) + + override def protectedUnitMagnitude(): Int = unitMagnitude + override def protectedSubBucketHalfCount(): Int = subBucketHalfCount + override def protectedSubBucketHalfCountMagnitude(): Int = subBucketHalfCountMagnitude } + +/** + * Exposes package-private members of org.HdrHistogram.AtomicHistogram. + */ +abstract class SimpleHistogramExtension(dr: DynamicRange) + extends Histogram(dr.lowestDiscernibleValue, dr.highestTrackableValue, dr.significantValueDigits) with HdrHistogramOps { + + override def incrementTotalCount(): Unit = {} + override def addToTotalCount(value: Long): Unit = {} + + override def getCountsArraySize(): Int = counts.length + override def getFromCountsArray(index: Int): Long = getCountAtIndex(index) + override def getAndSetFromCountsArray(index: Int, newValue: Long): Long = { + val v = getCountAtIndex(index) + setCountAtIndex(index, newValue) + v + } + + override def protectedUnitMagnitude(): Int = unitMagnitude + override def protectedSubBucketHalfCount(): Int = subBucketHalfCount + override def protectedSubBucketHalfCountMagnitude(): Int = subBucketHalfCountMagnitude +} + + + + /** * Exposes the package-private members of org.HdrHistogram.ZigZagEncoding. */ diff --git a/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala b/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala index 2eeb69f8..2869595a 100644 --- a/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala +++ b/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala @@ -29,8 +29,8 @@ import scala.concurrent.duration._ private[kamon] class InstrumentFactory private (defaultHistogramDynamicRange: DynamicRange, defaultMMCounterDynamicRange: DynamicRange, defaultMMCounterSampleInterval: Duration, customSettings: Map[String, CustomInstrumentSettings]) { - def buildHistogram(dynamicRange: Option[DynamicRange])(name: String, tags: Map[String, String], unit: MeasurementUnit): HdrHistogram = - new HdrHistogram(name, tags, unit, instrumentDynamicRange(name, dynamicRange.getOrElse(defaultHistogramDynamicRange))) + def buildHistogram(dynamicRange: Option[DynamicRange])(name: String, tags: Map[String, String], unit: MeasurementUnit): AtomicHdrHistogram = + new AtomicHdrHistogram(name, tags, unit, instrumentDynamicRange(name, dynamicRange.getOrElse(defaultHistogramDynamicRange))) def buildMinMaxCounter(dynamicRange: Option[DynamicRange], sampleInterval: Option[Duration]) (name: String, tags: Map[String, String], unit: MeasurementUnit): SimpleMinMaxCounter = @@ -40,10 +40,10 @@ private[kamon] class InstrumentFactory private (defaultHistogramDynamicRange: Dy buildHistogram(dynamicRange.orElse(Some(defaultMMCounterDynamicRange)))(name, tags, unit), instrumentSampleInterval(name, sampleInterval.getOrElse(defaultMMCounterSampleInterval))) - def buildGauge(name: String, tags: Map[String, String], unit: MeasurementUnit): SnapshotableGauge = + def buildGauge(name: String, tags: Map[String, String], unit: MeasurementUnit): AtomicLongGauge = new AtomicLongGauge(name, tags, unit) - def buildCounter(name: String, tags: Map[String, String], unit: MeasurementUnit): SnapshotableCounter = + def buildCounter(name: String, tags: Map[String, String], unit: MeasurementUnit): LongAdderCounter = new LongAdderCounter(name, tags, unit) diff --git a/kamon-core/src/main/scala/kamon/metric/Metric.scala b/kamon-core/src/main/scala/kamon/metric/Metric.scala index 58386353..89c0b5e9 100644 --- a/kamon-core/src/main/scala/kamon/metric/Metric.scala +++ b/kamon-core/src/main/scala/kamon/metric/Metric.scala @@ -83,7 +83,7 @@ private[kamon] final class HistogramMetricImpl(val name: String, val unit: Measu factory.get().buildHistogram(customDynamicRange)(name, tags, unit) override protected def createSnapshot(instrument: Histogram): MetricDistribution = - instrument.asInstanceOf[SnapshotableHistogram].snapshot() + instrument.asInstanceOf[AtomicHdrHistogram].snapshot(resetState = true) } private[kamon] final class MinMaxCounterMetricImpl(val name: String, val unit: MeasurementUnit, customDynamicRange: Option[DynamicRange], @@ -115,7 +115,7 @@ private[kamon] final class MinMaxCounterMetricImpl(val name: String, val unit: M factory.get().buildMinMaxCounter(customDynamicRange, customSampleInterval)(name, tags, unit) override protected def createSnapshot(instrument: MinMaxCounter): MetricDistribution = - instrument.asInstanceOf[SnapshotableMinMaxCounter].snapshot() + instrument.asInstanceOf[SimpleMinMaxCounter].snapshot(resetState = true) } @@ -132,7 +132,7 @@ private[kamon] final class CounterMetricImpl(val name: String, val unit: Measure factory.get().buildCounter(name, tags, unit) override protected def createSnapshot(instrument: Counter): MetricValue = - instrument.asInstanceOf[SnapshotableCounter].snapshot() + instrument.asInstanceOf[LongAdderCounter].snapshot(resetState = true) } private[kamon] final class GaugeMetricImpl(val name: String, val unit: MeasurementUnit, factory: AtomicReference[InstrumentFactory]) @@ -157,5 +157,5 @@ private[kamon] final class GaugeMetricImpl(val name: String, val unit: Measureme factory.get().buildGauge(name, tags, unit) override protected def createSnapshot(instrument: Gauge): MetricValue = - instrument.asInstanceOf[SnapshotableGauge].snapshot() + instrument.asInstanceOf[AtomicLongGauge].snapshot() } \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/MinMaxCounter.scala index a09702ae..7dca3060 100644 --- a/kamon-core/src/main/scala/kamon/metric/MinMaxCounter.scala +++ b/kamon-core/src/main/scala/kamon/metric/MinMaxCounter.scala @@ -34,8 +34,8 @@ trait MinMaxCounter { def sample(): Unit } -class SimpleMinMaxCounter(name: String, tags: Map[String, String], underlyingHistogram: HdrHistogram, - val sampleInterval: Duration) extends SnapshotableMinMaxCounter { +class SimpleMinMaxCounter(name: String, tags: Map[String, String], underlyingHistogram: AtomicHdrHistogram, + val sampleInterval: Duration) extends MinMaxCounter{ private val min = AtomicLongMaxUpdater() private val max = AtomicLongMaxUpdater() @@ -47,8 +47,8 @@ class SimpleMinMaxCounter(name: String, tags: Map[String, String], underlyingHis def unit: MeasurementUnit = underlyingHistogram.unit - private[kamon] def snapshot(): MetricDistribution = - underlyingHistogram.snapshot() + private[kamon] def snapshot(resetState: Boolean): MetricDistribution = + underlyingHistogram.snapshot(resetState) def increment(): Unit = increment(1L) diff --git a/kamon-core/src/main/scala/kamon/metric/Scaler.scala b/kamon-core/src/main/scala/kamon/metric/Scaler.scala index 8f068fa2..21270e4c 100644 --- a/kamon-core/src/main/scala/kamon/metric/Scaler.scala +++ b/kamon-core/src/main/scala/kamon/metric/Scaler.scala @@ -22,7 +22,7 @@ class Scaler(targetTimeUnit: MeasurementUnit, targetInformationUnit: Measurement require(targetTimeUnit.dimension == Dimension.Time, "timeUnit must be in the time dimension.") require(targetInformationUnit.dimension == Dimension.Information, "informationUnit must be in the information dimension.") - val scaleHistogram = new HdrHistogram("scaler", Map.empty, MeasurementUnit.none, dynamicRange) + val scaleHistogram = new AtomicHdrHistogram("scaler", Map.empty, MeasurementUnit.none, dynamicRange) def scaleDistribution(metric: MetricDistribution): MetricDistribution = { metric.measurementUnit match { @@ -54,7 +54,7 @@ class Scaler(targetTimeUnit: MeasurementUnit, targetInformationUnit: Measurement scaleHistogram.record(Math.ceil(scaledValue).toLong, b.frequency) }) - scaleHistogram.snapshot().copy( + scaleHistogram.snapshot(resetState = true).copy( name = metric.name, tags = metric.tags, measurementUnit = targetUnit, diff --git a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala index d5beed6c..68b14223 100644 --- a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala +++ b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala @@ -73,18 +73,18 @@ trait Percentile { def countUnderQuantile: Long } - -trait DistributionSnapshotInstrument { - private[kamon] def snapshot(): MetricDistribution -} - -trait SingleValueSnapshotInstrument { - private[kamon] def snapshot(): MetricValue -} - -trait SnapshotableHistogram extends Histogram with DistributionSnapshotInstrument -trait SnapshotableMinMaxCounter extends MinMaxCounter with DistributionSnapshotInstrument -trait SnapshotableCounter extends Counter with SingleValueSnapshotInstrument -trait SnapshotableGauge extends Gauge with SingleValueSnapshotInstrument +// +//trait DistributionSnapshotInstrument { +// private[kamon] def snapshot(resetState: Boolean): MetricDistribution +//} +// +//trait SingleValueSnapshotInstrument { +// private[kamon] def snapshot(resetState: Boolean): MetricValue +//} +// +//trait SnapshotableHistogram extends Histogram with DistributionSnapshotInstrument +//trait SnapshotableMinMaxCounter extends MinMaxCounter with DistributionSnapshotInstrument +//trait SnapshotableCounter extends Counter with SingleValueSnapshotInstrument +//trait SnapshotableGauge extends Gauge with SingleValueSnapshotInstrument -- cgit v1.2.3