aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2015-01-12 01:45:27 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2015-01-24 23:19:01 +0100
commit485abe569d23bccf2d263c82b43e59464dc7e834 (patch)
tree34dd5129afe4c4705ce80830caf8d5e48212ce39 /kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala
parent61089a75240f5cc21b056087f1d633dd31981c61 (diff)
downloadKamon-485abe569d23bccf2d263c82b43e59464dc7e834.tar.gz
Kamon-485abe569d23bccf2d263c82b43e59464dc7e834.tar.bz2
Kamon-485abe569d23bccf2d263c82b43e59464dc7e834.zip
! all: improve the metric recorders infrastructure
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala108
1 files changed, 66 insertions, 42 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala
index efd7d78f..2341504c 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala
@@ -1,70 +1,89 @@
package kamon.metric.instrument
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.atomic.{ AtomicLong, AtomicLongFieldUpdater, AtomicReference }
-import akka.actor.{ Cancellable, ActorSystem }
-import com.typesafe.config.Config
-import kamon.metric.{ CollectionContext, Scale, MetricRecorder }
+import akka.actor.Cancellable
+import kamon.metric.instrument.Gauge.CurrentValueCollector
+import kamon.metric.instrument.Histogram.DynamicRange
import scala.concurrent.duration.FiniteDuration
-trait Gauge extends MetricRecorder {
+trait Gauge extends Instrument {
type SnapshotType = Histogram.Snapshot
- def record(value: Long)
- def record(value: Long, count: Long)
+ def record(value: Long): Unit
+ def record(value: Long, count: Long): Unit
+ def refreshValue(): Unit
}
object Gauge {
- trait CurrentValueCollector {
- def currentValue: Long
- }
-
- def apply(precision: Histogram.Precision, highestTrackableValue: Long, scale: Scale, refreshInterval: FiniteDuration,
- system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = {
-
- val underlyingHistogram = Histogram(highestTrackableValue, precision, scale)
- val gauge = new HistogramBackedGauge(underlyingHistogram, currentValueCollector)
-
- val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) {
+ def apply(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler, valueCollector: CurrentValueCollector): Gauge = {
+ val underlyingHistogram = Histogram(dynamicRange)
+ val gauge = new HistogramBackedGauge(underlyingHistogram, valueCollector)
+ val refreshValuesSchedule = scheduler.schedule(refreshInterval, () ⇒ {
gauge.refreshValue()
- }(system.dispatcher) // TODO: Move this to Kamon dispatchers
+ })
- gauge.refreshValuesSchedule.set(refreshValuesSchedule)
+ gauge.automaticValueCollectorSchedule.set(refreshValuesSchedule)
gauge
}
- def fromDefaultConfig(system: ActorSystem)(currentValueCollectorFunction: () ⇒ Long): Gauge =
- fromDefaultConfig(system, functionZeroAsCurrentValueCollector(currentValueCollectorFunction))
+ def create(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler, valueCollector: CurrentValueCollector): Gauge =
+ apply(dynamicRange, refreshInterval, scheduler, valueCollector)
- def fromDefaultConfig(system: ActorSystem, currentValueCollector: CurrentValueCollector): Gauge = {
- val config = system.settings.config.getConfig("kamon.metrics.precision.default-gauge-precision")
- fromConfig(config, system)(currentValueCollector)
+ trait CurrentValueCollector {
+ def currentValue: Long
}
- def fromConfig(config: Config, system: ActorSystem, scale: Scale)(currentValueCollector: CurrentValueCollector): Gauge = {
- import scala.concurrent.duration._
+ implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector {
+ def currentValue: Long = f.apply()
+ }
+}
- val highest = config.getLong("highest-trackable-value")
- val significantDigits = config.getInt("significant-value-digits")
- val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS)
+/**
+ * Helper for cases in which a gauge shouldn't store the current value of a observed value but the difference between
+ * the current observed value and the previously observed value. Should only be used if the observed value is always
+ * increasing or staying steady, but is never able to decrease.
+ *
+ * Note: The first time a value is collected, this wrapper will always return zero, afterwards, the difference between
+ * the current value and the last value will be returned.
+ */
+class DifferentialValueCollector(wrappedValueCollector: CurrentValueCollector) extends CurrentValueCollector {
+ @volatile private var _readAtLeastOnce = false
+ private val _lastObservedValue = new AtomicLong(0)
+
+ def currentValue: Long = {
+ if (_readAtLeastOnce) {
+ val wrappedCurrent = wrappedValueCollector.currentValue
+ val d = wrappedCurrent - _lastObservedValue.getAndSet(wrappedCurrent)
+
+ if (d < 0)
+ println("HUBO MENOR QUE CERO")
+
+ d
+
+ } else {
+ _lastObservedValue.set(wrappedValueCollector.currentValue)
+ _readAtLeastOnce = true
+ 0
+ }
- Gauge(Histogram.Precision(significantDigits), highest, scale, refreshInterval.millis, system)(currentValueCollector)
}
+}
- def fromConfig(config: Config, system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = {
- fromConfig(config, system, Scale.Unit)(currentValueCollector)
- }
+object DifferentialValueCollector {
+ def apply(wrappedValueCollector: CurrentValueCollector): CurrentValueCollector =
+ new DifferentialValueCollector(wrappedValueCollector)
- implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector {
- def currentValue: Long = f.apply()
- }
+ def apply(wrappedValueCollector: ⇒ Long): CurrentValueCollector =
+ new DifferentialValueCollector(new CurrentValueCollector {
+ def currentValue: Long = wrappedValueCollector
+ })
}
class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector: Gauge.CurrentValueCollector) extends Gauge {
- val refreshValuesSchedule = new AtomicReference[Cancellable]()
+ private[kamon] val automaticValueCollectorSchedule = new AtomicReference[Cancellable]()
def record(value: Long): Unit = underlyingHistogram.record(value)
@@ -73,10 +92,15 @@ class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector
def collect(context: CollectionContext): Histogram.Snapshot = underlyingHistogram.collect(context)
def cleanup: Unit = {
- if (refreshValuesSchedule.get() != null)
- refreshValuesSchedule.get().cancel()
+ if (automaticValueCollectorSchedule.get() != null)
+ automaticValueCollectorSchedule.get().cancel()
}
- def refreshValue(): Unit = underlyingHistogram.record(currentValueCollector.currentValue)
+ def refreshValue(): Unit = {
+ val a = currentValueCollector.currentValue
+ if (a < 0)
+ println("RECORDING FROM GAUGE => " + a + " - " + currentValueCollector.getClass)
+ underlyingHistogram.record(a)
+ }
}