diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric/instrument')
9 files changed, 414 insertions, 144 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala b/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala new file mode 100644 index 00000000..e79090a8 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala @@ -0,0 +1,35 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package org.HdrHistogram + +import java.util.concurrent.atomic.{ AtomicLongArray, AtomicLongFieldUpdater } + +trait AtomicHistogramFieldsAccessor { + self: AtomicHistogram ⇒ + + def countsArray(): AtomicLongArray = self.counts + + def unitMagnitude(): Int = self.unitMagnitude + + def subBucketHalfCount(): Int = self.subBucketHalfCount + + def subBucketHalfCountMagnitude(): Int = self.subBucketHalfCountMagnitude +} + +object AtomicHistogramFieldsAccessor { + def totalCountUpdater(): AtomicLongFieldUpdater[AtomicHistogram] = AtomicHistogram.totalCountUpdater +} diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala index 0f29ba6f..c1b69cbe 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala @@ -17,9 +17,8 @@ package kamon.metric.instrument import kamon.jsr166.LongAdder -import kamon.metric.{ CollectionContext, MetricSnapshot, MetricRecorder } -trait Counter extends MetricRecorder { +trait Counter extends Instrument { type SnapshotType = Counter.Snapshot def increment(): Unit @@ -29,12 +28,11 @@ trait Counter extends MetricRecorder { object Counter { def apply(): Counter = new LongAdderCounter + def create(): Counter = apply() - trait Snapshot extends MetricSnapshot { - type SnapshotType = Counter.Snapshot - + trait Snapshot extends InstrumentSnapshot { def count: Long - def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot + def merge(that: InstrumentSnapshot, context: CollectionContext): Counter.Snapshot } } @@ -55,5 +53,8 @@ class LongAdderCounter extends Counter { } case class CounterSnapshot(count: Long) extends Counter.Snapshot { - def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot = CounterSnapshot(count + that.count) + def merge(that: InstrumentSnapshot, context: CollectionContext): Counter.Snapshot = that match { + case CounterSnapshot(thatCount) ⇒ CounterSnapshot(count + thatCount) + case other ⇒ sys.error(s"Cannot merge a CounterSnapshot with the incompatible [${other.getClass.getName}] type.") + } }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala index efd7d78f..5d20cdd1 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala @@ -1,70 +1,86 @@ package kamon.metric.instrument -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{ AtomicLong, AtomicLongFieldUpdater, AtomicReference } -import akka.actor.{ Cancellable, ActorSystem } -import com.typesafe.config.Config -import kamon.metric.{ CollectionContext, Scale, MetricRecorder } +import akka.actor.Cancellable +import kamon.metric.instrument.Gauge.CurrentValueCollector +import kamon.metric.instrument.Histogram.DynamicRange import scala.concurrent.duration.FiniteDuration -trait Gauge extends MetricRecorder { +trait Gauge extends Instrument { type SnapshotType = Histogram.Snapshot - def record(value: Long) - def record(value: Long, count: Long) + def record(value: Long): Unit + def record(value: Long, count: Long): Unit + def refreshValue(): Unit } object Gauge { - trait CurrentValueCollector { - def currentValue: Long - } - - def apply(precision: Histogram.Precision, highestTrackableValue: Long, scale: Scale, refreshInterval: FiniteDuration, - system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = { - - val underlyingHistogram = Histogram(highestTrackableValue, precision, scale) - val gauge = new HistogramBackedGauge(underlyingHistogram, currentValueCollector) - - val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) { + def apply(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler, valueCollector: CurrentValueCollector): Gauge = { + val underlyingHistogram = Histogram(dynamicRange) + val gauge = new HistogramBackedGauge(underlyingHistogram, valueCollector) + val refreshValuesSchedule = scheduler.schedule(refreshInterval, () ⇒ { gauge.refreshValue() - }(system.dispatcher) // TODO: Move this to Kamon dispatchers + }) - gauge.refreshValuesSchedule.set(refreshValuesSchedule) + gauge.automaticValueCollectorSchedule.set(refreshValuesSchedule) gauge } - def fromDefaultConfig(system: ActorSystem)(currentValueCollectorFunction: () ⇒ Long): Gauge = - fromDefaultConfig(system, functionZeroAsCurrentValueCollector(currentValueCollectorFunction)) + def create(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler, valueCollector: CurrentValueCollector): Gauge = + apply(dynamicRange, refreshInterval, scheduler, valueCollector) - def fromDefaultConfig(system: ActorSystem, currentValueCollector: CurrentValueCollector): Gauge = { - val config = system.settings.config.getConfig("kamon.metrics.precision.default-gauge-precision") - fromConfig(config, system)(currentValueCollector) + trait CurrentValueCollector { + def currentValue: Long } - def fromConfig(config: Config, system: ActorSystem, scale: Scale)(currentValueCollector: CurrentValueCollector): Gauge = { - import scala.concurrent.duration._ + implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector { + def currentValue: Long = f.apply() + } +} - val highest = config.getLong("highest-trackable-value") - val significantDigits = config.getInt("significant-value-digits") - val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS) +/** + * Helper for cases in which a gauge shouldn't store the current value of a observed value but the difference between + * the current observed value and the previously observed value. Should only be used if the observed value is always + * increasing or staying steady, but is never able to decrease. + * + * Note: The first time a value is collected, this wrapper will always return zero, afterwards, the difference between + * the current value and the last value will be returned. + */ +class DifferentialValueCollector(wrappedValueCollector: CurrentValueCollector) extends CurrentValueCollector { + @volatile private var _readAtLeastOnce = false + private val _lastObservedValue = new AtomicLong(0) + + def currentValue: Long = { + if (_readAtLeastOnce) { + val wrappedCurrent = wrappedValueCollector.currentValue + val diff = wrappedCurrent - _lastObservedValue.getAndSet(wrappedCurrent) + + if (diff >= 0) diff else 0L + + } else { + _lastObservedValue.set(wrappedValueCollector.currentValue) + _readAtLeastOnce = true + 0L + } - Gauge(Histogram.Precision(significantDigits), highest, scale, refreshInterval.millis, system)(currentValueCollector) } +} - def fromConfig(config: Config, system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = { - fromConfig(config, system, Scale.Unit)(currentValueCollector) - } +object DifferentialValueCollector { + def apply(wrappedValueCollector: CurrentValueCollector): CurrentValueCollector = + new DifferentialValueCollector(wrappedValueCollector) - implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector { - def currentValue: Long = f.apply() - } + def apply(wrappedValueCollector: ⇒ Long): CurrentValueCollector = + new DifferentialValueCollector(new CurrentValueCollector { + def currentValue: Long = wrappedValueCollector + }) } class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector: Gauge.CurrentValueCollector) extends Gauge { - val refreshValuesSchedule = new AtomicReference[Cancellable]() + private[kamon] val automaticValueCollectorSchedule = new AtomicReference[Cancellable]() def record(value: Long): Unit = underlyingHistogram.record(value) @@ -73,10 +89,12 @@ class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector def collect(context: CollectionContext): Histogram.Snapshot = underlyingHistogram.collect(context) def cleanup: Unit = { - if (refreshValuesSchedule.get() != null) - refreshValuesSchedule.get().cancel() + if (automaticValueCollectorSchedule.get() != null) + automaticValueCollectorSchedule.get().cancel() } - def refreshValue(): Unit = underlyingHistogram.record(currentValueCollector.currentValue) + def refreshValue(): Unit = + underlyingHistogram.record(currentValueCollector.currentValue) + } diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala index bed75fc8..5c4c7f71 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala @@ -17,12 +17,11 @@ package kamon.metric.instrument import java.nio.LongBuffer -import com.typesafe.config.Config import org.HdrHistogram.AtomicHistogramFieldsAccessor +import kamon.metric.instrument.Histogram.{ Snapshot, DynamicRange } import org.HdrHistogram.AtomicHistogram -import kamon.metric._ -trait Histogram extends MetricRecorder { +trait Histogram extends Instrument { type SnapshotType = Histogram.Snapshot def record(value: Long) @@ -31,30 +30,40 @@ trait Histogram extends MetricRecorder { object Histogram { - def apply(highestTrackableValue: Long, precision: Precision, scale: Scale): Histogram = - new HdrHistogram(1L, highestTrackableValue, precision.significantDigits, scale) - - def fromConfig(config: Config): Histogram = { - fromConfig(config, Scale.Unit) - } - - def fromConfig(config: Config, scale: Scale): Histogram = { - val highest = config.getLong("highest-trackable-value") - val significantDigits = config.getInt("significant-value-digits") - - new HdrHistogram(1L, highest, significantDigits, scale) - } - - object HighestTrackableValue { - val OneHourInNanoseconds = 3600L * 1000L * 1000L * 1000L - } - - case class Precision(significantDigits: Int) - object Precision { - val Low = Precision(1) - val Normal = Precision(2) - val Fine = Precision(3) - } + /** + * Scala API: + * + * Create a new High Dynamic Range Histogram ([[kamon.metric.instrument.HdrHistogram]]) using the given + * [[kamon.metric.instrument.Histogram.DynamicRange]]. + */ + def apply(dynamicRange: DynamicRange): Histogram = new HdrHistogram(dynamicRange) + + /** + * Java API: + * + * Create a new High Dynamic Range Histogram ([[kamon.metric.instrument.HdrHistogram]]) using the given + * [[kamon.metric.instrument.Histogram.DynamicRange]]. + */ + def create(dynamicRange: DynamicRange): Histogram = apply(dynamicRange) + + /** + * DynamicRange is a configuration object used to supply range and precision configuration to a + * [[kamon.metric.instrument.HdrHistogram]]. See the [[http://hdrhistogram.github.io/HdrHistogram/ HdrHistogram website]] + * for more details on how it works and the effects of these configuration values. + * + * @param lowestDiscernibleValue + * The lowest value that can be discerned (distinguished from 0) by the histogram.Must be a positive integer that + * is >= 1. May be internally rounded down to nearest power of 2. + * + * @param highestTrackableValue + * The highest value to be tracked by the histogram. Must be a positive integer that is >= (2 * lowestDiscernibleValue). + * Must not be larger than (Long.MAX_VALUE/2). + * + * @param precision + * The number of significant decimal digits to which the histogram will maintain value resolution and separation. + * Must be a non-negative integer between 1 and 3. + */ + case class DynamicRange(lowestDiscernibleValue: Long, highestTrackableValue: Long, precision: Int) trait Record { def level: Long @@ -67,29 +76,28 @@ object Histogram { var rawCompactRecord: Long = 0L } - trait Snapshot extends MetricSnapshot { - type SnapshotType = Histogram.Snapshot + trait Snapshot extends InstrumentSnapshot { def isEmpty: Boolean = numberOfMeasurements == 0 - def scale: Scale def numberOfMeasurements: Long def min: Long def max: Long def sum: Long def percentile(percentile: Double): Long def recordsIterator: Iterator[Record] + def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot } object Snapshot { - def empty(targetScale: Scale) = new Snapshot { + val empty = new Snapshot { override def min: Long = 0L override def max: Long = 0L override def sum: Long = 0L override def percentile(percentile: Double): Long = 0L override def recordsIterator: Iterator[Record] = Iterator.empty - override def merge(that: Snapshot, context: CollectionContext): Snapshot = that - override def scale: Scale = targetScale + override def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot = that + override def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = that override def numberOfMeasurements: Long = 0L } } @@ -100,10 +108,8 @@ object Histogram { * The collect(..) operation extracts all the recorded values from the histogram and resets the counts, but still * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. */ -class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, significantValueDigits: Int, scale: Scale = Scale.Unit) - extends AtomicHistogram(lowestTrackableValue, highestTrackableValue, significantValueDigits) - with Histogram with AtomicHistogramFieldsAccessor { - +class HdrHistogram(dynamicRange: DynamicRange) extends AtomicHistogram(dynamicRange.lowestDiscernibleValue, + dynamicRange.highestTrackableValue, dynamicRange.precision) with Histogram with AtomicHistogramFieldsAccessor { import AtomicHistogramFieldsAccessor.totalCountUpdater def record(value: Long): Unit = recordValue(value) @@ -119,7 +125,7 @@ class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, sign val measurementsArray = Array.ofDim[Long](buffer.limit()) buffer.get(measurementsArray, 0, measurementsArray.length) - new CompactHdrSnapshot(scale, nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude()) + new CompactHdrSnapshot(nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude()) } def getCounts = countsArray().length() @@ -160,7 +166,7 @@ class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, sign } -case class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int, +case class CompactHdrSnapshot(val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int, subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Histogram.Snapshot { def min: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(0)) @@ -182,53 +188,61 @@ case class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long, percentileLevel } - def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = { - if (that.isEmpty) this else if (this.isEmpty) that else { - import context.buffer - buffer.clear() + def merge(that: Histogram.Snapshot, context: CollectionContext): Snapshot = + merge(that.asInstanceOf[InstrumentSnapshot], context) - val selfIterator = recordsIterator - val thatIterator = that.recordsIterator - var thatCurrentRecord: Histogram.Record = null - var mergedNumberOfMeasurements = 0L + def merge(that: InstrumentSnapshot, context: CollectionContext): Histogram.Snapshot = that match { + case thatSnapshot: CompactHdrSnapshot ⇒ + if (thatSnapshot.isEmpty) this else if (this.isEmpty) thatSnapshot else { + import context.buffer + buffer.clear() - def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null - def addToBuffer(compactRecord: Long): Unit = { - mergedNumberOfMeasurements += countFromCompactRecord(compactRecord) - buffer.put(compactRecord) - } + val selfIterator = recordsIterator + val thatIterator = thatSnapshot.recordsIterator + var thatCurrentRecord: Histogram.Record = null + var mergedNumberOfMeasurements = 0L - while (selfIterator.hasNext) { - val selfCurrentRecord = selfIterator.next() + def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null + def addToBuffer(compactRecord: Long): Unit = { + mergedNumberOfMeasurements += countFromCompactRecord(compactRecord) + buffer.put(compactRecord) + } - // Advance that to no further than the level of selfCurrentRecord - thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord - while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) { - addToBuffer(thatCurrentRecord.rawCompactRecord) - thatCurrentRecord = nextOrNull(thatIterator) + while (selfIterator.hasNext) { + val selfCurrentRecord = selfIterator.next() + + // Advance that to no further than the level of selfCurrentRecord + thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord + while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) { + addToBuffer(thatCurrentRecord.rawCompactRecord) + thatCurrentRecord = nextOrNull(thatIterator) + } + + // Include the current record of self and optionally merge if has the same level as thatCurrentRecord + if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) { + addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord)) + thatCurrentRecord = nextOrNull(thatIterator) + } else { + addToBuffer(selfCurrentRecord.rawCompactRecord) + } } - // Include the current record of self and optionally merge if has the same level as thatCurrentRecord - if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) { - addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord)) - thatCurrentRecord = nextOrNull(thatIterator) - } else { - addToBuffer(selfCurrentRecord.rawCompactRecord) + // Include everything that might have been left from that + if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord) + while (thatIterator.hasNext) { + addToBuffer(thatIterator.next().rawCompactRecord) } - } - // Include everything that might have been left from that - if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord) - while (thatIterator.hasNext) { - addToBuffer(thatIterator.next().rawCompactRecord) + buffer.flip() + val compactRecords = Array.ofDim[Long](buffer.limit()) + buffer.get(compactRecords) + + new CompactHdrSnapshot(mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude) } - buffer.flip() - val compactRecords = Array.ofDim[Long](buffer.limit()) - buffer.get(compactRecords) + case other ⇒ + sys.error(s"Cannot merge a CompactHdrSnapshot with the incompatible [${other.getClass.getName}] type.") - new CompactHdrSnapshot(scale, mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude) - } } @inline private def mergeCompactRecords(left: Long, right: Long): Long = { diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala new file mode 100644 index 00000000..8cacc767 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala @@ -0,0 +1,56 @@ +package kamon.metric.instrument + +import java.nio.LongBuffer + +import akka.actor.{ Scheduler, Cancellable } +import akka.dispatch.MessageDispatcher +import scala.concurrent.duration.FiniteDuration + +private[kamon] trait Instrument { + type SnapshotType <: InstrumentSnapshot + + def collect(context: CollectionContext): SnapshotType + def cleanup: Unit +} + +trait InstrumentSnapshot { + def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot +} + +class InstrumentType private[kamon] (val id: Int) extends AnyVal +object InstrumentTypes { + val Histogram = new InstrumentType(1) + val MinMaxCounter = new InstrumentType(2) + val Gauge = new InstrumentType(3) + val Counter = new InstrumentType(4) +} + +trait CollectionContext { + def buffer: LongBuffer +} + +object CollectionContext { + def apply(longBufferSize: Int): CollectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(longBufferSize) + } +} + +trait RefreshScheduler { + def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable +} + +object RefreshScheduler { + val NoopScheduler = new RefreshScheduler { + def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = new Cancellable { + override def isCancelled: Boolean = true + override def cancel(): Boolean = true + } + } + + def apply(scheduler: Scheduler, dispatcher: MessageDispatcher): RefreshScheduler = new RefreshScheduler { + def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = + scheduler.schedule(interval, interval)(refresh.apply())(dispatcher) + } + + def create(scheduler: Scheduler, dispatcher: MessageDispatcher): RefreshScheduler = apply(scheduler, dispatcher) +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala new file mode 100644 index 00000000..9b0c85cb --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala @@ -0,0 +1,35 @@ +package kamon.metric.instrument + +import kamon.metric.instrument.Gauge.CurrentValueCollector +import kamon.metric.instrument.Histogram.DynamicRange + +import scala.concurrent.duration.FiniteDuration + +case class InstrumentFactory(configurations: Map[String, InstrumentCustomSettings], defaults: DefaultInstrumentSettings, scheduler: RefreshScheduler) { + + private def resolveSettings(instrumentName: String, codeSettings: Option[InstrumentSettings], default: InstrumentSettings): InstrumentSettings = { + configurations.get(instrumentName).flatMap { customSettings ⇒ + codeSettings.map(cs ⇒ customSettings.combine(cs)) orElse (Some(customSettings.combine(default))) + + } getOrElse (codeSettings.getOrElse(default)) + } + + def createHistogram(name: String, dynamicRange: Option[DynamicRange] = None): Histogram = { + val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, None)), defaults.histogram) + Histogram(settings.dynamicRange) + } + + def createMinMaxCounter(name: String, dynamicRange: Option[DynamicRange] = None, refreshInterval: Option[FiniteDuration] = None): MinMaxCounter = { + val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, refreshInterval)), defaults.minMaxCounter) + MinMaxCounter(settings.dynamicRange, settings.refreshInterval.get, scheduler) + } + + def createGauge(name: String, dynamicRange: Option[DynamicRange] = None, refreshInterval: Option[FiniteDuration] = None, + valueCollector: CurrentValueCollector): Gauge = { + + val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, refreshInterval)), defaults.gauge) + Gauge(settings.dynamicRange, settings.refreshInterval.get, scheduler, valueCollector) + } + + def createCounter(): Counter = Counter() +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala new file mode 100644 index 00000000..1446a25d --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala @@ -0,0 +1,67 @@ +package kamon.metric.instrument + +import java.util.concurrent.TimeUnit + +import com.typesafe.config.Config +import kamon.metric.instrument.Histogram.DynamicRange + +import scala.concurrent.duration.FiniteDuration + +case class InstrumentCustomSettings(lowestDiscernibleValue: Option[Long], highestTrackableValue: Option[Long], + precision: Option[Int], refreshInterval: Option[FiniteDuration]) { + + def combine(that: InstrumentSettings): InstrumentSettings = + InstrumentSettings( + DynamicRange( + lowestDiscernibleValue.getOrElse(that.dynamicRange.lowestDiscernibleValue), + highestTrackableValue.getOrElse(that.dynamicRange.highestTrackableValue), + precision.getOrElse(that.dynamicRange.precision)), + refreshInterval.orElse(that.refreshInterval)) +} + +object InstrumentCustomSettings { + import scala.concurrent.duration._ + + def fromConfig(config: Config): InstrumentCustomSettings = + InstrumentCustomSettings( + if (config.hasPath("lowest-discernible-value")) Some(config.getLong("lowest-discernible-value")) else None, + if (config.hasPath("highest-trackable-value")) Some(config.getLong("highest-trackable-value")) else None, + if (config.hasPath("precision")) Some(InstrumentSettings.parsePrecision(config.getString("precision"))) else None, + if (config.hasPath("refresh-interval")) Some(config.getDuration("refresh-interval", TimeUnit.NANOSECONDS).nanos) else None) + +} + +case class InstrumentSettings(dynamicRange: DynamicRange, refreshInterval: Option[FiniteDuration]) + +object InstrumentSettings { + + def readDynamicRange(config: Config): DynamicRange = + DynamicRange( + config.getLong("lowest-discernible-value"), + config.getLong("highest-trackable-value"), + parsePrecision(config.getString("precision"))) + + def parsePrecision(stringValue: String): Int = stringValue match { + case "low" ⇒ 1 + case "normal" ⇒ 2 + case "fine" ⇒ 3 + case other ⇒ sys.error(s"Invalid precision configuration [$other] found, valid options are: [low|normal|fine].") + } +} + +case class DefaultInstrumentSettings(histogram: InstrumentSettings, minMaxCounter: InstrumentSettings, gauge: InstrumentSettings) + +object DefaultInstrumentSettings { + + def fromConfig(config: Config): DefaultInstrumentSettings = { + import scala.concurrent.duration._ + + val histogramSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("histogram")), None) + val minMaxCounterSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("min-max-counter")), + Some(config.getDuration("min-max-counter.refresh-interval", TimeUnit.NANOSECONDS).nanos)) + val gaugeSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("gauge")), + Some(config.getDuration("gauge.refresh-interval", TimeUnit.NANOSECONDS).nanos)) + + DefaultInstrumentSettings(histogramSettings, minMaxCounterSettings, gaugeSettings) + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala index 4882d2aa..0828c8a9 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala @@ -17,16 +17,14 @@ package kamon.metric.instrument */ import java.lang.Math.abs -import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference -import akka.actor.{ ActorSystem, Cancellable } -import com.typesafe.config.Config +import akka.actor.Cancellable import kamon.jsr166.LongMaxUpdater -import kamon.metric.{ Scale, MetricRecorder, CollectionContext } +import kamon.metric.instrument.Histogram.DynamicRange import kamon.util.PaddedAtomicLong import scala.concurrent.duration.FiniteDuration -trait MinMaxCounter extends MetricRecorder { +trait MinMaxCounter extends Instrument { override type SnapshotType = Histogram.Snapshot def increment(): Unit @@ -38,29 +36,20 @@ trait MinMaxCounter extends MetricRecorder { object MinMaxCounter { - def apply(highestTrackableValue: Long, precision: Histogram.Precision, scale: Scale, refreshInterval: FiniteDuration, - system: ActorSystem): MinMaxCounter = { - - val underlyingHistogram = Histogram(highestTrackableValue, precision, scale) + def apply(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler): MinMaxCounter = { + val underlyingHistogram = Histogram(dynamicRange) val minMaxCounter = new PaddedMinMaxCounter(underlyingHistogram) - - val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) { + val refreshValuesSchedule = scheduler.schedule(refreshInterval, () ⇒ { minMaxCounter.refreshValues() - }(system.dispatcher) // TODO: Move this to Kamon dispatchers + }) minMaxCounter.refreshValuesSchedule.set(refreshValuesSchedule) minMaxCounter } - def fromConfig(config: Config, system: ActorSystem): MinMaxCounter = { - import scala.concurrent.duration._ + def create(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler): MinMaxCounter = + apply(dynamicRange, refreshInterval, scheduler) - val highest = config.getLong("highest-trackable-value") - val significantDigits = config.getInt("significant-value-digits") - val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS) - - apply(highest, Histogram.Precision(significantDigits), Scale.Unit, refreshInterval.millis, system) - } } class PaddedMinMaxCounter(underlyingHistogram: Histogram) extends MinMaxCounter { diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala new file mode 100644 index 00000000..cf6b8b4c --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala @@ -0,0 +1,55 @@ +package kamon.metric.instrument + +trait UnitOfMeasurement { + def name: String + def label: String + def factor: Double +} + +object UnitOfMeasurement { + case object Unknown extends UnitOfMeasurement { + val name = "unknown" + val label = "unknown" + val factor = 1D + } + + def isUnknown(uom: UnitOfMeasurement): Boolean = + uom == Unknown + + def isTime(uom: UnitOfMeasurement): Boolean = + uom.isInstanceOf[Time] + +} + +case class Time(factor: Double, label: String) extends UnitOfMeasurement { + val name = "time" + + /** + * Scale a value from this scale factor to a different scale factor. + * + * @param toUnit Time unit of the expected result. + * @param value Value to scale. + * @return Equivalent of value on the target time unit. + */ + def scale(toUnit: Time)(value: Long): Double = + (value * factor) / toUnit.factor +} + +object Time { + val Nanoseconds = Time(1E-9, "n") + val Microseconds = Time(1E-6, "µs") + val Milliseconds = Time(1E-3, "ms") + val Seconds = Time(1, "s") +} + +case class Memory(factor: Double, label: String) extends UnitOfMeasurement { + val name = "bytes" +} + +object Memory { + val Bytes = Memory(1, "b") + val KiloBytes = Memory(1024, "Kb") + val MegaBytes = Memory(1024E2, "Mb") + val GigaBytes = Memory(1024E3, "Gb") +} + |