aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/metric
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric')
-rw-r--r--kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala2
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricScaleDecorator.scala55
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala5
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala63
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala6
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala2
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/ModifiedAtomicHistogram.scala (renamed from kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala)24
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala40
8 files changed, 148 insertions, 49 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala
index e1e89b79..c1392d4d 100644
--- a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala
+++ b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala
@@ -95,7 +95,7 @@ case class GaugeRecorder(key: MetricKey, instrument: Gauge) extends SingleInstru
/**
* Base class with plenty of utility methods to facilitate the creation of [[EntityRecorder]] implementations.
- * It is not required to use this base class for defining custom a custom [[EntityRecorder]], but it is certainly
+ * It is not required to use this base class for defining a custom [[EntityRecorder]], but it is certainly
* the most convenient way to do it and the preferred approach throughout the Kamon codebase.
*/
abstract class GenericEntityRecorder(instrumentFactory: InstrumentFactory) extends EntityRecorder {
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricScaleDecorator.scala b/kamon-core/src/main/scala/kamon/metric/MetricScaleDecorator.scala
new file mode 100644
index 00000000..e096429d
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/MetricScaleDecorator.scala
@@ -0,0 +1,55 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import akka.actor.{ Actor, ActorRef, Props }
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
+import kamon.metric.instrument._
+
+/**
+ * Can be used as a decorator to scale TickMetricSnapshot messages to given `timeUnits` and/or `memoryUnits`
+ * before forwarding to original receiver
+ * @param timeUnits Optional time units to scale time metrics to
+ * @param memoryUnits Optional memory units to scale memory metrics to
+ * @param receiver Receiver of scaled metrics snapshot, usually a backend sender
+ */
+class MetricScaleDecorator(timeUnits: Option[Time], memoryUnits: Option[Memory], receiver: ActorRef) extends Actor {
+ require(timeUnits.isDefined || memoryUnits.isDefined,
+ "Use MetricScaleDecorator only when any of units is defined")
+
+ override def receive: Receive = {
+ case tick: TickMetricSnapshot ⇒
+ val scaled = tick.copy(metrics = tick.metrics.mapValues { entitySnapshot ⇒
+ new DefaultEntitySnapshot(entitySnapshot.metrics.map {
+ case (metricKey, metricSnapshot) ⇒
+ val scaledSnapshot = (metricKey.unitOfMeasurement, timeUnits, memoryUnits) match {
+ case (time: Time, Some(to), _) ⇒ metricSnapshot.scale(time, to)
+ case (memory: Memory, _, Some(to)) ⇒ metricSnapshot.scale(memory, to)
+ case _ ⇒ metricSnapshot
+ }
+ metricKey -> scaledSnapshot
+ })
+ })
+ receiver forward scaled
+ }
+}
+
+object MetricScaleDecorator {
+ def props(timeUnits: Option[Time], memoryUnits: Option[Memory], receiver: ActorRef): Props =
+ Props(new MetricScaleDecorator(timeUnits, memoryUnits, receiver))
+}
+
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala
index c1b69cbe..349a12bd 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala
@@ -33,6 +33,7 @@ object Counter {
trait Snapshot extends InstrumentSnapshot {
def count: Long
def merge(that: InstrumentSnapshot, context: CollectionContext): Counter.Snapshot
+ def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Counter.Snapshot
}
}
@@ -57,4 +58,8 @@ case class CounterSnapshot(count: Long) extends Counter.Snapshot {
case CounterSnapshot(thatCount) ⇒ CounterSnapshot(count + thatCount)
case other ⇒ sys.error(s"Cannot merge a CounterSnapshot with the incompatible [${other.getClass.getName}] type.")
}
+
+ override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Counter.Snapshot =
+ CounterSnapshot(from.tryScale(to)(count).toLong)
+
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
index 5c4c7f71..dc9a4bbf 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala
@@ -17,9 +17,9 @@
package kamon.metric.instrument
import java.nio.LongBuffer
-import org.HdrHistogram.AtomicHistogramFieldsAccessor
-import kamon.metric.instrument.Histogram.{ Snapshot, DynamicRange }
-import org.HdrHistogram.AtomicHistogram
+
+import kamon.metric.instrument.Histogram.{ DynamicRange, Snapshot }
+import org.HdrHistogram.ModifiedAtomicHistogram
trait Histogram extends Instrument {
type SnapshotType = Histogram.Snapshot
@@ -54,11 +54,9 @@ object Histogram {
* @param lowestDiscernibleValue
* The lowest value that can be discerned (distinguished from 0) by the histogram.Must be a positive integer that
* is >= 1. May be internally rounded down to nearest power of 2.
- *
* @param highestTrackableValue
* The highest value to be tracked by the histogram. Must be a positive integer that is >= (2 * lowestDiscernibleValue).
* Must not be larger than (Long.MAX_VALUE/2).
- *
* @param precision
* The number of significant decimal digits to which the histogram will maintain value resolution and separation.
* Must be a non-negative integer between 1 and 3.
@@ -87,6 +85,39 @@ object Histogram {
def recordsIterator: Iterator[Record]
def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot
def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot
+
+ override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Histogram.Snapshot =
+ new ScaledSnapshot(from, to, this)
+ }
+
+ class ScaledSnapshot(from: UnitOfMeasurement, to: UnitOfMeasurement, snapshot: Snapshot) extends Snapshot {
+ private def doScale(v: Long) = from.tryScale(to)(v).toLong
+ override def numberOfMeasurements: Long = snapshot.numberOfMeasurements
+
+ override def max: Long = doScale(snapshot.max)
+
+ override def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot = snapshot.merge(that, context)
+
+ override def merge(that: Snapshot, context: CollectionContext): Snapshot = snapshot.merge(that, context)
+
+ override def percentile(percentile: Double): Long = doScale(snapshot.percentile(percentile))
+
+ override def min: Long = doScale(snapshot.min)
+
+ override def sum: Long = doScale(snapshot.sum)
+
+ override def recordsIterator: Iterator[Record] = {
+ snapshot.recordsIterator.map(record ⇒ new Record {
+ override def count: Long = record.count
+
+ override def level: Long = doScale(record.level)
+
+ override private[kamon] def rawCompactRecord: Long = record.rawCompactRecord
+ })
+ }
+
+ override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Histogram.Snapshot =
+ if (this.from == from && this.to == to) this else super.scale(from, to)
}
object Snapshot {
@@ -99,6 +130,7 @@ object Histogram {
override def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot = that
override def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = that
override def numberOfMeasurements: Long = 0L
+ override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Histogram.Snapshot = this
}
}
}
@@ -108,9 +140,8 @@ object Histogram {
* The collect(..) operation extracts all the recorded values from the histogram and resets the counts, but still
* leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken.
*/
-class HdrHistogram(dynamicRange: DynamicRange) extends AtomicHistogram(dynamicRange.lowestDiscernibleValue,
- dynamicRange.highestTrackableValue, dynamicRange.precision) with Histogram with AtomicHistogramFieldsAccessor {
- import AtomicHistogramFieldsAccessor.totalCountUpdater
+class HdrHistogram(dynamicRange: DynamicRange) extends ModifiedAtomicHistogram(dynamicRange.lowestDiscernibleValue,
+ dynamicRange.highestTrackableValue, dynamicRange.precision) with Histogram {
def record(value: Long): Unit = recordValue(value)
@@ -125,7 +156,7 @@ class HdrHistogram(dynamicRange: DynamicRange) extends AtomicHistogram(dynamicRa
val measurementsArray = Array.ofDim[Long](buffer.limit())
buffer.get(measurementsArray, 0, measurementsArray.length)
- new CompactHdrSnapshot(nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude())
+ new CompactHdrSnapshot(nrOfMeasurements, measurementsArray, protectedUnitMagnitude(), protectedSubBucketHalfCount(), protectedSubBucketHalfCountMagnitude())
}
def getCounts = countsArray().length()
@@ -148,22 +179,8 @@ class HdrHistogram(dynamicRange: DynamicRange) extends AtomicHistogram(dynamicRa
index += 1
}
-
- reestablishTotalCount(nrOfMeasurements)
nrOfMeasurements
}
-
- private def reestablishTotalCount(diff: Long): Unit = {
- def tryUpdateTotalCount: Boolean = {
- val previousTotalCount = totalCountUpdater.get(this)
- val newTotalCount = previousTotalCount - diff
-
- totalCountUpdater.compareAndSet(this, previousTotalCount, newTotalCount)
- }
-
- while (!tryUpdateTotalCount) {}
- }
-
}
case class CompactHdrSnapshot(val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int,
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala
index 089dbeec..2c4b4319 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala
@@ -18,10 +18,6 @@ package kamon.metric.instrument
import java.nio.LongBuffer
-import akka.actor.{ Scheduler, Cancellable }
-import akka.dispatch.MessageDispatcher
-import scala.concurrent.duration.FiniteDuration
-
private[kamon] trait Instrument {
type SnapshotType <: InstrumentSnapshot
@@ -31,6 +27,8 @@ private[kamon] trait Instrument {
trait InstrumentSnapshot {
def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot
+
+ def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): InstrumentSnapshot
}
trait CollectionContext {
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala
index 4423964a..f7516262 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala
@@ -1,7 +1,5 @@
package kamon.metric.instrument
-import java.util.concurrent.TimeUnit
-
import com.typesafe.config.Config
import kamon.metric.instrument.Histogram.DynamicRange
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala b/kamon-core/src/main/scala/kamon/metric/instrument/ModifiedAtomicHistogram.scala
index e79090a8..eb01d114 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/ModifiedAtomicHistogram.scala
@@ -16,20 +16,16 @@
package org.HdrHistogram
-import java.util.concurrent.atomic.{ AtomicLongArray, AtomicLongFieldUpdater }
+import java.util.concurrent.atomic.AtomicLongArray
-trait AtomicHistogramFieldsAccessor {
- self: AtomicHistogram ⇒
+abstract class ModifiedAtomicHistogram(low: Long, high: Long, precision: Int)
+ extends AtomicHistogram(low, high, precision) { self ⇒
- def countsArray(): AtomicLongArray = self.counts
+ override def incrementTotalCount(): Unit = {}
+ override def addToTotalCount(value: Long): Unit = {}
- def unitMagnitude(): Int = self.unitMagnitude
-
- def subBucketHalfCount(): Int = self.subBucketHalfCount
-
- def subBucketHalfCountMagnitude(): Int = self.subBucketHalfCountMagnitude
-}
-
-object AtomicHistogramFieldsAccessor {
- def totalCountUpdater(): AtomicLongFieldUpdater[AtomicHistogram] = AtomicHistogram.totalCountUpdater
-}
+ def countsArray(): AtomicLongArray = counts
+ def protectedUnitMagnitude(): Int = unitMagnitude
+ def protectedSubBucketHalfCount(): Int = subBucketHalfCount
+ def protectedSubBucketHalfCountMagnitude(): Int = subBucketHalfCountMagnitude
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala
index c5a1b81a..5952b906 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala
@@ -22,14 +22,27 @@ package kamon.metric.instrument
* recorders and might be used to scale certain kinds of measurements in metric backends.
*/
trait UnitOfMeasurement {
+ type U <: UnitOfMeasurement
+
def name: String
def label: String
+ def scale(toUnit: U)(value: Double): Double = value
+
+ def tryScale(toUnit: UnitOfMeasurement)(value: Double): Double =
+ if (canScale(toUnit)) scale(toUnit.asInstanceOf[U])(value)
+ else throw new IllegalArgumentException(s"Can't scale different types of units `$name` and `${toUnit.name}`")
+
+ protected def canScale(toUnit: UnitOfMeasurement): Boolean
+
}
object UnitOfMeasurement {
case object Unknown extends UnitOfMeasurement {
+ override type U = Unknown.type
val name = "unknown"
val label = "unknown"
+
+ override protected def canScale(toUnit: UnitOfMeasurement): Boolean = UnitOfMeasurement.isUnknown(toUnit)
}
def isUnknown(uom: UnitOfMeasurement): Boolean =
@@ -47,10 +60,13 @@ object UnitOfMeasurement {
* UnitOfMeasurement representing time.
*/
case class Time(factor: Double, label: String) extends UnitOfMeasurement {
+ override type U = Time
val name = "time"
- def scale(toUnit: Time)(value: Double): Double =
+ override def scale(toUnit: Time)(value: Double): Double =
(value * factor) / toUnit.factor
+
+ override protected def canScale(toUnit: UnitOfMeasurement): Boolean = UnitOfMeasurement.isTime(toUnit)
}
object Time {
@@ -58,22 +74,36 @@ object Time {
val Microseconds = Time(1E-6, "µs")
val Milliseconds = Time(1E-3, "ms")
val Seconds = Time(1, "s")
+
+ val units = List(Nanoseconds, Microseconds, Milliseconds, Seconds)
+
+ def apply(time: String): Time = units.find(_.label.toLowerCase == time.toLowerCase) getOrElse {
+ throw new IllegalArgumentException(s"Can't recognize time unit '$time'")
+ }
}
/**
* UnitOfMeasurement representing computer memory space.
*/
case class Memory(factor: Double, label: String) extends UnitOfMeasurement {
+ override type U = Memory
val name = "bytes"
- def scale(toUnit: Memory)(value: Double): Double =
+ override def scale(toUnit: Memory)(value: Double): Double =
(value * factor) / toUnit.factor
+
+ override protected def canScale(toUnit: UnitOfMeasurement): Boolean = UnitOfMeasurement.isMemory(toUnit)
}
object Memory {
val Bytes = Memory(1, "b")
val KiloBytes = Memory(1024, "Kb")
- val MegaBytes = Memory(1024E2, "Mb")
- val GigaBytes = Memory(1024E3, "Gb")
-}
+ val MegaBytes = Memory(1024 * 1024, "Mb")
+ val GigaBytes = Memory(1024 * 1024 * 1024, "Gb")
+ val units = List(Bytes, KiloBytes, MegaBytes, GigaBytes)
+
+ def apply(memory: String): Memory = units.find(_.label.toLowerCase == memory.toLowerCase) getOrElse {
+ throw new IllegalArgumentException(s"Can't recognize memory unit '$memory'")
+ }
+}