diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2017-04-27 23:48:39 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2017-04-27 23:48:39 +0200 |
commit | f5e70695ad0124cd5cd648d186d5174c7b121266 (patch) | |
tree | 0f0f86af677653ce45435c127c545656f45e81df /kamon-core/src/main/scala/kamon/metric/instrument | |
parent | 0fe9e267c7cec7a176fc8b0a43e73e12b6606b9f (diff) | |
download | Kamon-f5e70695ad0124cd5cd648d186d5174c7b121266.tar.gz Kamon-f5e70695ad0124cd5cd648d186d5174c7b121266.tar.bz2 Kamon-f5e70695ad0124cd5cd648d186d5174c7b121266.zip |
implement HdrHistogram and Distribution snapshots
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric/instrument')
8 files changed, 321 insertions, 21 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala index 4a9edd77..10b9c3a6 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala @@ -1,11 +1,34 @@ -package kamon.metric.instrument +package kamon +package metric +package instrument -import kamon.metric.Entity +import java.util.concurrent.atomic.LongAdder + +import com.typesafe.scalalogging.StrictLogging +import kamon.util.MeasurementUnit trait Counter { + def measurementUnit: MeasurementUnit + def increment(): Unit + def increment(times: Long): Unit } -object Counter { - def apply(entity: Entity, name: String): Counter = ??? +class LongAdderCounter(entity: Entity, name: String, val measurementUnit: MeasurementUnit) + extends Counter with SingleValueSnapshotInstrument with StrictLogging { + + private val adder = new LongAdder() + + def increment(): Unit = + adder.increment() + + def increment(times: Long): Unit = { + if (times >= 0) + adder.add(times) + else + logger.warn(s"Ignored attempt to decrement counter [$name] on entity [$entity]") + } + + def snapshot(): SingleValueSnapshot = + SingleValueSnapshot(name, measurementUnit, adder.sumThenReset()) } diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/DynamicRange.scala b/kamon-core/src/main/scala/kamon/metric/instrument/DynamicRange.scala index 628439c2..226f5450 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/DynamicRange.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/DynamicRange.scala @@ -17,17 +17,17 @@ object DynamicRange { * Provides a range from 0 to 3.6e+12 (one hour in nanoseconds) with a value precision of 1 significant digit (10%) * across that range. */ - val Loose = DynamicRange(0L, oneHourInNanoseconds, 1) + val Loose = DynamicRange(1L, oneHourInNanoseconds, 1) /** * Provides a range from 0 to 3.6e+12 (one hour in nanoseconds) with a value precision of 2 significant digit (1%) * across that range. */ - val Default = DynamicRange(0L, oneHourInNanoseconds, 2) + val Default = DynamicRange(1L, oneHourInNanoseconds, 2) /** * Provides a range from 0 to 3.6e+12 (one hour in nanoseconds) with a value precision of 3 significant digit (0.1%) * across that range. */ - val Fine = DynamicRange(0L, oneHourInNanoseconds, 3) + val Fine = DynamicRange(1L, oneHourInNanoseconds, 3) } diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala index 43c71206..bb31e30a 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala @@ -1,8 +1,11 @@ package kamon.metric.instrument import kamon.metric.Entity +import kamon.util.MeasurementUnit trait Gauge { + def measurementUnit: MeasurementUnit + def increment(): Unit def increment(times: Long): Unit def decrement(): Unit diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala index c43c9dbc..76d4ab65 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala @@ -1,18 +1,192 @@ package kamon.metric.instrument +import java.nio.ByteBuffer + +import com.typesafe.scalalogging.StrictLogging import kamon.metric.Entity +import kamon.util.MeasurementUnit +import org.HdrHistogram.{AtomicHistogramExtension, ZigZag} trait Histogram { def dynamicRange: DynamicRange + def measurementUnit: MeasurementUnit def record(value: Long): Unit def record(value: Long, times: Long): Unit } -object Histogram { - def apply(entity: Entity, name: String, dynamicRange2: DynamicRange): Histogram = new Histogram { - override def dynamicRange: DynamicRange = dynamicRange2 - override def record(value: Long): Unit = ??? - override def record(value: Long, times: Long): Unit = ??? + +class HdrHistogram(entity: Entity, name: String, val measurementUnit: MeasurementUnit, val dynamicRange: DynamicRange) + extends AtomicHistogramExtension(dynamicRange) with Histogram with DistributionSnapshotInstrument with StrictLogging { + + def record(value: Long): Unit = + tryRecord(value, 1) + + def record(value: Long, count: Long): Unit = + tryRecord(value, count) + + private def tryRecord(value: Long, count: Long): Unit = { + try { + recordValueWithCount(value, count) + } catch { + case anyException: Throwable ⇒ + logger.warn(s"Failed to store value [$value] in histogram [$name] of entity [$entity]. You might need to change " + + "your dynamic range configuration for this instrument.", anyException) + } + } + + override def snapshot(): DistributionSnapshot = { + val buffer = HdrHistogram.tempSnapshotBuffer.get() + val counts = countsArray() + val countsLimit = counts.length() + var index = 0 + buffer.clear() + + var minIndex = Int.MaxValue + var maxIndex = 0 + var totalCount = 0L + + while(index < countsLimit) { + val countAtIndex = counts.getAndSet(index, 0L) + + var zerosCount = 0L + if(countAtIndex == 0L) { + index += 1 + zerosCount = 1 + while(index < countsLimit && counts.get(index) == 0L) { + index += 1 + zerosCount += 1 + } + } + + if(zerosCount > 0) { + if(index < countsLimit) + ZigZag.putLong(buffer, -zerosCount) + } + else { + if(minIndex > index) + minIndex = index + maxIndex = index + + index += 1 + totalCount += countAtIndex + ZigZag.putLong(buffer, countAtIndex) + } + } + + buffer.flip() + val zigZagCounts = Array.ofDim[Byte](buffer.limit()) + buffer.get(zigZagCounts) + + val distribution = new ZigZagCountsDistribution(totalCount, minIndex, maxIndex, ByteBuffer.wrap(zigZagCounts), + protectedUnitMagnitude(), protectedSubBucketHalfCount(), protectedSubBucketHalfCountMagnitude()) + + DistributionSnapshot(name, measurementUnit, dynamicRange, distribution) } + + private class ZigZagCountsDistribution(val count: Long, minIndex: Int, maxIndex: Int, zigZagCounts: ByteBuffer, + unitMagnitude: Int, subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Distribution { + + val min: Long = if(count == 0) 0 else bucketValueAtIndex(minIndex) + val max: Long = bucketValueAtIndex(maxIndex) + def sum: Long = bucketsIterator.foldLeft(0L)((a, b) => a + (b.value * b.frequency)) + + def buckets: Seq[Bucket] = { + val builder = Vector.newBuilder[Bucket] + bucketsIterator.foreach { b => + builder += DefaultBucket(b.value, b.frequency) + } + + builder.result() + } + + def bucketsIterator: Iterator[Bucket] = new Iterator[Bucket] { + val buffer = zigZagCounts.duplicate() + val bucket = MutableBucket(0, 0) + var countsArrayIndex = 0 + + def hasNext: Boolean = + buffer.remaining() > 0 + + def next(): Bucket = { + val readLong = ZigZag.getLong(buffer) + val frequency = if(readLong > 0) { + readLong + } else { + countsArrayIndex += (-readLong.toInt) + ZigZag.getLong(buffer) + } + + bucket.value = bucketValueAtIndex(countsArrayIndex) + bucket.frequency = frequency + countsArrayIndex += 1 + bucket + } + } + + def percentilesIterator: Iterator[Percentile] = new Iterator[Percentile]{ + val buckets = bucketsIterator + val percentile = MutablePercentile(0D, 0, 0) + var countUnderQuantile = 0L + + def hasNext: Boolean = + buckets.hasNext + + def next(): Percentile = { + val bucket = buckets.next() + countUnderQuantile += bucket.frequency + percentile.quantile = (countUnderQuantile * 100D) / ZigZagCountsDistribution.this.count + percentile.countUnderQuantile = countUnderQuantile + percentile.value = bucket.value + percentile + } + } + + def percentile(p: Double): Percentile = { + val percentiles = percentilesIterator + if(percentiles.hasNext) { + var currentPercentile = percentiles.next() + while(percentiles.hasNext && currentPercentile.quantile < p) { + currentPercentile = percentiles.next() + } + + currentPercentile + + } else DefaultPercentile(p, 0, 0) + } + + + def percentiles: Seq[Percentile] = { + val builder = Vector.newBuilder[Percentile] + percentilesIterator.foreach { p => + builder += DefaultPercentile(p.quantile, p.value, p.countUnderQuantile) + } + + builder.result() + } + + @inline private def bucketValueAtIndex(index: Int): Long = { + var bucketIndex: Int = (index >> subBucketHalfCountMagnitude) - 1 + var subBucketIndex: Int = (index & (subBucketHalfCount - 1)) + subBucketHalfCount + if (bucketIndex < 0) { + subBucketIndex -= subBucketHalfCount + bucketIndex = 0 + } + + subBucketIndex.toLong << (bucketIndex + unitMagnitude) + } + } + + case class DefaultBucket(value: Long, frequency: Long) extends Bucket + case class MutableBucket(var value: Long, var frequency: Long) extends Bucket + + case class DefaultPercentile(quantile: Double, value: Long, countUnderQuantile: Long) extends Percentile + case class MutablePercentile(var quantile: Double, var value: Long, var countUnderQuantile: Long) extends Percentile } + +object HdrHistogram { + // TODO: move this to some object pool might be better, or at + private val tempSnapshotBuffer = new ThreadLocal[ByteBuffer] { + override def initialValue(): ByteBuffer = ByteBuffer.allocate(33792) + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala b/kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala new file mode 100644 index 00000000..ebb82040 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala @@ -0,0 +1,40 @@ +package org.HdrHistogram + +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicLongArray + +import kamon.metric.instrument.DynamicRange + +/** + * This class exposes package-private members of the [[AtomicHistogram]] class that are required to properly generate + * snapshots of our HdrHistogram implementation. + */ +abstract class AtomicHistogramExtension(dr: DynamicRange) + extends AtomicHistogram(dr.lowestDiscernibleValue, dr.highestTrackableValue, dr.significantValueDigits) { + + override def incrementTotalCount(): Unit = {} + override def addToTotalCount(value: Long): Unit = {} + + def countsArray(): AtomicLongArray = counts + def protectedUnitMagnitude(): Int = unitMagnitude + def protectedSubBucketHalfCount(): Int = subBucketHalfCount + def protectedSubBucketHalfCountMagnitude(): Int = subBucketHalfCountMagnitude +} + +/** + * Exposes the package-private members of [[ZigZagEncoding]]. + */ +object ZigZag { + def putLong(buffer: ByteBuffer, value: Long): Unit = + ZigZagEncoding.putLong(buffer, value) + + def getLong(buffer: ByteBuffer): Long = + ZigZagEncoding.getLong(buffer) + + def putInt(buffer: ByteBuffer, value: Int): Unit = + ZigZagEncoding.putInt(buffer, value) + + def getInt(buffer: ByteBuffer): Int = + ZigZagEncoding.getInt(buffer) +} + diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala index 820c05b5..1ccd5899 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala @@ -6,31 +6,42 @@ import java.time.Duration import com.typesafe.config.Config import kamon.metric.instrument.InstrumentFactory.CustomInstrumentSettings +import kamon.util.MeasurementUnit -private[kamon] class InstrumentFactory private ( +private[metric] class InstrumentFactory private ( defaultHistogramDynamicRange: DynamicRange, defaultMMCounterDynamicRange: DynamicRange, defaultMMCounterSampleRate: Duration, customSettings: Map[(String, String), CustomInstrumentSettings]) { - def buildHistogram(entity: Entity, name: String, dynamicRange: DynamicRange = defaultHistogramDynamicRange): Histogram = - Histogram(entity, name, instrumentDynamicRange(entity, name, dynamicRange)) + def buildHistogram(entity: Entity, name: String, dynamicRange: DynamicRange = defaultHistogramDynamicRange, + measurementUnit: MeasurementUnit = MeasurementUnit.none): Histogram = { + + new HdrHistogram( + entity, + name, + measurementUnit, + instrumentDynamicRange(entity, name, dynamicRange) + ) + } def buildMinMaxCounter(entity: Entity, name: String, dynamicRange: DynamicRange = defaultMMCounterDynamicRange, - sampleInterval: Duration = defaultMMCounterSampleRate): MinMaxCounter = + sampleInterval: Duration = defaultMMCounterSampleRate, measurementUnit: MeasurementUnit = MeasurementUnit.none): MinMaxCounter = { + MinMaxCounter( entity, name, instrumentDynamicRange(entity, name, dynamicRange), instrumentSampleInterval(entity, name, sampleInterval) ) + } - def buildGauge(entity: Entity, name: String): Gauge = + def buildGauge(entity: Entity, name: String, measurementUnit: MeasurementUnit = MeasurementUnit.none): Gauge = Gauge(entity, name) - def buildCounter(entity: Entity, name: String): Counter = - Counter(entity, name) + def buildCounter(entity: Entity, name: String, measurementUnit: MeasurementUnit = MeasurementUnit.none): Counter with SingleValueSnapshotInstrument = + new LongAdderCounter(entity, name, measurementUnit) private def instrumentDynamicRange(entity: Entity, instrumentName: String, dynamicRange: DynamicRange): DynamicRange = @@ -51,9 +62,9 @@ private[kamon] class InstrumentFactory private ( ) } -object InstrumentFactory { +private[kamon] object InstrumentFactory { - def apply(config: Config): InstrumentFactory = { + private[kamon] def apply(config: Config): InstrumentFactory = { val histogramDynamicRange = readDynamicRange(config.getConfig("default-settings.histogram")) val mmCounterDynamicRange = readDynamicRange(config.getConfig("default-settings.min-max-counter")) val mmCounterSampleInterval = config.getDuration("default-settings.min-max-counter.sample-interval") diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSnapshot.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSnapshot.scala new file mode 100644 index 00000000..58e10c54 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSnapshot.scala @@ -0,0 +1,44 @@ +package kamon.metric.instrument + +import kamon.util.MeasurementUnit + + +case class SingleValueSnapshot(name: String, measurementUnit: MeasurementUnit, value: Long) + +case class DistributionSnapshot(name: String, measurementUnit: MeasurementUnit, dynamicRange: DynamicRange, distribution: Distribution) + +trait DistributionSnapshotInstrument { + def snapshot(): DistributionSnapshot +} + +trait SingleValueSnapshotInstrument { + def snapshot(): SingleValueSnapshot +} + + + + +trait Distribution { + def buckets: Seq[Bucket] + def bucketsIterator: Iterator[Bucket] + + def min: Long + def max: Long + def sum: Long + def count: Long + def percentile(p: Double): Percentile + + def percentiles: Seq[Percentile] + def percentilesIterator: Iterator[Percentile] +} + +trait Bucket { + def value: Long + def frequency: Long +} + +trait Percentile { + def quantile: Double + def value: Long + def countUnderQuantile: Long +} diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala index 34a983a9..8a43865f 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala @@ -3,10 +3,12 @@ package kamon.metric.instrument import java.time.Duration import kamon.metric.Entity +import kamon.util.MeasurementUnit trait MinMaxCounter { def dynamicRange: DynamicRange def sampleInterval: Duration + def measurementUnit: MeasurementUnit def increment(): Unit def increment(times: Long): Unit @@ -16,6 +18,9 @@ trait MinMaxCounter { object MinMaxCounter { def apply(entity: Entity, name: String, dynamicRange2: DynamicRange, sampleInterval2: Duration): MinMaxCounter = new MinMaxCounter { + + override def measurementUnit: MeasurementUnit = ??? + override def sampleInterval: Duration = sampleInterval2 override def increment(): Unit = ??? override def increment(times: Long): Unit = ??? |