From c52f8eaca0d1ccc4c992cba039e35e099b5b478b Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Tue, 6 Jun 2017 14:15:15 +0200 Subject: make it compile for Scala 2.11 and 2.12 --- .../src/main/scala/kamon/metric/Counter.scala | 32 ++++ .../src/main/scala/kamon/metric/DynamicRange.scala | 33 ++++ kamon-core/src/main/scala/kamon/metric/Gauge.scala | 40 +++++ .../src/main/scala/kamon/metric/Histogram.scala | 191 ++++++++++++++++++++ .../scala/kamon/metric/HistogramExtension.scala | 39 +++++ .../scala/kamon/metric/InstrumentFactory.scala | 98 +++++++++++ .../src/main/scala/kamon/metric/MetricLookup.scala | 3 +- .../main/scala/kamon/metric/MetricRegistry.scala | 12 +- .../main/scala/kamon/metric/MinMaxCounter.scala | 76 ++++++++ .../src/main/scala/kamon/metric/TickSnapshot.scala | 68 +++++++- .../scala/kamon/metric/instrument/Counter.scala | 34 ---- .../kamon/metric/instrument/DynamicRange.scala | 33 ---- .../main/scala/kamon/metric/instrument/Gauge.scala | 39 ----- .../scala/kamon/metric/instrument/Histogram.scala | 193 --------------------- .../metric/instrument/HistogramExtension.scala | 39 ----- .../metric/instrument/InstrumentFactory.scala | 98 ----------- .../metric/instrument/InstrumentSnapshot.scala | 57 ------ .../kamon/metric/instrument/MinMaxCounter.scala | 75 -------- 18 files changed, 577 insertions(+), 583 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/metric/Counter.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/DynamicRange.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/Gauge.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/Histogram.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/HistogramExtension.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/MinMaxCounter.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/DynamicRange.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSnapshot.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala (limited to 'kamon-core/src/main/scala/kamon/metric') diff --git a/kamon-core/src/main/scala/kamon/metric/Counter.scala b/kamon-core/src/main/scala/kamon/metric/Counter.scala new file mode 100644 index 00000000..38d312f0 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Counter.scala @@ -0,0 +1,32 @@ +package kamon.metric + +import java.util.concurrent.atomic.LongAdder + +import com.typesafe.scalalogging.StrictLogging +import kamon.util.MeasurementUnit + +trait Counter { + def measurementUnit: MeasurementUnit + + def increment(): Unit + def increment(times: Long): Unit +} + +class LongAdderCounter(name: String, tags: Map[String, String], val measurementUnit: MeasurementUnit) + extends SnapshotableCounter with StrictLogging { + + private val adder = new LongAdder() + + def increment(): Unit = + adder.increment() + + def increment(times: Long): Unit = { + if (times >= 0) + adder.add(times) + else + logger.warn(s"Ignored attempt to decrement counter [$name]") + } + + def snapshot(): MetricValue = + MetricValue(name, tags, measurementUnit, adder.sumThenReset()) +} diff --git a/kamon-core/src/main/scala/kamon/metric/DynamicRange.scala b/kamon-core/src/main/scala/kamon/metric/DynamicRange.scala new file mode 100644 index 00000000..f26b1052 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/DynamicRange.scala @@ -0,0 +1,33 @@ +package kamon.metric + +import java.util.concurrent.TimeUnit + +case class DynamicRange(lowestDiscernibleValue: Long, highestTrackableValue: Long, significantValueDigits: Int) { + def upTo(highestTrackableValue: Long): DynamicRange = + copy(highestTrackableValue = highestTrackableValue) + + def startingFrom(lowestDiscernibleValue: Long): DynamicRange = + copy(lowestDiscernibleValue = lowestDiscernibleValue) +} + +object DynamicRange { + private val oneHourInNanoseconds = TimeUnit.HOURS.toNanos(1) + + /** + * Provides a range from 0 to 3.6e+12 (one hour in nanoseconds) with a value precision of 1 significant digit (10%) + * across that range. + */ + val Loose = DynamicRange(1L, oneHourInNanoseconds, 1) + + /** + * Provides a range from 0 to 3.6e+12 (one hour in nanoseconds) with a value precision of 2 significant digit (1%) + * across that range. + */ + val Default = DynamicRange(1L, oneHourInNanoseconds, 2) + + /** + * Provides a range from 0 to 3.6e+12 (one hour in nanoseconds) with a value precision of 3 significant digit (0.1%) + * across that range. + */ + val Fine = DynamicRange(1L, oneHourInNanoseconds, 3) +} diff --git a/kamon-core/src/main/scala/kamon/metric/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/Gauge.scala new file mode 100644 index 00000000..11876e99 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Gauge.scala @@ -0,0 +1,40 @@ +package kamon.metric + +import java.util.concurrent.atomic.AtomicLong + +import kamon.util.MeasurementUnit + +trait Gauge { + def measurementUnit: MeasurementUnit + + def increment(): Unit + def increment(times: Long): Unit + def decrement(): Unit + def decrement(times: Long): Unit + def set(value: Long): Unit +} + + +class AtomicLongGauge(name: String, tags: Map[String, String], val measurementUnit: MeasurementUnit) + extends SnapshotableGauge { + + private val currentValue = new AtomicLong(0L) + + def increment(): Unit = + currentValue.incrementAndGet() + + def increment(times: Long): Unit = + currentValue.addAndGet(times) + + def decrement(): Unit = + currentValue.decrementAndGet() + + def decrement(times: Long): Unit = + currentValue.addAndGet(-times) + + def set(value: Long): Unit = + currentValue.set(value) + + def snapshot(): MetricValue = + MetricValue(name, tags, measurementUnit, currentValue.get()) +} diff --git a/kamon-core/src/main/scala/kamon/metric/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/Histogram.scala new file mode 100644 index 00000000..47b2a1a0 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Histogram.scala @@ -0,0 +1,191 @@ +package kamon.metric + +import java.nio.ByteBuffer + +import com.typesafe.scalalogging.StrictLogging +import kamon.util.MeasurementUnit +import org.HdrHistogram.{AtomicHistogramExtension, ZigZag} + +trait Histogram { + def dynamicRange: DynamicRange + def measurementUnit: MeasurementUnit + + def record(value: Long): Unit + def record(value: Long, times: Long): Unit +} + + +class HdrHistogram(name: String, tags: Map[String, String], val measurementUnit: MeasurementUnit, val dynamicRange: DynamicRange) + extends AtomicHistogramExtension(dynamicRange) with SnapshotableHistogram with StrictLogging { + + def record(value: Long): Unit = + tryRecord(value, 1) + + def record(value: Long, count: Long): Unit = + tryRecord(value, count) + + private def tryRecord(value: Long, count: Long): Unit = { + try { + recordValueWithCount(value, count) + } catch { + case anyException: Throwable ⇒ + logger.warn(s"Failed to store value [$value] in histogram [$name]. You might need to change " + + "your dynamic range configuration for this instrument.", anyException) + } + } + + override def snapshot(): MetricDistribution = { + val buffer = HdrHistogram.tempSnapshotBuffer.get() + val counts = countsArray() + val countsLimit = counts.length() + var index = 0 + buffer.clear() + + var minIndex = Int.MaxValue + var maxIndex = 0 + var totalCount = 0L + + while(index < countsLimit) { + val countAtIndex = counts.getAndSet(index, 0L) + + var zerosCount = 0L + if(countAtIndex == 0L) { + index += 1 + zerosCount = 1 + while(index < countsLimit && counts.get(index) == 0L) { + index += 1 + zerosCount += 1 + } + } + + if(zerosCount > 0) { + if(index < countsLimit) + ZigZag.putLong(buffer, -zerosCount) + } + else { + if(minIndex > index) + minIndex = index + maxIndex = index + + index += 1 + totalCount += countAtIndex + ZigZag.putLong(buffer, countAtIndex) + } + } + + buffer.flip() + val zigZagCounts = Array.ofDim[Byte](buffer.limit()) + buffer.get(zigZagCounts) + + val distribution = new ZigZagCountsDistribution(totalCount, minIndex, maxIndex, ByteBuffer.wrap(zigZagCounts), + protectedUnitMagnitude(), protectedSubBucketHalfCount(), protectedSubBucketHalfCountMagnitude()) + + MetricDistribution(name, tags, measurementUnit, dynamicRange, distribution) + } + + private class ZigZagCountsDistribution(val count: Long, minIndex: Int, maxIndex: Int, zigZagCounts: ByteBuffer, + unitMagnitude: Int, subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Distribution { + + val min: Long = if(count == 0) 0 else bucketValueAtIndex(minIndex) + val max: Long = bucketValueAtIndex(maxIndex) + def sum: Long = bucketsIterator.foldLeft(0L)((a, b) => a + (b.value * b.frequency)) + + def buckets: Seq[Bucket] = { + val builder = Vector.newBuilder[Bucket] + bucketsIterator.foreach { b => + builder += DefaultBucket(b.value, b.frequency) + } + + builder.result() + } + + def bucketsIterator: Iterator[Bucket] = new Iterator[Bucket] { + val buffer = zigZagCounts.duplicate() + val bucket = MutableBucket(0, 0) + var countsArrayIndex = 0 + + def hasNext: Boolean = + buffer.remaining() > 0 + + def next(): Bucket = { + val readLong = ZigZag.getLong(buffer) + val frequency = if(readLong > 0) { + readLong + } else { + countsArrayIndex += (-readLong.toInt) + ZigZag.getLong(buffer) + } + + bucket.value = bucketValueAtIndex(countsArrayIndex) + bucket.frequency = frequency + countsArrayIndex += 1 + bucket + } + } + + def percentilesIterator: Iterator[Percentile] = new Iterator[Percentile]{ + val buckets = bucketsIterator + val percentile = MutablePercentile(0D, 0, 0) + var countUnderQuantile = 0L + + def hasNext: Boolean = + buckets.hasNext + + def next(): Percentile = { + val bucket = buckets.next() + countUnderQuantile += bucket.frequency + percentile.quantile = (countUnderQuantile * 100D) / ZigZagCountsDistribution.this.count + percentile.countUnderQuantile = countUnderQuantile + percentile.value = bucket.value + percentile + } + } + + def percentile(p: Double): Percentile = { + val percentiles = percentilesIterator + if(percentiles.hasNext) { + var currentPercentile = percentiles.next() + while(percentiles.hasNext && currentPercentile.quantile < p) { + currentPercentile = percentiles.next() + } + + currentPercentile + + } else DefaultPercentile(p, 0, 0) + } + + + def percentiles: Seq[Percentile] = { + val builder = Vector.newBuilder[Percentile] + percentilesIterator.foreach { p => + builder += DefaultPercentile(p.quantile, p.value, p.countUnderQuantile) + } + + builder.result() + } + + @inline private def bucketValueAtIndex(index: Int): Long = { + var bucketIndex: Int = (index >> subBucketHalfCountMagnitude) - 1 + var subBucketIndex: Int = (index & (subBucketHalfCount - 1)) + subBucketHalfCount + if (bucketIndex < 0) { + subBucketIndex -= subBucketHalfCount + bucketIndex = 0 + } + + subBucketIndex.toLong << (bucketIndex + unitMagnitude) + } + } + + case class DefaultBucket(value: Long, frequency: Long) extends Bucket + case class MutableBucket(var value: Long, var frequency: Long) extends Bucket + + case class DefaultPercentile(quantile: Double, value: Long, countUnderQuantile: Long) extends Percentile + case class MutablePercentile(var quantile: Double, var value: Long, var countUnderQuantile: Long) extends Percentile +} + +object HdrHistogram { + // TODO: move this to some object pool might be better, or at + private val tempSnapshotBuffer = new ThreadLocal[ByteBuffer] { + override def initialValue(): ByteBuffer = ByteBuffer.allocate(33792) + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/HistogramExtension.scala b/kamon-core/src/main/scala/kamon/metric/HistogramExtension.scala new file mode 100644 index 00000000..bf425ee1 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/HistogramExtension.scala @@ -0,0 +1,39 @@ +package org.HdrHistogram + +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicLongArray + +import kamon.metric.DynamicRange + +/** + * Exposes package-private members of [[org.HdrHistogram.AtomicHistogram]]. + */ +abstract class AtomicHistogramExtension(dr: DynamicRange) + extends AtomicHistogram(dr.lowestDiscernibleValue, dr.highestTrackableValue, dr.significantValueDigits) { + + override def incrementTotalCount(): Unit = {} + override def addToTotalCount(value: Long): Unit = {} + + def countsArray(): AtomicLongArray = counts + def protectedUnitMagnitude(): Int = unitMagnitude + def protectedSubBucketHalfCount(): Int = subBucketHalfCount + def protectedSubBucketHalfCountMagnitude(): Int = subBucketHalfCountMagnitude +} + +/** + * Exposes the package-private members of [[org.HdrHistogram.ZigZagEncoding]]. + */ +object ZigZag { + def putLong(buffer: ByteBuffer, value: Long): Unit = + ZigZagEncoding.putLong(buffer, value) + + def getLong(buffer: ByteBuffer): Long = + ZigZagEncoding.getLong(buffer) + + def putInt(buffer: ByteBuffer, value: Int): Unit = + ZigZagEncoding.putInt(buffer, value) + + def getInt(buffer: ByteBuffer): Int = + ZigZagEncoding.getInt(buffer) +} + diff --git a/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala b/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala new file mode 100644 index 00000000..68034bb8 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala @@ -0,0 +1,98 @@ +package kamon +package metric + + +import java.util.concurrent.TimeUnit + +import com.typesafe.config.Config +import kamon.metric.InstrumentFactory.CustomInstrumentSettings +import kamon.util.MeasurementUnit + +import scala.concurrent.duration._ + + +private[kamon] class InstrumentFactory private (defaultHistogramDynamicRange: DynamicRange, defaultMMCounterDynamicRange: DynamicRange, + defaultMMCounterSampleInterval: Duration, customSettings: Map[String, CustomInstrumentSettings]) { + + def buildHistogram(dynamicRange: Option[DynamicRange])(name: String, tags: Map[String, String], unit: MeasurementUnit): SnapshotableHistogram = + new HdrHistogram(name, tags, unit, instrumentDynamicRange(name, dynamicRange.getOrElse(defaultHistogramDynamicRange))) + + def buildMinMaxCounter(dynamicRange: Option[DynamicRange], sampleInterval: Option[Duration]) + (name: String, tags: Map[String, String], unit: MeasurementUnit): SnapshotableMinMaxCounter = + new PaddedMinMaxCounter( + name, + tags, + buildHistogram(dynamicRange.orElse(Some(defaultMMCounterDynamicRange)))(name, tags, unit), + instrumentSampleInterval(name, sampleInterval.getOrElse(defaultMMCounterSampleInterval))) + + def buildGauge(name: String, tags: Map[String, String], unit: MeasurementUnit): SnapshotableGauge = + new AtomicLongGauge(name, tags, unit) + + def buildCounter(name: String, tags: Map[String, String], unit: MeasurementUnit): SnapshotableCounter = + new LongAdderCounter(name, tags, unit) + + + private def instrumentDynamicRange(instrumentName: String, dynamicRange: DynamicRange): DynamicRange = + customSettings.get(instrumentName).fold(dynamicRange) { cs => + overrideDynamicRange(dynamicRange, cs) + } + + private def instrumentSampleInterval(instrumentName: String, sampleInterval: Duration): Duration = + customSettings.get(instrumentName).fold(sampleInterval) { cs => + cs.sampleInterval.getOrElse(sampleInterval) + } + + private def overrideDynamicRange(defaultDynamicRange: DynamicRange, customSettings: CustomInstrumentSettings): DynamicRange = + DynamicRange( + customSettings.lowestDiscernibleValue.getOrElse(defaultDynamicRange.lowestDiscernibleValue), + customSettings.highestTrackableValue.getOrElse(defaultDynamicRange.highestTrackableValue), + customSettings.significantValueDigits.getOrElse(defaultDynamicRange.significantValueDigits) + ) +} + +object InstrumentFactory { + + def fromConfig(config: Config): InstrumentFactory = { + val factoryConfig = config.getConfig("kamon.metric.instrument-factory") + val histogramDynamicRange = readDynamicRange(factoryConfig.getConfig("default-settings.histogram")) + val mmCounterDynamicRange = readDynamicRange(factoryConfig.getConfig("default-settings.min-max-counter")) + val mmCounterSampleInterval = factoryConfig.getDuration("default-settings.min-max-counter.sample-interval", TimeUnit.MILLISECONDS) + + val customSettings = factoryConfig.getConfig("custom-settings") + .configurations + .filter(nonEmptySection) + .map(readCustomInstrumentSettings) + + new InstrumentFactory(histogramDynamicRange, mmCounterDynamicRange, mmCounterSampleInterval.millis, customSettings) + } + + private def nonEmptySection(entry: (String, Config)): Boolean = entry match { + case (_, config) => config.firstLevelKeys.nonEmpty + } + + private def readCustomInstrumentSettings(entry: (String, Config)): (String, CustomInstrumentSettings) = { + val (metricName, metricConfig) = entry + val customSettings = CustomInstrumentSettings( + if (metricConfig.hasPath("lowest-discernible-value")) Some(metricConfig.getLong("lowest-discernible-value")) else None, + if (metricConfig.hasPath("highest-trackable-value")) Some(metricConfig.getLong("highest-trackable-value")) else None, + if (metricConfig.hasPath("significant-value-digits")) Some(metricConfig.getInt("significant-value-digits")) else None, + if (metricConfig.hasPath("sample-interval")) Some(metricConfig.getDuration("sample-interval", TimeUnit.MILLISECONDS).millis) else None + ) + + (metricName -> customSettings) + } + + private def readDynamicRange(config: Config): DynamicRange = + DynamicRange( + lowestDiscernibleValue = config.getLong("lowest-discernible-value"), + highestTrackableValue = config.getLong("highest-trackable-value"), + significantValueDigits = config.getInt("significant-value-digits") + ) + + private case class CustomInstrumentSettings( + lowestDiscernibleValue: Option[Long], + highestTrackableValue: Option[Long], + significantValueDigits: Option[Int], + sampleInterval: Option[Duration] + ) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/MetricLookup.scala b/kamon-core/src/main/scala/kamon/metric/MetricLookup.scala index db33b83c..e28b1435 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricLookup.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricLookup.scala @@ -1,10 +1,9 @@ package kamon package metric -import java.time.Duration -import kamon.metric.instrument._ import kamon.util.MeasurementUnit +import scala.concurrent.duration.Duration trait MetricLookup { diff --git a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala index c6513f1a..de64bc17 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala @@ -1,15 +1,14 @@ package kamon package metric -import java.time.Duration import java.util.concurrent.atomic.AtomicReference import com.typesafe.config.Config import com.typesafe.scalalogging.Logger -import kamon.metric.instrument._ import kamon.util.MeasurementUnit import scala.collection.concurrent.TrieMap +import scala.concurrent.duration.Duration class MetricRegistry(initialConfig: Config) extends MetricsSnapshotGenerator { @@ -36,10 +35,10 @@ class MetricRegistry(initialConfig: Config) extends MetricsSnapshotGenerator { override def snapshot(): MetricsSnapshot = synchronized { - var histograms = Seq.empty[DistributionSnapshot] - var mmCounters = Seq.empty[DistributionSnapshot] - var counters = Seq.empty[SingleValueSnapshot] - var gauges = Seq.empty[SingleValueSnapshot] + var histograms = Seq.empty[MetricDistribution] + var mmCounters = Seq.empty[MetricDistribution] + var counters = Seq.empty[MetricValue] + var gauges = Seq.empty[MetricValue] for { metricEntry <- metrics.values @@ -50,6 +49,7 @@ class MetricRegistry(initialConfig: Config) extends MetricsSnapshotGenerator { case InstrumentType.MinMaxCounter => mmCounters = mmCounters :+ instrument.asInstanceOf[SnapshotableMinMaxCounter].snapshot() case InstrumentType.Gauge => gauges = gauges :+ instrument.asInstanceOf[SnapshotableGauge].snapshot() case InstrumentType.Counter => counters = counters :+ instrument.asInstanceOf[SnapshotableCounter].snapshot() + case other => logger.warn("Unexpected instrument type [{}] found in the registry", other ) } } diff --git a/kamon-core/src/main/scala/kamon/metric/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/MinMaxCounter.scala new file mode 100644 index 00000000..4ac1ce74 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MinMaxCounter.scala @@ -0,0 +1,76 @@ +package kamon.metric + +import java.lang.Math.abs +import java.util.concurrent.atomic.AtomicLong + +import kamon.jsr166.LongMaxUpdater +import kamon.util.MeasurementUnit + +import scala.concurrent.duration.Duration + +trait MinMaxCounter { + def dynamicRange: DynamicRange + def sampleInterval: Duration + def measurementUnit: MeasurementUnit + + def increment(): Unit + def increment(times: Long): Unit + def decrement(): Unit + def decrement(times: Long): Unit + def sample(): Unit +} + + +class PaddedMinMaxCounter(name: String, tags: Map[String, String], underlyingHistogram: Histogram with DistributionSnapshotInstrument, + val sampleInterval: Duration) extends SnapshotableMinMaxCounter { + + private val min = new LongMaxUpdater(0L) + private val max = new LongMaxUpdater(0L) + private val sum = new AtomicLong() + + def dynamicRange: DynamicRange = + underlyingHistogram.dynamicRange + + def measurementUnit: MeasurementUnit = + underlyingHistogram.measurementUnit + + private[kamon] def snapshot(): MetricDistribution = + underlyingHistogram.snapshot() + + def increment(): Unit = + increment(1L) + + def increment(times: Long): Unit = { + val currentValue = sum.addAndGet(times) + max.update(currentValue) + } + + def decrement(): Unit = + decrement(1L) + + def decrement(times: Long): Unit = { + val currentValue = sum.addAndGet(-times) + min.update(-currentValue) + } + + def sample(): Unit = { + val currentValue = { + val value = sum.get() + if (value <= 0) 0 else value + } + + val currentMin = { + val rawMin = min.maxThenReset(-currentValue) + if (rawMin >= 0) + 0 + else + abs(rawMin) + } + + val currentMax = max.maxThenReset(currentValue) + + underlyingHistogram.record(currentValue) + underlyingHistogram.record(currentMin) + underlyingHistogram.record(currentMax) + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala index b7cc349e..e8587ffe 100644 --- a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala +++ b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala @@ -1,18 +1,72 @@ package kamon.metric -import java.time.Instant -import kamon.metric.instrument.{DistributionSnapshot, SingleValueSnapshot} +import kamon.util.MeasurementUnit -case class Interval(from: Instant, to: Instant) +case class Interval(from: Long, to: Long) case class MetricsSnapshot( - histograms: Seq[DistributionSnapshot], - minMaxCounters: Seq[DistributionSnapshot], - gauges: Seq[SingleValueSnapshot], - counters: Seq[SingleValueSnapshot] + histograms: Seq[MetricDistribution], + minMaxCounters: Seq[MetricDistribution], + gauges: Seq[MetricValue], + counters: Seq[MetricValue] ) case class TickSnapshot(interval: Interval, metrics: MetricsSnapshot) + +/** + * Snapshot for instruments that internally track a single value. Meant to be used for counters and gauges. + * + */ +case class MetricValue(name: String, tags: Map[String, String], measurementUnit: MeasurementUnit, value: Long) + +/** + * Snapshot for instruments that internally the distribution of values in a defined dynamic range. Meant to be used + * with histograms and min max counters. + */ +case class MetricDistribution(name: String, tags: Map[String, String], measurementUnit: MeasurementUnit, + dynamicRange: DynamicRange, distribution: Distribution) + + +trait Distribution { + def buckets: Seq[Bucket] + def bucketsIterator: Iterator[Bucket] + + def min: Long + def max: Long + def sum: Long + def count: Long + def percentile(p: Double): Percentile + + def percentiles: Seq[Percentile] + def percentilesIterator: Iterator[Percentile] +} + +trait Bucket { + def value: Long + def frequency: Long +} + +trait Percentile { + def quantile: Double + def value: Long + def countUnderQuantile: Long +} + + +trait DistributionSnapshotInstrument { + private[kamon] def snapshot(): MetricDistribution +} + +trait SingleValueSnapshotInstrument { + private[kamon] def snapshot(): MetricValue +} + +trait SnapshotableHistogram extends Histogram with DistributionSnapshotInstrument +trait SnapshotableMinMaxCounter extends MinMaxCounter with DistributionSnapshotInstrument +trait SnapshotableCounter extends Counter with SingleValueSnapshotInstrument +trait SnapshotableGauge extends Gauge with SingleValueSnapshotInstrument + + diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala deleted file mode 100644 index f18e771c..00000000 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala +++ /dev/null @@ -1,34 +0,0 @@ -package kamon -package metric -package instrument - -import java.util.concurrent.atomic.LongAdder - -import com.typesafe.scalalogging.StrictLogging -import kamon.util.MeasurementUnit - -trait Counter { - def measurementUnit: MeasurementUnit - - def increment(): Unit - def increment(times: Long): Unit -} - -class LongAdderCounter(name: String, tags: Map[String, String], val measurementUnit: MeasurementUnit) - extends SnapshotableCounter with StrictLogging { - - private val adder = new LongAdder() - - def increment(): Unit = - adder.increment() - - def increment(times: Long): Unit = { - if (times >= 0) - adder.add(times) - else - logger.warn(s"Ignored attempt to decrement counter [$name]") - } - - def snapshot(): SingleValueSnapshot = - SingleValueSnapshot(name, tags, measurementUnit, adder.sumThenReset()) -} diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/DynamicRange.scala b/kamon-core/src/main/scala/kamon/metric/instrument/DynamicRange.scala deleted file mode 100644 index 226f5450..00000000 --- a/kamon-core/src/main/scala/kamon/metric/instrument/DynamicRange.scala +++ /dev/null @@ -1,33 +0,0 @@ -package kamon.metric.instrument - -import java.util.concurrent.TimeUnit - -case class DynamicRange(lowestDiscernibleValue: Long, highestTrackableValue: Long, significantValueDigits: Int) { - def upTo(highestTrackableValue: Long): DynamicRange = - copy(highestTrackableValue = highestTrackableValue) - - def startingFrom(lowestDiscernibleValue: Long): DynamicRange = - copy(lowestDiscernibleValue = lowestDiscernibleValue) -} - -object DynamicRange { - private val oneHourInNanoseconds = TimeUnit.HOURS.toNanos(1) - - /** - * Provides a range from 0 to 3.6e+12 (one hour in nanoseconds) with a value precision of 1 significant digit (10%) - * across that range. - */ - val Loose = DynamicRange(1L, oneHourInNanoseconds, 1) - - /** - * Provides a range from 0 to 3.6e+12 (one hour in nanoseconds) with a value precision of 2 significant digit (1%) - * across that range. - */ - val Default = DynamicRange(1L, oneHourInNanoseconds, 2) - - /** - * Provides a range from 0 to 3.6e+12 (one hour in nanoseconds) with a value precision of 3 significant digit (0.1%) - * across that range. - */ - val Fine = DynamicRange(1L, oneHourInNanoseconds, 3) -} diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala deleted file mode 100644 index acbff912..00000000 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala +++ /dev/null @@ -1,39 +0,0 @@ -package kamon.metric.instrument - -import java.util.concurrent.atomic.AtomicLong -import kamon.util.MeasurementUnit - -trait Gauge { - def measurementUnit: MeasurementUnit - - def increment(): Unit - def increment(times: Long): Unit - def decrement(): Unit - def decrement(times: Long): Unit - def set(value: Long): Unit -} - - -class AtomicLongGauge(name: String, tags: Map[String, String], val measurementUnit: MeasurementUnit) - extends SnapshotableGauge { - - private val currentValue = new AtomicLong(0L) - - def increment(): Unit = - currentValue.incrementAndGet() - - def increment(times: Long): Unit = - currentValue.addAndGet(times) - - def decrement(): Unit = - currentValue.decrementAndGet() - - def decrement(times: Long): Unit = - currentValue.addAndGet(-times) - - def set(value: Long): Unit = - currentValue.set(value) - - def snapshot(): SingleValueSnapshot = - SingleValueSnapshot(name, tags, measurementUnit, currentValue.get()) -} diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala deleted file mode 100644 index 29fe8c69..00000000 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala +++ /dev/null @@ -1,193 +0,0 @@ -package kamon -package metric -package instrument - -import java.nio.ByteBuffer - -import com.typesafe.scalalogging.StrictLogging -import kamon.util.MeasurementUnit -import org.HdrHistogram.{AtomicHistogramExtension, ZigZag} - -trait Histogram { - def dynamicRange: DynamicRange - def measurementUnit: MeasurementUnit - - def record(value: Long): Unit - def record(value: Long, times: Long): Unit -} - - -class HdrHistogram(name: String, tags: Map[String, String], val measurementUnit: MeasurementUnit, val dynamicRange: DynamicRange) - extends AtomicHistogramExtension(dynamicRange) with SnapshotableHistogram with StrictLogging { - - def record(value: Long): Unit = - tryRecord(value, 1) - - def record(value: Long, count: Long): Unit = - tryRecord(value, count) - - private def tryRecord(value: Long, count: Long): Unit = { - try { - recordValueWithCount(value, count) - } catch { - case anyException: Throwable ⇒ - logger.warn(s"Failed to store value [$value] in histogram [$name]. You might need to change " + - "your dynamic range configuration for this instrument.", anyException) - } - } - - override def snapshot(): DistributionSnapshot = { - val buffer = HdrHistogram.tempSnapshotBuffer.get() - val counts = countsArray() - val countsLimit = counts.length() - var index = 0 - buffer.clear() - - var minIndex = Int.MaxValue - var maxIndex = 0 - var totalCount = 0L - - while(index < countsLimit) { - val countAtIndex = counts.getAndSet(index, 0L) - - var zerosCount = 0L - if(countAtIndex == 0L) { - index += 1 - zerosCount = 1 - while(index < countsLimit && counts.get(index) == 0L) { - index += 1 - zerosCount += 1 - } - } - - if(zerosCount > 0) { - if(index < countsLimit) - ZigZag.putLong(buffer, -zerosCount) - } - else { - if(minIndex > index) - minIndex = index - maxIndex = index - - index += 1 - totalCount += countAtIndex - ZigZag.putLong(buffer, countAtIndex) - } - } - - buffer.flip() - val zigZagCounts = Array.ofDim[Byte](buffer.limit()) - buffer.get(zigZagCounts) - - val distribution = new ZigZagCountsDistribution(totalCount, minIndex, maxIndex, ByteBuffer.wrap(zigZagCounts), - protectedUnitMagnitude(), protectedSubBucketHalfCount(), protectedSubBucketHalfCountMagnitude()) - - DistributionSnapshot(name, tags, measurementUnit, dynamicRange, distribution) - } - - private class ZigZagCountsDistribution(val count: Long, minIndex: Int, maxIndex: Int, zigZagCounts: ByteBuffer, - unitMagnitude: Int, subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Distribution { - - val min: Long = if(count == 0) 0 else bucketValueAtIndex(minIndex) - val max: Long = bucketValueAtIndex(maxIndex) - def sum: Long = bucketsIterator.foldLeft(0L)((a, b) => a + (b.value * b.frequency)) - - def buckets: Seq[Bucket] = { - val builder = Vector.newBuilder[Bucket] - bucketsIterator.foreach { b => - builder += DefaultBucket(b.value, b.frequency) - } - - builder.result() - } - - def bucketsIterator: Iterator[Bucket] = new Iterator[Bucket] { - val buffer = zigZagCounts.duplicate() - val bucket = MutableBucket(0, 0) - var countsArrayIndex = 0 - - def hasNext: Boolean = - buffer.remaining() > 0 - - def next(): Bucket = { - val readLong = ZigZag.getLong(buffer) - val frequency = if(readLong > 0) { - readLong - } else { - countsArrayIndex += (-readLong.toInt) - ZigZag.getLong(buffer) - } - - bucket.value = bucketValueAtIndex(countsArrayIndex) - bucket.frequency = frequency - countsArrayIndex += 1 - bucket - } - } - - def percentilesIterator: Iterator[Percentile] = new Iterator[Percentile]{ - val buckets = bucketsIterator - val percentile = MutablePercentile(0D, 0, 0) - var countUnderQuantile = 0L - - def hasNext: Boolean = - buckets.hasNext - - def next(): Percentile = { - val bucket = buckets.next() - countUnderQuantile += bucket.frequency - percentile.quantile = (countUnderQuantile * 100D) / ZigZagCountsDistribution.this.count - percentile.countUnderQuantile = countUnderQuantile - percentile.value = bucket.value - percentile - } - } - - def percentile(p: Double): Percentile = { - val percentiles = percentilesIterator - if(percentiles.hasNext) { - var currentPercentile = percentiles.next() - while(percentiles.hasNext && currentPercentile.quantile < p) { - currentPercentile = percentiles.next() - } - - currentPercentile - - } else DefaultPercentile(p, 0, 0) - } - - - def percentiles: Seq[Percentile] = { - val builder = Vector.newBuilder[Percentile] - percentilesIterator.foreach { p => - builder += DefaultPercentile(p.quantile, p.value, p.countUnderQuantile) - } - - builder.result() - } - - @inline private def bucketValueAtIndex(index: Int): Long = { - var bucketIndex: Int = (index >> subBucketHalfCountMagnitude) - 1 - var subBucketIndex: Int = (index & (subBucketHalfCount - 1)) + subBucketHalfCount - if (bucketIndex < 0) { - subBucketIndex -= subBucketHalfCount - bucketIndex = 0 - } - - subBucketIndex.toLong << (bucketIndex + unitMagnitude) - } - } - - case class DefaultBucket(value: Long, frequency: Long) extends Bucket - case class MutableBucket(var value: Long, var frequency: Long) extends Bucket - - case class DefaultPercentile(quantile: Double, value: Long, countUnderQuantile: Long) extends Percentile - case class MutablePercentile(var quantile: Double, var value: Long, var countUnderQuantile: Long) extends Percentile -} - -object HdrHistogram { - // TODO: move this to some object pool might be better, or at - private val tempSnapshotBuffer = new ThreadLocal[ByteBuffer] { - override def initialValue(): ByteBuffer = ByteBuffer.allocate(33792) - } -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala b/kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala deleted file mode 100644 index dc3cad08..00000000 --- a/kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala +++ /dev/null @@ -1,39 +0,0 @@ -package org.HdrHistogram - -import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicLongArray - -import kamon.metric.instrument.DynamicRange - -/** - * Exposes package-private members of [[org.HdrHistogram.AtomicHistogram]]. - */ -abstract class AtomicHistogramExtension(dr: DynamicRange) - extends AtomicHistogram(dr.lowestDiscernibleValue, dr.highestTrackableValue, dr.significantValueDigits) { - - override def incrementTotalCount(): Unit = {} - override def addToTotalCount(value: Long): Unit = {} - - def countsArray(): AtomicLongArray = counts - def protectedUnitMagnitude(): Int = unitMagnitude - def protectedSubBucketHalfCount(): Int = subBucketHalfCount - def protectedSubBucketHalfCountMagnitude(): Int = subBucketHalfCountMagnitude -} - -/** - * Exposes the package-private members of [[org.HdrHistogram.ZigZagEncoding]]. - */ -object ZigZag { - def putLong(buffer: ByteBuffer, value: Long): Unit = - ZigZagEncoding.putLong(buffer, value) - - def getLong(buffer: ByteBuffer): Long = - ZigZagEncoding.getLong(buffer) - - def putInt(buffer: ByteBuffer, value: Int): Unit = - ZigZagEncoding.putInt(buffer, value) - - def getInt(buffer: ByteBuffer): Int = - ZigZagEncoding.getInt(buffer) -} - diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala deleted file mode 100644 index 0e0536c6..00000000 --- a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala +++ /dev/null @@ -1,98 +0,0 @@ -package kamon -package metric -package instrument - -import java.time.Duration - -import com.typesafe.config.Config -import kamon.metric.instrument.InstrumentFactory.CustomInstrumentSettings -import kamon.util.MeasurementUnit - - -private[kamon] class InstrumentFactory private (defaultHistogramDynamicRange: DynamicRange, defaultMMCounterDynamicRange: DynamicRange, - defaultMMCounterSampleInterval: Duration, customSettings: Map[String, CustomInstrumentSettings]) { - - println("DEFAULT: " + defaultHistogramDynamicRange) - - def buildHistogram(dynamicRange: Option[DynamicRange])(name: String, tags: Map[String, String], unit: MeasurementUnit): SnapshotableHistogram = - new HdrHistogram(name, tags, unit, instrumentDynamicRange(name, dynamicRange.getOrElse(defaultHistogramDynamicRange))) - - def buildMinMaxCounter(dynamicRange: Option[DynamicRange], sampleInterval: Option[Duration]) - (name: String, tags: Map[String, String], unit: MeasurementUnit): SnapshotableMinMaxCounter = - new PaddedMinMaxCounter( - name, - tags, - buildHistogram(dynamicRange.orElse(Some(defaultMMCounterDynamicRange)))(name, tags, unit), - instrumentSampleInterval(name, sampleInterval.getOrElse(defaultMMCounterSampleInterval)) ) - - def buildGauge(name: String, tags: Map[String, String], unit: MeasurementUnit): SnapshotableGauge = - new AtomicLongGauge(name, tags, unit) - - def buildCounter(name: String, tags: Map[String, String], unit: MeasurementUnit): SnapshotableCounter = - new LongAdderCounter(name, tags, unit) - - - private def instrumentDynamicRange(instrumentName: String, dynamicRange: DynamicRange): DynamicRange = - customSettings.get(instrumentName).fold(dynamicRange) { cs => - overrideDynamicRange(dynamicRange, cs) - } - - private def instrumentSampleInterval(instrumentName: String, sampleInterval: Duration): Duration = - customSettings.get(instrumentName).fold(sampleInterval) { cs => - cs.sampleInterval.getOrElse(sampleInterval) - } - - private def overrideDynamicRange(defaultDynamicRange: DynamicRange, customSettings: CustomInstrumentSettings): DynamicRange = - DynamicRange( - customSettings.lowestDiscernibleValue.getOrElse(defaultDynamicRange.lowestDiscernibleValue), - customSettings.highestTrackableValue.getOrElse(defaultDynamicRange.highestTrackableValue), - customSettings.significantValueDigits.getOrElse(defaultDynamicRange.significantValueDigits) - ) -} - -object InstrumentFactory { - - def fromConfig(config: Config): InstrumentFactory = { - val factoryConfig = config.getConfig("kamon.metric.instrument-factory") - val histogramDynamicRange = readDynamicRange(factoryConfig.getConfig("default-settings.histogram")) - val mmCounterDynamicRange = readDynamicRange(factoryConfig.getConfig("default-settings.min-max-counter")) - val mmCounterSampleInterval = factoryConfig.getDuration("default-settings.min-max-counter.sample-interval") - - val customSettings = factoryConfig.getConfig("custom-settings") - .configurations - .filter(nonEmptySection) - .map(readCustomInstrumentSettings) - - new InstrumentFactory(histogramDynamicRange, mmCounterDynamicRange, mmCounterSampleInterval, customSettings) - } - - private def nonEmptySection(entry: (String, Config)): Boolean = entry match { - case (_, config) => config.firstLevelKeys.nonEmpty - } - - private def readCustomInstrumentSettings(entry: (String, Config)): (String, CustomInstrumentSettings) = { - val (metricName, metricConfig) = entry - val customSettings = CustomInstrumentSettings( - if (metricConfig.hasPath("lowest-discernible-value")) Some(metricConfig.getLong("lowest-discernible-value")) else None, - if (metricConfig.hasPath("highest-trackable-value")) Some(metricConfig.getLong("highest-trackable-value")) else None, - if (metricConfig.hasPath("significant-value-digits")) Some(metricConfig.getInt("significant-value-digits")) else None, - if (metricConfig.hasPath("sample-interval")) Some(metricConfig.getDuration("sample-interval")) else None - ) - - (metricName -> customSettings) - } - - private def readDynamicRange(config: Config): DynamicRange = - DynamicRange( - lowestDiscernibleValue = config.getLong("lowest-discernible-value"), - highestTrackableValue = config.getLong("highest-trackable-value"), - significantValueDigits = config.getInt("significant-value-digits") - ) - - private case class CustomInstrumentSettings( - lowestDiscernibleValue: Option[Long], - highestTrackableValue: Option[Long], - significantValueDigits: Option[Int], - sampleInterval: Option[Duration] - ) -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSnapshot.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSnapshot.scala deleted file mode 100644 index 1364c2d8..00000000 --- a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSnapshot.scala +++ /dev/null @@ -1,57 +0,0 @@ -package kamon.metric.instrument - -import kamon.util.MeasurementUnit - -/** - * Snapshot for instruments that internally track a single value. Meant to be used for counters and gauges. - * - */ -case class SingleValueSnapshot(name: String, tags: Map[String, String], measurementUnit: MeasurementUnit, value: Long) - -/** - * Snapshot for instruments that internally the distribution of values in a defined dynamic range. Meant to be used - * with histograms and min max counters. - */ -case class DistributionSnapshot(name: String, tags: Map[String, String], measurementUnit: MeasurementUnit, - dynamicRange: DynamicRange, distribution: Distribution) - - -trait Distribution { - def buckets: Seq[Bucket] - def bucketsIterator: Iterator[Bucket] - - def min: Long - def max: Long - def sum: Long - def count: Long - def percentile(p: Double): Percentile - - def percentiles: Seq[Percentile] - def percentilesIterator: Iterator[Percentile] -} - -trait Bucket { - def value: Long - def frequency: Long -} - -trait Percentile { - def quantile: Double - def value: Long - def countUnderQuantile: Long -} - - -trait DistributionSnapshotInstrument { - private[kamon] def snapshot(): DistributionSnapshot -} - -trait SingleValueSnapshotInstrument { - private[kamon] def snapshot(): SingleValueSnapshot -} - -trait SnapshotableHistogram extends Histogram with DistributionSnapshotInstrument -trait SnapshotableMinMaxCounter extends MinMaxCounter with DistributionSnapshotInstrument -trait SnapshotableCounter extends Counter with SingleValueSnapshotInstrument -trait SnapshotableGauge extends Gauge with SingleValueSnapshotInstrument - diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala deleted file mode 100644 index 70094b7b..00000000 --- a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala +++ /dev/null @@ -1,75 +0,0 @@ -package kamon.metric.instrument - -import java.lang.Math.abs -import java.time.Duration -import java.util.concurrent.atomic.{AtomicLong, AtomicReference} - -import kamon.jsr166.LongMaxUpdater -import kamon.util.MeasurementUnit - -trait MinMaxCounter { - def dynamicRange: DynamicRange - def sampleInterval: Duration - def measurementUnit: MeasurementUnit - - def increment(): Unit - def increment(times: Long): Unit - def decrement(): Unit - def decrement(times: Long): Unit - def sample(): Unit -} - - -class PaddedMinMaxCounter(name: String, tags: Map[String, String], underlyingHistogram: Histogram with DistributionSnapshotInstrument, - val sampleInterval: Duration) extends SnapshotableMinMaxCounter { - - private val min = new LongMaxUpdater(0L) - private val max = new LongMaxUpdater(0L) - private val sum = new AtomicLong() - - def dynamicRange: DynamicRange = - underlyingHistogram.dynamicRange - - def measurementUnit: MeasurementUnit = - underlyingHistogram.measurementUnit - - private[kamon] def snapshot(): DistributionSnapshot = - underlyingHistogram.snapshot() - - def increment(): Unit = - increment(1L) - - def increment(times: Long): Unit = { - val currentValue = sum.addAndGet(times) - max.update(currentValue) - } - - def decrement(): Unit = - decrement(1L) - - def decrement(times: Long): Unit = { - val currentValue = sum.addAndGet(-times) - min.update(-currentValue) - } - - def sample(): Unit = { - val currentValue = { - val value = sum.get() - if (value <= 0) 0 else value - } - - val currentMin = { - val rawMin = min.maxThenReset(-currentValue) - if (rawMin >= 0) - 0 - else - abs(rawMin) - } - - val currentMax = max.maxThenReset(currentValue) - - underlyingHistogram.record(currentValue) - underlyingHistogram.record(currentMin) - underlyingHistogram.record(currentMax) - } -} \ No newline at end of file -- cgit v1.2.3