diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala | 331 |
1 files changed, 0 insertions, 331 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala deleted file mode 100644 index 399f0880..00000000 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala +++ /dev/null @@ -1,331 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric.instrument - -import java.nio.LongBuffer - -import kamon.metric.instrument.Histogram.{DynamicRange, Snapshot} -import kamon.util.logger.LazyLogger -import org.HdrHistogram.ModifiedAtomicHistogram - -trait Histogram extends Instrument { - type SnapshotType = Histogram.Snapshot - - def record(value: Long): Unit - def record(value: Long, count: Long): Unit -} - -object Histogram { - - /** - * Scala API: - * - * Create a new High Dynamic Range Histogram ([[kamon.metric.instrument.HdrHistogram]]) using the given - * [[kamon.metric.instrument.Histogram.DynamicRange]]. - */ - def apply(dynamicRange: DynamicRange): Histogram = new HdrHistogram(dynamicRange) - - /** - * Java API: - * - * Create a new High Dynamic Range Histogram ([[kamon.metric.instrument.HdrHistogram]]) using the given - * [[kamon.metric.instrument.Histogram.DynamicRange]]. - */ - def create(dynamicRange: DynamicRange): Histogram = apply(dynamicRange) - - /** - * DynamicRange is a configuration object used to supply range and precision configuration to a - * [[kamon.metric.instrument.HdrHistogram]]. See the [[http://hdrhistogram.github.io/HdrHistogram/ HdrHistogram website]] - * for more details on how it works and the effects of these configuration values. - * - * @param lowestDiscernibleValue - * The lowest value that can be discerned (distinguished from 0) by the histogram.Must be a positive integer that - * is >= 1. May be internally rounded down to nearest power of 2. - * @param highestTrackableValue - * The highest value to be tracked by the histogram. Must be a positive integer that is >= (2 * lowestDiscernibleValue). - * Must not be larger than (Long.MAX_VALUE/2). - * @param precision - * The number of significant decimal digits to which the histogram will maintain value resolution and separation. - * Must be a non-negative integer between 1 and 3. - */ - case class DynamicRange(lowestDiscernibleValue: Long, highestTrackableValue: Long, precision: Int) - - trait Record { - def level: Long - def count: Long - - private[kamon] def rawCompactRecord: Long - } - - case class MutableRecord(var level: Long, var count: Long) extends Record { - var rawCompactRecord: Long = 0L - } - - trait Snapshot extends InstrumentSnapshot { - - def isEmpty: Boolean = numberOfMeasurements == 0 - def numberOfMeasurements: Long - def min: Long - def max: Long - def sum: Long - def percentile(percentile: Double): Long - def recordsIterator: Iterator[Record] - def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot - def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot - - override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Histogram.Snapshot = - new ScaledSnapshot(from, to, this) - } - - class ScaledSnapshot(from: UnitOfMeasurement, to: UnitOfMeasurement, snapshot: Snapshot) extends Snapshot { - private def doScale(v: Long) = from.tryScale(to)(v).toLong - override def numberOfMeasurements: Long = snapshot.numberOfMeasurements - - override def max: Long = doScale(snapshot.max) - - override def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot = snapshot.merge(that, context) - - override def merge(that: Snapshot, context: CollectionContext): Snapshot = snapshot.merge(that, context) - - override def percentile(percentile: Double): Long = doScale(snapshot.percentile(percentile)) - - override def min: Long = doScale(snapshot.min) - - override def sum: Long = doScale(snapshot.sum) - - override def recordsIterator: Iterator[Record] = { - snapshot.recordsIterator.map(record ⇒ new Record { - override def count: Long = record.count - - override def level: Long = doScale(record.level) - - override private[kamon] def rawCompactRecord: Long = record.rawCompactRecord - }) - } - - override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Histogram.Snapshot = - if (this.from == from && this.to == to) this else super.scale(from, to) - } - - object Snapshot { - val empty = new Snapshot { - override def min: Long = 0L - override def max: Long = 0L - override def sum: Long = 0L - override def percentile(percentile: Double): Long = 0L - override def recordsIterator: Iterator[Record] = Iterator.empty - override def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot = that - override def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = that - override def numberOfMeasurements: Long = 0L - override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Histogram.Snapshot = this - } - } -} - -object HdrHistogram { - private val log = LazyLogger(classOf[HdrHistogram]) -} - -/** - * This implementation is meant to be used for real time data collection where data snapshots are taken often over time. - * The collect(..) operation extracts all the recorded values from the histogram and resets the counts, but still - * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. - */ -class HdrHistogram(dynamicRange: DynamicRange) extends ModifiedAtomicHistogram( - dynamicRange.lowestDiscernibleValue, - dynamicRange.highestTrackableValue, dynamicRange.precision -) with Histogram { - import HdrHistogram.log - - def record(value: Long): Unit = tryRecord(value, 1L) - - def record(value: Long, count: Long): Unit = tryRecord(value, count) - - private def tryRecord(value: Long, count: Long): Unit = { - try { - recordValueWithCount(value, count) - } catch { - case anyException: Throwable ⇒ - log.warn(s"Failed to store value $value in HdrHistogram, please review your range configuration.", anyException) - } - } - - def collect(context: CollectionContext): Histogram.Snapshot = { - import context.buffer - buffer.clear() - val nrOfMeasurements = writeSnapshotTo(buffer) - - buffer.flip() - - val measurementsArray = Array.ofDim[Long](buffer.limit()) - buffer.get(measurementsArray, 0, measurementsArray.length) - new CompactHdrSnapshot(nrOfMeasurements, measurementsArray, protectedUnitMagnitude(), protectedSubBucketHalfCount(), protectedSubBucketHalfCountMagnitude()) - } - - def getCounts = countsArray().length() - - def cleanup: Unit = {} - - private def writeSnapshotTo(buffer: LongBuffer): Long = { - val counts = countsArray() - val countsLength = counts.length() - - var nrOfMeasurements = 0L - var index = 0L - while (index < countsLength) { - val countAtIndex = counts.getAndSet(index.toInt, 0L) - - if (countAtIndex > 0) { - buffer.put(CompactHdrSnapshot.compactRecord(index, countAtIndex)) - nrOfMeasurements += countAtIndex - } - - index += 1 - } - nrOfMeasurements - } -} - -case class CompactHdrSnapshot(numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int, - subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Histogram.Snapshot { - - def min: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(0)) - def max: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(compactRecords.length - 1)) - def sum: Long = recordsIterator.foldLeft(0L)((a, r) ⇒ a + (r.count * r.level)) - - def percentile(p: Double): Long = { - val records = recordsIterator - val threshold = numberOfMeasurements * (p / 100D) - var countToCurrentLevel = 0L - var percentileLevel = 0L - - while (countToCurrentLevel < threshold && records.hasNext) { - val record = records.next() - countToCurrentLevel += record.count - percentileLevel = record.level - } - - percentileLevel - } - - def merge(that: Histogram.Snapshot, context: CollectionContext): Snapshot = - merge(that.asInstanceOf[InstrumentSnapshot], context) - - def merge(that: InstrumentSnapshot, context: CollectionContext): Histogram.Snapshot = that match { - case thatSnapshot: CompactHdrSnapshot ⇒ - if (thatSnapshot.isEmpty) this else if (this.isEmpty) thatSnapshot else { - import context.buffer - buffer.clear() - - val selfIterator = recordsIterator - val thatIterator = thatSnapshot.recordsIterator - var thatCurrentRecord: Histogram.Record = null - var mergedNumberOfMeasurements = 0L - - def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null - def addToBuffer(compactRecord: Long): Unit = { - mergedNumberOfMeasurements += countFromCompactRecord(compactRecord) - buffer.put(compactRecord) - } - - while (selfIterator.hasNext) { - val selfCurrentRecord = selfIterator.next() - - // Advance that to no further than the level of selfCurrentRecord - thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord - while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) { - addToBuffer(thatCurrentRecord.rawCompactRecord) - thatCurrentRecord = nextOrNull(thatIterator) - } - - // Include the current record of self and optionally merge if has the same level as thatCurrentRecord - if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) { - addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord)) - thatCurrentRecord = nextOrNull(thatIterator) - } else { - addToBuffer(selfCurrentRecord.rawCompactRecord) - } - } - - // Include everything that might have been left from that - if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord) - while (thatIterator.hasNext) { - addToBuffer(thatIterator.next().rawCompactRecord) - } - - buffer.flip() - val compactRecords = Array.ofDim[Long](buffer.limit()) - buffer.get(compactRecords) - - new CompactHdrSnapshot(mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude) - } - - case other ⇒ - sys.error(s"Cannot merge a CompactHdrSnapshot with the incompatible [${other.getClass.getName}] type.") - - } - - @inline private def mergeCompactRecords(left: Long, right: Long): Long = { - val index = left >> 48 - val leftCount = countFromCompactRecord(left) - val rightCount = countFromCompactRecord(right) - - CompactHdrSnapshot.compactRecord(index, leftCount + rightCount) - } - - @inline private def levelFromCompactRecord(compactRecord: Long): Long = { - val countsArrayIndex = (compactRecord >> 48).toInt - var bucketIndex: Int = (countsArrayIndex >> subBucketHalfCountMagnitude) - 1 - var subBucketIndex: Int = (countsArrayIndex & (subBucketHalfCount - 1)) + subBucketHalfCount - if (bucketIndex < 0) { - subBucketIndex -= subBucketHalfCount - bucketIndex = 0 - } - - subBucketIndex.toLong << (bucketIndex + unitMagnitude) - } - - @inline private def countFromCompactRecord(compactRecord: Long): Long = - compactRecord & CompactHdrSnapshot.CompactRecordCountMask - - def recordsIterator: Iterator[Histogram.Record] = new Iterator[Histogram.Record] { - var currentIndex = 0 - val mutableRecord = Histogram.MutableRecord(0, 0) - - override def hasNext: Boolean = currentIndex < compactRecords.length - - override def next(): Histogram.Record = { - if (hasNext) { - val measurement = compactRecords(currentIndex) - mutableRecord.rawCompactRecord = measurement - mutableRecord.level = levelFromCompactRecord(measurement) - mutableRecord.count = countFromCompactRecord(measurement) - currentIndex += 1 - - mutableRecord - } else { - throw new IllegalStateException("The iterator has already been consumed.") - } - } - } -} - -object CompactHdrSnapshot { - val CompactRecordCountMask = 0xFFFFFFFFFFFFL - - def compactRecord(index: Long, count: Long): Long = (index << 48) | count -} |