diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric/Histogram.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/Histogram.scala | 191 |
1 files changed, 191 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/Histogram.scala new file mode 100644 index 00000000..47b2a1a0 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Histogram.scala @@ -0,0 +1,191 @@ +package kamon.metric + +import java.nio.ByteBuffer + +import com.typesafe.scalalogging.StrictLogging +import kamon.util.MeasurementUnit +import org.HdrHistogram.{AtomicHistogramExtension, ZigZag} + +trait Histogram { + def dynamicRange: DynamicRange + def measurementUnit: MeasurementUnit + + def record(value: Long): Unit + def record(value: Long, times: Long): Unit +} + + +class HdrHistogram(name: String, tags: Map[String, String], val measurementUnit: MeasurementUnit, val dynamicRange: DynamicRange) + extends AtomicHistogramExtension(dynamicRange) with SnapshotableHistogram with StrictLogging { + + def record(value: Long): Unit = + tryRecord(value, 1) + + def record(value: Long, count: Long): Unit = + tryRecord(value, count) + + private def tryRecord(value: Long, count: Long): Unit = { + try { + recordValueWithCount(value, count) + } catch { + case anyException: Throwable ⇒ + logger.warn(s"Failed to store value [$value] in histogram [$name]. You might need to change " + + "your dynamic range configuration for this instrument.", anyException) + } + } + + override def snapshot(): MetricDistribution = { + val buffer = HdrHistogram.tempSnapshotBuffer.get() + val counts = countsArray() + val countsLimit = counts.length() + var index = 0 + buffer.clear() + + var minIndex = Int.MaxValue + var maxIndex = 0 + var totalCount = 0L + + while(index < countsLimit) { + val countAtIndex = counts.getAndSet(index, 0L) + + var zerosCount = 0L + if(countAtIndex == 0L) { + index += 1 + zerosCount = 1 + while(index < countsLimit && counts.get(index) == 0L) { + index += 1 + zerosCount += 1 + } + } + + if(zerosCount > 0) { + if(index < countsLimit) + ZigZag.putLong(buffer, -zerosCount) + } + else { + if(minIndex > index) + minIndex = index + maxIndex = index + + index += 1 + totalCount += countAtIndex + ZigZag.putLong(buffer, countAtIndex) + } + } + + buffer.flip() + val zigZagCounts = Array.ofDim[Byte](buffer.limit()) + buffer.get(zigZagCounts) + + val distribution = new ZigZagCountsDistribution(totalCount, minIndex, maxIndex, ByteBuffer.wrap(zigZagCounts), + protectedUnitMagnitude(), protectedSubBucketHalfCount(), protectedSubBucketHalfCountMagnitude()) + + MetricDistribution(name, tags, measurementUnit, dynamicRange, distribution) + } + + private class ZigZagCountsDistribution(val count: Long, minIndex: Int, maxIndex: Int, zigZagCounts: ByteBuffer, + unitMagnitude: Int, subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Distribution { + + val min: Long = if(count == 0) 0 else bucketValueAtIndex(minIndex) + val max: Long = bucketValueAtIndex(maxIndex) + def sum: Long = bucketsIterator.foldLeft(0L)((a, b) => a + (b.value * b.frequency)) + + def buckets: Seq[Bucket] = { + val builder = Vector.newBuilder[Bucket] + bucketsIterator.foreach { b => + builder += DefaultBucket(b.value, b.frequency) + } + + builder.result() + } + + def bucketsIterator: Iterator[Bucket] = new Iterator[Bucket] { + val buffer = zigZagCounts.duplicate() + val bucket = MutableBucket(0, 0) + var countsArrayIndex = 0 + + def hasNext: Boolean = + buffer.remaining() > 0 + + def next(): Bucket = { + val readLong = ZigZag.getLong(buffer) + val frequency = if(readLong > 0) { + readLong + } else { + countsArrayIndex += (-readLong.toInt) + ZigZag.getLong(buffer) + } + + bucket.value = bucketValueAtIndex(countsArrayIndex) + bucket.frequency = frequency + countsArrayIndex += 1 + bucket + } + } + + def percentilesIterator: Iterator[Percentile] = new Iterator[Percentile]{ + val buckets = bucketsIterator + val percentile = MutablePercentile(0D, 0, 0) + var countUnderQuantile = 0L + + def hasNext: Boolean = + buckets.hasNext + + def next(): Percentile = { + val bucket = buckets.next() + countUnderQuantile += bucket.frequency + percentile.quantile = (countUnderQuantile * 100D) / ZigZagCountsDistribution.this.count + percentile.countUnderQuantile = countUnderQuantile + percentile.value = bucket.value + percentile + } + } + + def percentile(p: Double): Percentile = { + val percentiles = percentilesIterator + if(percentiles.hasNext) { + var currentPercentile = percentiles.next() + while(percentiles.hasNext && currentPercentile.quantile < p) { + currentPercentile = percentiles.next() + } + + currentPercentile + + } else DefaultPercentile(p, 0, 0) + } + + + def percentiles: Seq[Percentile] = { + val builder = Vector.newBuilder[Percentile] + percentilesIterator.foreach { p => + builder += DefaultPercentile(p.quantile, p.value, p.countUnderQuantile) + } + + builder.result() + } + + @inline private def bucketValueAtIndex(index: Int): Long = { + var bucketIndex: Int = (index >> subBucketHalfCountMagnitude) - 1 + var subBucketIndex: Int = (index & (subBucketHalfCount - 1)) + subBucketHalfCount + if (bucketIndex < 0) { + subBucketIndex -= subBucketHalfCount + bucketIndex = 0 + } + + subBucketIndex.toLong << (bucketIndex + unitMagnitude) + } + } + + case class DefaultBucket(value: Long, frequency: Long) extends Bucket + case class MutableBucket(var value: Long, var frequency: Long) extends Bucket + + case class DefaultPercentile(quantile: Double, value: Long, countUnderQuantile: Long) extends Percentile + case class MutablePercentile(var quantile: Double, var value: Long, var countUnderQuantile: Long) extends Percentile +} + +object HdrHistogram { + // TODO: move this to some object pool might be better, or at + private val tempSnapshotBuffer = new ThreadLocal[ByteBuffer] { + override def initialValue(): ByteBuffer = ByteBuffer.allocate(33792) + } +}
\ No newline at end of file |