aboutsummaryrefslogblamecommitdiff
path: root/kamon-core/src/main/scala/kamon/metric/Histogram.scala
blob: 1af55479b4b17a7880d4032d9c8e8a157ef616e7 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15














                                                                                             

              
 

                          

                                                          
                              

                 
                           





                                            
 

                                                                                                                                     











                                                           

                                                                                                                    


     
                                        












































                                                                                                                  
                                                                    
   


































































































                                                                                                                      
 

                     

                                                                     




                                                                        
/* =========================================================================================
 * Copyright © 2013-2017 the kamon project <http://kamon.io/>
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */

package kamon
package metric

import java.nio.ByteBuffer

import kamon.util.MeasurementUnit
import org.HdrHistogram.{AtomicHistogramExtension, ZigZag}
import org.slf4j.LoggerFactory

trait Histogram {
  def unit: MeasurementUnit
  def dynamicRange: DynamicRange

  def record(value: Long): Unit
  def record(value: Long, times: Long): Unit
}


private[kamon] class HdrHistogram(name: String, tags: Map[String, String], val unit: MeasurementUnit, val dynamicRange: DynamicRange)
    extends AtomicHistogramExtension(dynamicRange) with Histogram {

  def record(value: Long): Unit =
    tryRecord(value, 1)

  def record(value: Long, count: Long): Unit =
    tryRecord(value, count)

  private def tryRecord(value: Long, count: Long): Unit = {
    try {
      recordValueWithCount(value, count)
    } catch {
      case anyException: Throwable 
        HdrHistogram.logger.warn(s"Failed to store value [$value] in histogram [$name]. You might need to change " +
                                  "your dynamic range configuration for this instrument.", anyException)
    }
  }

  def snapshot(): MetricDistribution = {
    val buffer = HdrHistogram.tempSnapshotBuffer.get()
    val counts = countsArray()
    val countsLimit = counts.length()
    var index = 0
    buffer.clear()

    var minIndex = Int.MaxValue
    var maxIndex = 0
    var totalCount = 0L

    while(index < countsLimit) {
      val countAtIndex = counts.getAndSet(index, 0L)

      var zerosCount = 0L
      if(countAtIndex == 0L) {
        index += 1
        zerosCount = 1
        while(index < countsLimit && counts.get(index) == 0L) {
          index += 1
          zerosCount += 1
        }
      }

      if(zerosCount > 0) {
        if(index < countsLimit)
          ZigZag.putLong(buffer, -zerosCount)
      }
      else {
        if(minIndex > index)
          minIndex = index
        maxIndex = index

        index += 1
        totalCount += countAtIndex
        ZigZag.putLong(buffer, countAtIndex)
      }
    }

    buffer.flip()
    val zigZagCounts = Array.ofDim[Byte](buffer.limit())
    buffer.get(zigZagCounts)

    val distribution = new ZigZagCountsDistribution(totalCount, minIndex, maxIndex, ByteBuffer.wrap(zigZagCounts),
      protectedUnitMagnitude(), protectedSubBucketHalfCount(), protectedSubBucketHalfCountMagnitude())

    MetricDistribution(name, tags, unit, dynamicRange, distribution)
  }

  private class ZigZagCountsDistribution(val count: Long, minIndex: Int, maxIndex: Int, zigZagCounts: ByteBuffer,
      unitMagnitude: Int, subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Distribution {

    val min: Long = if(count == 0) 0 else bucketValueAtIndex(minIndex)
    val max: Long = bucketValueAtIndex(maxIndex)
    def sum: Long = bucketsIterator.foldLeft(0L)((a, b) => a + (b.value * b.frequency))

    def buckets: Seq[Bucket] = {
      val builder = Vector.newBuilder[Bucket]
      bucketsIterator.foreach { b =>
        builder += DefaultBucket(b.value, b.frequency)
      }

      builder.result()
    }

    def bucketsIterator: Iterator[Bucket] = new Iterator[Bucket] {
      val buffer = zigZagCounts.duplicate()
      val bucket = MutableBucket(0, 0)
      var countsArrayIndex = 0

      def hasNext: Boolean =
        buffer.remaining() > 0

      def next(): Bucket = {
        val readLong = ZigZag.getLong(buffer)
        val frequency = if(readLong > 0) {
          readLong
        } else {
          countsArrayIndex += (-readLong.toInt)
          ZigZag.getLong(buffer)
        }

        bucket.value = bucketValueAtIndex(countsArrayIndex)
        bucket.frequency = frequency
        countsArrayIndex += 1
        bucket
      }
    }

    def percentilesIterator: Iterator[Percentile] = new Iterator[Percentile]{
      val buckets = bucketsIterator
      val percentile = MutablePercentile(0D, 0, 0)
      var countUnderQuantile = 0L

      def hasNext: Boolean =
        buckets.hasNext

      def next(): Percentile = {
        val bucket = buckets.next()
        countUnderQuantile += bucket.frequency
        percentile.quantile = (countUnderQuantile * 100D) / ZigZagCountsDistribution.this.count
        percentile.countUnderQuantile = countUnderQuantile
        percentile.value = bucket.value
        percentile
      }
    }

    def percentile(p: Double): Percentile = {
      val percentiles = percentilesIterator
      if(percentiles.hasNext) {
        var currentPercentile = percentiles.next()
        while(percentiles.hasNext && currentPercentile.quantile < p) {
          currentPercentile = percentiles.next()
        }

        currentPercentile

      } else DefaultPercentile(p, 0, 0)
    }


    def percentiles: Seq[Percentile] = {
      val builder = Vector.newBuilder[Percentile]
      percentilesIterator.foreach { p =>
        builder += DefaultPercentile(p.quantile, p.value, p.countUnderQuantile)
      }

      builder.result()
    }

    @inline private def bucketValueAtIndex(index: Int): Long = {
      var bucketIndex: Int = (index >> subBucketHalfCountMagnitude) - 1
      var subBucketIndex: Int = (index & (subBucketHalfCount - 1)) + subBucketHalfCount
      if (bucketIndex < 0) {
        subBucketIndex -= subBucketHalfCount
        bucketIndex = 0
      }

      subBucketIndex.toLong << (bucketIndex + unitMagnitude)
    }
  }

  case class DefaultBucket(value: Long, frequency: Long) extends Bucket
  case class MutableBucket(var value: Long, var frequency: Long) extends Bucket

  case class DefaultPercentile(quantile: Double, value: Long, countUnderQuantile: Long) extends Percentile
  case class MutablePercentile(var quantile: Double, var value: Long, var countUnderQuantile: Long) extends Percentile
}

object HdrHistogram {
  private val logger = LoggerFactory.getLogger(classOf[HdrHistogram])

  // TODO: move this to some object pool might be better, or at
  private val tempSnapshotBuffer = new ThreadLocal[ByteBuffer] {
    override def initialValue(): ByteBuffer = ByteBuffer.allocate(33792)
  }
}