diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric')
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala | 2 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/MetricScaleDecorator.scala | 55 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala | 5 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala | 63 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala | 6 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala | 2 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/instrument/ModifiedAtomicHistogram.scala (renamed from kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala) | 24 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala | 40 |
8 files changed, 148 insertions, 49 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala index e1e89b79..c1392d4d 100644 --- a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala +++ b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala @@ -95,7 +95,7 @@ case class GaugeRecorder(key: MetricKey, instrument: Gauge) extends SingleInstru /** * Base class with plenty of utility methods to facilitate the creation of [[EntityRecorder]] implementations. - * It is not required to use this base class for defining custom a custom [[EntityRecorder]], but it is certainly + * It is not required to use this base class for defining a custom [[EntityRecorder]], but it is certainly * the most convenient way to do it and the preferred approach throughout the Kamon codebase. */ abstract class GenericEntityRecorder(instrumentFactory: InstrumentFactory) extends EntityRecorder { diff --git a/kamon-core/src/main/scala/kamon/metric/MetricScaleDecorator.scala b/kamon-core/src/main/scala/kamon/metric/MetricScaleDecorator.scala new file mode 100644 index 00000000..e096429d --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricScaleDecorator.scala @@ -0,0 +1,55 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor.{ Actor, ActorRef, Props } +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric.instrument._ + +/** + * Can be used as a decorator to scale TickMetricSnapshot messages to given `timeUnits` and/or `memoryUnits` + * before forwarding to original receiver + * @param timeUnits Optional time units to scale time metrics to + * @param memoryUnits Optional memory units to scale memory metrics to + * @param receiver Receiver of scaled metrics snapshot, usually a backend sender + */ +class MetricScaleDecorator(timeUnits: Option[Time], memoryUnits: Option[Memory], receiver: ActorRef) extends Actor { + require(timeUnits.isDefined || memoryUnits.isDefined, + "Use MetricScaleDecorator only when any of units is defined") + + override def receive: Receive = { + case tick: TickMetricSnapshot ⇒ + val scaled = tick.copy(metrics = tick.metrics.mapValues { entitySnapshot ⇒ + new DefaultEntitySnapshot(entitySnapshot.metrics.map { + case (metricKey, metricSnapshot) ⇒ + val scaledSnapshot = (metricKey.unitOfMeasurement, timeUnits, memoryUnits) match { + case (time: Time, Some(to), _) ⇒ metricSnapshot.scale(time, to) + case (memory: Memory, _, Some(to)) ⇒ metricSnapshot.scale(memory, to) + case _ ⇒ metricSnapshot + } + metricKey -> scaledSnapshot + }) + }) + receiver forward scaled + } +} + +object MetricScaleDecorator { + def props(timeUnits: Option[Time], memoryUnits: Option[Memory], receiver: ActorRef): Props = + Props(new MetricScaleDecorator(timeUnits, memoryUnits, receiver)) +} + diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala index c1b69cbe..349a12bd 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala @@ -33,6 +33,7 @@ object Counter { trait Snapshot extends InstrumentSnapshot { def count: Long def merge(that: InstrumentSnapshot, context: CollectionContext): Counter.Snapshot + def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Counter.Snapshot } } @@ -57,4 +58,8 @@ case class CounterSnapshot(count: Long) extends Counter.Snapshot { case CounterSnapshot(thatCount) ⇒ CounterSnapshot(count + thatCount) case other ⇒ sys.error(s"Cannot merge a CounterSnapshot with the incompatible [${other.getClass.getName}] type.") } + + override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Counter.Snapshot = + CounterSnapshot(from.tryScale(to)(count).toLong) + }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala index 5c4c7f71..dc9a4bbf 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala @@ -17,9 +17,9 @@ package kamon.metric.instrument import java.nio.LongBuffer -import org.HdrHistogram.AtomicHistogramFieldsAccessor -import kamon.metric.instrument.Histogram.{ Snapshot, DynamicRange } -import org.HdrHistogram.AtomicHistogram + +import kamon.metric.instrument.Histogram.{ DynamicRange, Snapshot } +import org.HdrHistogram.ModifiedAtomicHistogram trait Histogram extends Instrument { type SnapshotType = Histogram.Snapshot @@ -54,11 +54,9 @@ object Histogram { * @param lowestDiscernibleValue * The lowest value that can be discerned (distinguished from 0) by the histogram.Must be a positive integer that * is >= 1. May be internally rounded down to nearest power of 2. - * * @param highestTrackableValue * The highest value to be tracked by the histogram. Must be a positive integer that is >= (2 * lowestDiscernibleValue). * Must not be larger than (Long.MAX_VALUE/2). - * * @param precision * The number of significant decimal digits to which the histogram will maintain value resolution and separation. * Must be a non-negative integer between 1 and 3. @@ -87,6 +85,39 @@ object Histogram { def recordsIterator: Iterator[Record] def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot + + override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Histogram.Snapshot = + new ScaledSnapshot(from, to, this) + } + + class ScaledSnapshot(from: UnitOfMeasurement, to: UnitOfMeasurement, snapshot: Snapshot) extends Snapshot { + private def doScale(v: Long) = from.tryScale(to)(v).toLong + override def numberOfMeasurements: Long = snapshot.numberOfMeasurements + + override def max: Long = doScale(snapshot.max) + + override def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot = snapshot.merge(that, context) + + override def merge(that: Snapshot, context: CollectionContext): Snapshot = snapshot.merge(that, context) + + override def percentile(percentile: Double): Long = doScale(snapshot.percentile(percentile)) + + override def min: Long = doScale(snapshot.min) + + override def sum: Long = doScale(snapshot.sum) + + override def recordsIterator: Iterator[Record] = { + snapshot.recordsIterator.map(record ⇒ new Record { + override def count: Long = record.count + + override def level: Long = doScale(record.level) + + override private[kamon] def rawCompactRecord: Long = record.rawCompactRecord + }) + } + + override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Histogram.Snapshot = + if (this.from == from && this.to == to) this else super.scale(from, to) } object Snapshot { @@ -99,6 +130,7 @@ object Histogram { override def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot = that override def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = that override def numberOfMeasurements: Long = 0L + override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Histogram.Snapshot = this } } } @@ -108,9 +140,8 @@ object Histogram { * The collect(..) operation extracts all the recorded values from the histogram and resets the counts, but still * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. */ -class HdrHistogram(dynamicRange: DynamicRange) extends AtomicHistogram(dynamicRange.lowestDiscernibleValue, - dynamicRange.highestTrackableValue, dynamicRange.precision) with Histogram with AtomicHistogramFieldsAccessor { - import AtomicHistogramFieldsAccessor.totalCountUpdater +class HdrHistogram(dynamicRange: DynamicRange) extends ModifiedAtomicHistogram(dynamicRange.lowestDiscernibleValue, + dynamicRange.highestTrackableValue, dynamicRange.precision) with Histogram { def record(value: Long): Unit = recordValue(value) @@ -125,7 +156,7 @@ class HdrHistogram(dynamicRange: DynamicRange) extends AtomicHistogram(dynamicRa val measurementsArray = Array.ofDim[Long](buffer.limit()) buffer.get(measurementsArray, 0, measurementsArray.length) - new CompactHdrSnapshot(nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude()) + new CompactHdrSnapshot(nrOfMeasurements, measurementsArray, protectedUnitMagnitude(), protectedSubBucketHalfCount(), protectedSubBucketHalfCountMagnitude()) } def getCounts = countsArray().length() @@ -148,22 +179,8 @@ class HdrHistogram(dynamicRange: DynamicRange) extends AtomicHistogram(dynamicRa index += 1 } - - reestablishTotalCount(nrOfMeasurements) nrOfMeasurements } - - private def reestablishTotalCount(diff: Long): Unit = { - def tryUpdateTotalCount: Boolean = { - val previousTotalCount = totalCountUpdater.get(this) - val newTotalCount = previousTotalCount - diff - - totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount) - } - - while (!tryUpdateTotalCount) {} - } - } case class CompactHdrSnapshot(val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int, diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala index 089dbeec..2c4b4319 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala @@ -18,10 +18,6 @@ package kamon.metric.instrument import java.nio.LongBuffer -import akka.actor.{ Scheduler, Cancellable } -import akka.dispatch.MessageDispatcher -import scala.concurrent.duration.FiniteDuration - private[kamon] trait Instrument { type SnapshotType <: InstrumentSnapshot @@ -31,6 +27,8 @@ private[kamon] trait Instrument { trait InstrumentSnapshot { def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot + + def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): InstrumentSnapshot } trait CollectionContext { diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala index 4423964a..f7516262 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala @@ -1,7 +1,5 @@ package kamon.metric.instrument -import java.util.concurrent.TimeUnit - import com.typesafe.config.Config import kamon.metric.instrument.Histogram.DynamicRange diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala b/kamon-core/src/main/scala/kamon/metric/instrument/ModifiedAtomicHistogram.scala index e79090a8..eb01d114 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/ModifiedAtomicHistogram.scala @@ -16,20 +16,16 @@ package org.HdrHistogram -import java.util.concurrent.atomic.{ AtomicLongArray, AtomicLongFieldUpdater } +import java.util.concurrent.atomic.AtomicLongArray -trait AtomicHistogramFieldsAccessor { - self: AtomicHistogram ⇒ +abstract class ModifiedAtomicHistogram(low: Long, high: Long, precision: Int) + extends AtomicHistogram(low, high, precision) { self ⇒ - def countsArray(): AtomicLongArray = self.counts + override def incrementTotalCount(): Unit = {} + override def addToTotalCount(value: Long): Unit = {} - def unitMagnitude(): Int = self.unitMagnitude - - def subBucketHalfCount(): Int = self.subBucketHalfCount - - def subBucketHalfCountMagnitude(): Int = self.subBucketHalfCountMagnitude -} - -object AtomicHistogramFieldsAccessor { - def totalCountUpdater(): AtomicLongFieldUpdater[AtomicHistogram] = AtomicHistogram.totalCountUpdater -} + def countsArray(): AtomicLongArray = counts + def protectedUnitMagnitude(): Int = unitMagnitude + def protectedSubBucketHalfCount(): Int = subBucketHalfCount + def protectedSubBucketHalfCountMagnitude(): Int = subBucketHalfCountMagnitude +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala index c5a1b81a..5952b906 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala @@ -22,14 +22,27 @@ package kamon.metric.instrument * recorders and might be used to scale certain kinds of measurements in metric backends. */ trait UnitOfMeasurement { + type U <: UnitOfMeasurement + def name: String def label: String + def scale(toUnit: U)(value: Double): Double = value + + def tryScale(toUnit: UnitOfMeasurement)(value: Double): Double = + if (canScale(toUnit)) scale(toUnit.asInstanceOf[U])(value) + else throw new IllegalArgumentException(s"Can't scale different types of units `$name` and `${toUnit.name}`") + + protected def canScale(toUnit: UnitOfMeasurement): Boolean + } object UnitOfMeasurement { case object Unknown extends UnitOfMeasurement { + override type U = Unknown.type val name = "unknown" val label = "unknown" + + override protected def canScale(toUnit: UnitOfMeasurement): Boolean = UnitOfMeasurement.isUnknown(toUnit) } def isUnknown(uom: UnitOfMeasurement): Boolean = @@ -47,10 +60,13 @@ object UnitOfMeasurement { * UnitOfMeasurement representing time. */ case class Time(factor: Double, label: String) extends UnitOfMeasurement { + override type U = Time val name = "time" - def scale(toUnit: Time)(value: Double): Double = + override def scale(toUnit: Time)(value: Double): Double = (value * factor) / toUnit.factor + + override protected def canScale(toUnit: UnitOfMeasurement): Boolean = UnitOfMeasurement.isTime(toUnit) } object Time { @@ -58,22 +74,36 @@ object Time { val Microseconds = Time(1E-6, "µs") val Milliseconds = Time(1E-3, "ms") val Seconds = Time(1, "s") + + val units = List(Nanoseconds, Microseconds, Milliseconds, Seconds) + + def apply(time: String): Time = units.find(_.label.toLowerCase == time.toLowerCase) getOrElse { + throw new IllegalArgumentException(s"Can't recognize time unit '$time'") + } } /** * UnitOfMeasurement representing computer memory space. */ case class Memory(factor: Double, label: String) extends UnitOfMeasurement { + override type U = Memory val name = "bytes" - def scale(toUnit: Memory)(value: Double): Double = + override def scale(toUnit: Memory)(value: Double): Double = (value * factor) / toUnit.factor + + override protected def canScale(toUnit: UnitOfMeasurement): Boolean = UnitOfMeasurement.isMemory(toUnit) } object Memory { val Bytes = Memory(1, "b") val KiloBytes = Memory(1024, "Kb") - val MegaBytes = Memory(1024E2, "Mb") - val GigaBytes = Memory(1024E3, "Gb") -} + val MegaBytes = Memory(1024 * 1024, "Mb") + val GigaBytes = Memory(1024 * 1024 * 1024, "Gb") + val units = List(Bytes, KiloBytes, MegaBytes, GigaBytes) + + def apply(memory: String): Memory = units.find(_.label.toLowerCase == memory.toLowerCase) getOrElse { + throw new IllegalArgumentException(s"Can't recognize memory unit '$memory'") + } +} |