path: root/kamon-core/src/main/scala/kamon/metric/instrument
diff options
authorIvan Topolnjak <ivantopo@gmail.com>2014-07-03 14:36:42 -0300
committerIvan Topolnjak <ivantopo@gmail.com>2014-07-03 14:36:18 -0300
commita3353d3e3fcb1dfab3e8f401187e236e99df2202 (patch)
tree4e9e246201cf169f1496bc72928ea2d35d03fcd0 /kamon-core/src/main/scala/kamon/metric/instrument
parent6d7970c6dd5b96b512c846181771bb11a43bc82a (diff)
! all: refactor the core metric recording instruments and accomodate UserMetrics
This PR is including several changes to the kamon-core, most notably: - Formalize the interface for Histograms, Counters and MinMaxCounters. Making sure that the interfaces are as clean as possible. - Move away from the all Vector[Measurement] based Histogram snapshot to a new approach in which we use a single long to store both the index in the counts array and the frequency on that bucket. The leftmost 2 bytes of each long are used for storing the counts array index and the remaining 6 bytes are used for the actual count, and everything is put into a simple long array. This way only the buckets that actually have values will be included in the snapshot with the smallest possible memory footprint. - Introduce Gauges. - Reorganize the instrumentation for Akka and Scala and rewrite most of the tests of this components to avoid going through the subscription protocol to test. - Introduce trace tests and fixes on various tests. - Necessary changes on new relic, datadog and statsd modules to compile with the new codebase. Pending: - Finish the upgrade of the new relic to the current model. - Introduce proper limit checks for histograms to ensure that we never pass the 2/6 bytes limits. - More testing, more testing, more testing. - Create the KamonStandalone module.
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric/instrument')
4 files changed, 499 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala
new file mode 100644
index 00000000..b592bcd3
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala
@@ -0,0 +1,59 @@
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package kamon.metric.instrument
+import jsr166e.LongAdder
+import kamon.metric.{ CollectionContext, MetricSnapshot, MetricRecorder }
+trait Counter extends MetricRecorder {
+ type SnapshotType = Counter.Snapshot
+ def increment(): Unit
+ def increment(times: Long): Unit
+object Counter {
+ def apply(): Counter = new LongAdderCounter
+ trait Snapshot extends MetricSnapshot {
+ type SnapshotType = Counter.Snapshot
+ def count: Long
+ def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot
+ }
+class LongAdderCounter extends Counter {
+ private val counter = new LongAdder
+ def increment(): Unit = counter.increment()
+ def increment(times: Long): Unit = {
+ if (times < 0)
+ throw new UnsupportedOperationException("Counters cannot be decremented")
+ counter.add(times)
+ }
+ def collect(context: CollectionContext): Counter.Snapshot = CounterSnapshot(counter.sumThenReset())
+ def cleanup: Unit = {}
+case class CounterSnapshot(count: Long) extends Counter.Snapshot {
+ def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot = CounterSnapshot(count + that.count)
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala
new file mode 100644
index 00000000..1efff2bc
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala
@@ -0,0 +1,78 @@
+package kamon.metric.instrument
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+import akka.actor.{ Cancellable, ActorSystem }
+import com.typesafe.config.Config
+import kamon.metric.{ CollectionContext, Scale, MetricRecorder }
+import scala.concurrent.duration.FiniteDuration
+trait Gauge extends MetricRecorder {
+ type SnapshotType = Histogram.Snapshot
+ def record(value: Long)
+ def record(value: Long, count: Long)
+object Gauge {
+ trait CurrentValueCollector {
+ def currentValue: Long
+ }
+ def apply(precision: Histogram.Precision, highestTrackableValue: Long, scale: Scale, refreshInterval: FiniteDuration,
+ system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = {
+ val underlyingHistogram = Histogram(highestTrackableValue, precision, scale)
+ val gauge = new HistogramBackedGauge(underlyingHistogram, currentValueCollector)
+ val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) {
+ gauge.refreshValue()
+ }(system.dispatcher) // TODO: Move this to Kamon dispatchers
+ gauge.refreshValuesSchedule.set(refreshValuesSchedule)
+ gauge
+ }
+ def fromDefaultConfig(system: ActorSystem)(currentValueCollectorFunction: () ⇒ Long): Gauge =
+ fromDefaultConfig(system, functionZeroAsCurrentValueCollector(currentValueCollectorFunction))
+ def fromDefaultConfig(system: ActorSystem, currentValueCollector: CurrentValueCollector): Gauge = {
+ val config = system.settings.config.getConfig("kamon.metrics.precision.default-gauge-precision")
+ fromConfig(config, system)(currentValueCollector)
+ }
+ def fromConfig(config: Config, system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = {
+ import scala.concurrent.duration._
+ val highest = config.getLong("highest-trackable-value")
+ val significantDigits = config.getInt("significant-value-digits")
+ val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS)
+ Gauge(Histogram.Precision(significantDigits), highest, Scale.Unit, refreshInterval.millis, system)(currentValueCollector)
+ }
+ implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector {
+ def currentValue: Long = f.apply()
+ }
+class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector: Gauge.CurrentValueCollector) extends Gauge {
+ val refreshValuesSchedule = new AtomicReference[Cancellable]()
+ def record(value: Long): Unit = underlyingHistogram.record(value)
+ def record(value: Long, count: Long): Unit = underlyingHistogram.record(value, count)
+ def collect(context: CollectionContext): Histogram.Snapshot = underlyingHistogram.collect(context)
+ def cleanup: Unit = {
+ if (refreshValuesSchedule.get() != null)
+ refreshValuesSchedule.get().cancel()
+ }
+ def refreshValue(): Unit = underlyingHistogram.record(currentValueCollector.currentValue)
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
new file mode 100644
index 00000000..9ae077f4
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
@@ -0,0 +1,246 @@
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+package kamon.metric.instrument
+import java.nio.LongBuffer
+import com.typesafe.config.Config
+import org.HdrHistogram.AtomicHistogramFieldsAccessor
+import org.HdrHistogram.AtomicHistogram
+import kamon.metric._
+trait Histogram extends MetricRecorder {
+ type SnapshotType = Histogram.Snapshot
+ def record(value: Long)
+ def record(value: Long, count: Long)
+object Histogram {
+ def apply(highestTrackableValue: Long, precision: Precision, scale: Scale): Histogram =
+ new HdrHistogram(1L, highestTrackableValue, precision.significantDigits, scale)
+ def fromConfig(config: Config): Histogram = {
+ val highest = config.getLong("highest-trackable-value")
+ val significantDigits = config.getInt("significant-value-digits")
+ new HdrHistogram(1L, highest, significantDigits)
+ }
+ object HighestTrackableValue {
+ val OneHourInNanoseconds = 3600L * 1000L * 1000L * 1000L
+ }
+ case class Precision(significantDigits: Int)
+ object Precision {
+ val Low = Precision(1)
+ val Normal = Precision(2)
+ val Fine = Precision(3)
+ }
+ trait Record {
+ def level: Long
+ def count: Long
+ private[kamon] def rawCompactRecord: Long
+ }
+ case class MutableRecord(var level: Long, var count: Long) extends Record {
+ var rawCompactRecord: Long = 0L
+ }
+ trait Snapshot extends MetricSnapshot {
+ type SnapshotType = Histogram.Snapshot
+ def isEmpty: Boolean = numberOfMeasurements == 0
+ def scale: Scale
+ def numberOfMeasurements: Long
+ def min: Long
+ def max: Long
+ def recordsIterator: Iterator[Record]
+ def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot
+ }
+ * This implementation is meant to be used for real time data collection where data snapshots are taken often over time.
+ * The collect(..) operation extracts all the recorded values from the histogram and resets the counts, but still
+ * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken.
+ */
+class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, significantValueDigits: Int, scale: Scale = Scale.Unit)
+ extends AtomicHistogram(lowestTrackableValue, highestTrackableValue, significantValueDigits)
+ with Histogram with AtomicHistogramFieldsAccessor {
+ import AtomicHistogramFieldsAccessor.totalCountUpdater
+ def record(value: Long): Unit = recordValue(value)
+ def record(value: Long, count: Long): Unit = recordValueWithCount(value, count)
+ def collect(context: CollectionContext): Histogram.Snapshot = {
+ import context.buffer
+ buffer.clear()
+ val nrOfMeasurements = writeSnapshotTo(buffer)
+ buffer.flip()
+ val measurementsArray = Array.ofDim[Long](buffer.limit())
+ buffer.get(measurementsArray, 0, measurementsArray.length)
+ new CompactHdrSnapshot(scale, nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude())
+ }
+ def cleanup: Unit = {}
+ private def writeSnapshotTo(buffer: LongBuffer): Long = {
+ val counts = countsArray()
+ val countsLength = counts.length()
+ var nrOfMeasurements = 0L
+ var index = 0L
+ while (index < countsLength) {
+ val countAtIndex = counts.getAndSet(index.toInt, 0L)
+ if (countAtIndex > 0) {
+ buffer.put(CompactHdrSnapshot.compactRecord(index, countAtIndex))
+ nrOfMeasurements += countAtIndex
+ }
+ index += 1
+ }
+ reestablishTotalCount(nrOfMeasurements)
+ nrOfMeasurements
+ }
+ private def reestablishTotalCount(diff: Long): Unit = {
+ def tryUpdateTotalCount: Boolean = {
+ val previousTotalCount = getTotalCount
+ val newTotalCount = previousTotalCount - diff
+ totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount)
+ }
+ while (!tryUpdateTotalCount) {}
+ }
+class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int,
+ subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Histogram.Snapshot {
+ def min: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(0))
+ def max: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(compactRecords.length - 1))
+ def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = {
+ if (that.isEmpty) this else if (this.isEmpty) that else {
+ import context.buffer
+ buffer.clear()
+ val selfIterator = recordsIterator
+ val thatIterator = that.recordsIterator
+ var thatCurrentRecord: Histogram.Record = null
+ var mergedNumberOfMeasurements = 0L
+ def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null
+ def addToBuffer(compactRecord: Long): Unit = {
+ mergedNumberOfMeasurements += countFromCompactRecord(compactRecord)
+ buffer.put(compactRecord)
+ }
+ while (selfIterator.hasNext) {
+ val selfCurrentRecord = selfIterator.next()
+ // Advance that to no further than the level of selfCurrentRecord
+ thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord
+ while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) {
+ addToBuffer(thatCurrentRecord.rawCompactRecord)
+ thatCurrentRecord = nextOrNull(thatIterator)
+ }
+ // Include the current record of self and optionally merge if has the same level as thatCurrentRecord
+ if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) {
+ addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord))
+ thatCurrentRecord = nextOrNull(thatIterator)
+ } else {
+ addToBuffer(selfCurrentRecord.rawCompactRecord)
+ }
+ }
+ // Include everything that might have been left from that
+ if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord)
+ while (thatIterator.hasNext) {
+ addToBuffer(thatIterator.next().rawCompactRecord)
+ }
+ buffer.flip()
+ val compactRecords = Array.ofDim[Long](buffer.limit())
+ buffer.get(compactRecords)
+ new CompactHdrSnapshot(scale, mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude)
+ }
+ }
+ @inline private def mergeCompactRecords(left: Long, right: Long): Long = {
+ val index = left >> 48
+ val leftCount = countFromCompactRecord(left)
+ val rightCount = countFromCompactRecord(right)
+ CompactHdrSnapshot.compactRecord(index, leftCount + rightCount)
+ }
+ @inline private def levelFromCompactRecord(compactRecord: Long): Long = {
+ val countsArrayIndex = (compactRecord >> 48).toInt
+ var bucketIndex: Int = (countsArrayIndex >> subBucketHalfCountMagnitude) - 1
+ var subBucketIndex: Int = (countsArrayIndex & (subBucketHalfCount - 1)) + subBucketHalfCount
+ if (bucketIndex < 0) {
+ subBucketIndex -= subBucketHalfCount
+ bucketIndex = 0
+ }
+ subBucketIndex.toLong << (bucketIndex + unitMagnitude)
+ }
+ @inline private def countFromCompactRecord(compactRecord: Long): Long =
+ compactRecord & CompactHdrSnapshot.CompactRecordCountMask
+ def recordsIterator: Iterator[Histogram.Record] = new Iterator[Histogram.Record] {
+ var currentIndex = 0
+ val mutableRecord = Histogram.MutableRecord(0, 0)
+ override def hasNext: Boolean = currentIndex < compactRecords.length
+ override def next(): Histogram.Record = {
+ if (hasNext) {
+ val measurement = compactRecords(currentIndex)
+ mutableRecord.rawCompactRecord = measurement
+ mutableRecord.level = levelFromCompactRecord(measurement)
+ mutableRecord.count = countFromCompactRecord(measurement)
+ currentIndex += 1
+ mutableRecord
+ } else {
+ throw new IllegalStateException("The iterator has already been consumed.")
+ }
+ }
+ }
+object CompactHdrSnapshot {
+ val CompactRecordCountMask = 0xFFFFFFFFFFFFL
+ def compactRecord(index: Long, count: Long): Long = (index << 48) | count
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala
new file mode 100644
index 00000000..471e7bd4
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala
@@ -0,0 +1,116 @@
+package kamon.metric.instrument
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+import java.lang.Math.abs
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+import akka.actor.{ ActorSystem, Cancellable }
+import com.typesafe.config.Config
+import jsr166e.LongMaxUpdater
+import kamon.metric.{ Scale, MetricRecorder, CollectionContext }
+import kamon.util.PaddedAtomicLong
+import scala.concurrent.duration.FiniteDuration
+trait MinMaxCounter extends MetricRecorder {
+ override type SnapshotType = Histogram.Snapshot
+ def increment(): Unit
+ def increment(times: Long): Unit
+ def decrement()
+ def decrement(times: Long)
+object MinMaxCounter {
+ def apply(highestTrackableValue: Long, precision: Histogram.Precision, scale: Scale, refreshInterval: FiniteDuration,
+ system: ActorSystem): MinMaxCounter = {
+ val underlyingHistogram = Histogram(highestTrackableValue, precision, scale)
+ val minMaxCounter = new PaddedMinMaxCounter(underlyingHistogram)
+ val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) {
+ minMaxCounter.refreshValues()
+ }(system.dispatcher) // TODO: Move this to Kamon dispatchers
+ minMaxCounter.refreshValuesSchedule.set(refreshValuesSchedule)
+ minMaxCounter
+ }
+ def fromConfig(config: Config, system: ActorSystem): MinMaxCounter = {
+ import scala.concurrent.duration._
+ val highest = config.getLong("highest-trackable-value")
+ val significantDigits = config.getInt("significant-value-digits")
+ val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS)
+ apply(highest, Histogram.Precision(significantDigits), Scale.Unit, refreshInterval.millis, system)
+ }
+class PaddedMinMaxCounter(underlyingHistogram: Histogram) extends MinMaxCounter {
+ private val min = new LongMaxUpdater
+ private val max = new LongMaxUpdater
+ private val sum = new PaddedAtomicLong
+ val refreshValuesSchedule = new AtomicReference[Cancellable]()
+ min.update(0L)
+ max.update(0L)
+ def increment(): Unit = increment(1L)
+ def increment(times: Long): Unit = {
+ val currentValue = sum.addAndGet(times)
+ max.update(currentValue)
+ }
+ def decrement(): Unit = decrement(1L)
+ def decrement(times: Long): Unit = {
+ val currentValue = sum.addAndGet(-times)
+ min.update(-currentValue)
+ }
+ def collect(context: CollectionContext): Histogram.Snapshot = {
+ refreshValues()
+ underlyingHistogram.collect(context)
+ }
+ def cleanup: Unit = {
+ if (refreshValuesSchedule.get() != null)
+ refreshValuesSchedule.get().cancel()
+ }
+ def refreshValues(): Unit = {
+ val currentValue = {
+ val value = sum.get()
+ if (value < 0) 0 else value
+ }
+ val currentMin = {
+ val minAbs = abs(min.maxThenReset())
+ if (minAbs <= currentValue) minAbs else 0
+ }
+ underlyingHistogram.record(currentValue)
+ underlyingHistogram.record(currentMin)
+ underlyingHistogram.record(max.maxThenReset())
+ max.update(currentValue)
+ min.update(-currentValue)
+ }