diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2015-01-12 01:45:27 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2015-01-24 23:19:01 +0100 |
commit | 485abe569d23bccf2d263c82b43e59464dc7e834 (patch) | |
tree | 34dd5129afe4c4705ce80830caf8d5e48212ce39 /kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala | |
parent | 61089a75240f5cc21b056087f1d633dd31981c61 (diff) | |
download | Kamon-485abe569d23bccf2d263c82b43e59464dc7e834.tar.gz Kamon-485abe569d23bccf2d263c82b43e59464dc7e834.tar.bz2 Kamon-485abe569d23bccf2d263c82b43e59464dc7e834.zip |
! all: improve the metric recorders infrastructure
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala | 108 |
1 files changed, 66 insertions, 42 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala index efd7d78f..2341504c 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala @@ -1,70 +1,89 @@ package kamon.metric.instrument -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{ AtomicLong, AtomicLongFieldUpdater, AtomicReference } -import akka.actor.{ Cancellable, ActorSystem } -import com.typesafe.config.Config -import kamon.metric.{ CollectionContext, Scale, MetricRecorder } +import akka.actor.Cancellable +import kamon.metric.instrument.Gauge.CurrentValueCollector +import kamon.metric.instrument.Histogram.DynamicRange import scala.concurrent.duration.FiniteDuration -trait Gauge extends MetricRecorder { +trait Gauge extends Instrument { type SnapshotType = Histogram.Snapshot - def record(value: Long) - def record(value: Long, count: Long) + def record(value: Long): Unit + def record(value: Long, count: Long): Unit + def refreshValue(): Unit } object Gauge { - trait CurrentValueCollector { - def currentValue: Long - } - - def apply(precision: Histogram.Precision, highestTrackableValue: Long, scale: Scale, refreshInterval: FiniteDuration, - system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = { - - val underlyingHistogram = Histogram(highestTrackableValue, precision, scale) - val gauge = new HistogramBackedGauge(underlyingHistogram, currentValueCollector) - - val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) { + def apply(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler, valueCollector: CurrentValueCollector): Gauge = { + val underlyingHistogram = Histogram(dynamicRange) + val gauge = new HistogramBackedGauge(underlyingHistogram, valueCollector) + val refreshValuesSchedule = scheduler.schedule(refreshInterval, () ⇒ { gauge.refreshValue() - }(system.dispatcher) // TODO: Move this to Kamon dispatchers + }) - gauge.refreshValuesSchedule.set(refreshValuesSchedule) + gauge.automaticValueCollectorSchedule.set(refreshValuesSchedule) gauge } - def fromDefaultConfig(system: ActorSystem)(currentValueCollectorFunction: () ⇒ Long): Gauge = - fromDefaultConfig(system, functionZeroAsCurrentValueCollector(currentValueCollectorFunction)) + def create(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler, valueCollector: CurrentValueCollector): Gauge = + apply(dynamicRange, refreshInterval, scheduler, valueCollector) - def fromDefaultConfig(system: ActorSystem, currentValueCollector: CurrentValueCollector): Gauge = { - val config = system.settings.config.getConfig("kamon.metrics.precision.default-gauge-precision") - fromConfig(config, system)(currentValueCollector) + trait CurrentValueCollector { + def currentValue: Long } - def fromConfig(config: Config, system: ActorSystem, scale: Scale)(currentValueCollector: CurrentValueCollector): Gauge = { - import scala.concurrent.duration._ + implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector { + def currentValue: Long = f.apply() + } +} - val highest = config.getLong("highest-trackable-value") - val significantDigits = config.getInt("significant-value-digits") - val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS) +/** + * Helper for cases in which a gauge shouldn't store the current value of a observed value but the difference between + * the current observed value and the previously observed value. Should only be used if the observed value is always + * increasing or staying steady, but is never able to decrease. + * + * Note: The first time a value is collected, this wrapper will always return zero, afterwards, the difference between + * the current value and the last value will be returned. + */ +class DifferentialValueCollector(wrappedValueCollector: CurrentValueCollector) extends CurrentValueCollector { + @volatile private var _readAtLeastOnce = false + private val _lastObservedValue = new AtomicLong(0) + + def currentValue: Long = { + if (_readAtLeastOnce) { + val wrappedCurrent = wrappedValueCollector.currentValue + val d = wrappedCurrent - _lastObservedValue.getAndSet(wrappedCurrent) + + if (d < 0) + println("HUBO MENOR QUE CERO") + + d + + } else { + _lastObservedValue.set(wrappedValueCollector.currentValue) + _readAtLeastOnce = true + 0 + } - Gauge(Histogram.Precision(significantDigits), highest, scale, refreshInterval.millis, system)(currentValueCollector) } +} - def fromConfig(config: Config, system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = { - fromConfig(config, system, Scale.Unit)(currentValueCollector) - } +object DifferentialValueCollector { + def apply(wrappedValueCollector: CurrentValueCollector): CurrentValueCollector = + new DifferentialValueCollector(wrappedValueCollector) - implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector { - def currentValue: Long = f.apply() - } + def apply(wrappedValueCollector: ⇒ Long): CurrentValueCollector = + new DifferentialValueCollector(new CurrentValueCollector { + def currentValue: Long = wrappedValueCollector + }) } class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector: Gauge.CurrentValueCollector) extends Gauge { - val refreshValuesSchedule = new AtomicReference[Cancellable]() + private[kamon] val automaticValueCollectorSchedule = new AtomicReference[Cancellable]() def record(value: Long): Unit = underlyingHistogram.record(value) @@ -73,10 +92,15 @@ class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector def collect(context: CollectionContext): Histogram.Snapshot = underlyingHistogram.collect(context) def cleanup: Unit = { - if (refreshValuesSchedule.get() != null) - refreshValuesSchedule.get().cancel() + if (automaticValueCollectorSchedule.get() != null) + automaticValueCollectorSchedule.get().cancel() } - def refreshValue(): Unit = underlyingHistogram.record(currentValueCollector.currentValue) + def refreshValue(): Unit = { + val a = currentValueCollector.currentValue + if (a < 0) + println("RECORDING FROM GAUGE => " + a + " - " + currentValueCollector.getClass) + underlyingHistogram.record(a) + } } |