aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/legacy-main/scala/kamon/metric
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-04-24 13:54:40 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-04-24 13:54:40 +0200
commit4d828e1a3195e55365c865aa3a78af9668742643 (patch)
tree07fff2683933c96297a8ba577bbdc89888da16e1 /kamon-core/src/legacy-main/scala/kamon/metric
parent469c11dc1ddb140f407a33f48033e533bf60611c (diff)
downloadKamon-4d828e1a3195e55365c865aa3a78af9668742643.tar.gz
Kamon-4d828e1a3195e55365c865aa3a78af9668742643.tar.bz2
Kamon-4d828e1a3195e55365c865aa3a78af9668742643.zip
Prepare for the major cleanup
Moved all the original files from src/main to src/legacy-main, same with test files. Also removed the autoweave module, examples and bench as I'm planning to have them in separate repositories.
Diffstat (limited to 'kamon-core/src/legacy-main/scala/kamon/metric')
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/Entity.scala37
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/EntityRecorder.scala235
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/EntitySnapshot.scala63
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/MetricKey.scala47
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/MetricScaleDecorator.scala57
-rwxr-xr-xkamon-core/src/legacy-main/scala/kamon/metric/MetricsModule.scala394
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/MetricsSettings.scala123
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/SubscriptionsDispatcher.scala116
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/TickMetricSnapshotBuffer.scala65
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/TraceMetrics.scala53
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/instrument/Counter.scala65
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/instrument/Gauge.scala120
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/instrument/Histogram.scala331
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/instrument/Instrument.scala51
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/instrument/InstrumentFactory.scala51
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/instrument/InstrumentSettings.scala73
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/instrument/MinMaxCounter.scala105
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/instrument/ModifiedAtomicHistogram.scala31
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/instrument/RefreshScheduler.scala115
-rw-r--r--kamon-core/src/legacy-main/scala/kamon/metric/instrument/UnitOfMeasurement.scala109
20 files changed, 2241 insertions, 0 deletions
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/Entity.scala b/kamon-core/src/legacy-main/scala/kamon/metric/Entity.scala
new file mode 100644
index 00000000..91249af0
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/Entity.scala
@@ -0,0 +1,37 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+/**
+ * Identify a `thing` that is being monitored by Kamon. A [[kamon.metric.Entity]] is used to identify tracked `things`
+ * in both the metrics recording and reporting sides. Only the name and category fields are used with determining
+ * equality between two entities.
+ *
+ * // TODO: Find a better word for `thing`.
+ */
+case class Entity(name: String, category: String, tags: Map[String, String])
+
+object Entity {
+ def apply(name: String, category: String): Entity =
+ apply(name, category, Map.empty)
+
+ def create(name: String, category: String): Entity =
+ apply(name, category, Map.empty)
+
+ def create(name: String, category: String, tags: Map[String, String]): Entity =
+ new Entity(name, category, tags)
+} \ No newline at end of file
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/EntityRecorder.scala b/kamon-core/src/legacy-main/scala/kamon/metric/EntityRecorder.scala
new file mode 100644
index 00000000..e3b136dd
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/EntityRecorder.scala
@@ -0,0 +1,235 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import kamon.metric.instrument.Gauge.CurrentValueCollector
+import kamon.metric.instrument.Histogram.DynamicRange
+import kamon.metric.instrument._
+import kamon.util.Function
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration.FiniteDuration
+
+trait EntityRecorder {
+ def collect(collectionContext: CollectionContext): EntitySnapshot
+ def cleanup: Unit
+}
+
+trait EntityRecorderFactory[T <: EntityRecorder] {
+ def category: String
+ def createRecorder(instrumentFactory: InstrumentFactory): T
+}
+
+abstract class EntityRecorderFactoryCompanion[T <: EntityRecorder](val category: String, builder: (InstrumentFactory) ⇒ T)
+ extends EntityRecorderFactory[T] {
+
+ def createRecorder(instrumentFactory: InstrumentFactory): T = builder(instrumentFactory)
+}
+
+object EntityRecorderFactory {
+ def apply[T <: EntityRecorder](entityCategory: String, factory: InstrumentFactory ⇒ T): EntityRecorderFactory[T] =
+ new EntityRecorderFactory[T] {
+ def category: String = entityCategory
+ def createRecorder(instrumentFactory: InstrumentFactory): T = factory(instrumentFactory)
+ }
+
+ def create[T <: EntityRecorder](entityCategory: String, factory: Function[InstrumentFactory, T]): EntityRecorderFactory[T] =
+ new EntityRecorderFactory[T] {
+ def category: String = entityCategory
+ def createRecorder(instrumentFactory: InstrumentFactory): T = factory(instrumentFactory)
+ }
+}
+
+private[kamon] sealed trait SingleInstrumentEntityRecorder extends EntityRecorder {
+ def key: MetricKey
+ def instrument: Instrument
+
+ def collect(collectionContext: CollectionContext): EntitySnapshot =
+ new DefaultEntitySnapshot(Map(key → instrument.collect(collectionContext)))
+
+ def cleanup: Unit = instrument.cleanup
+}
+
+object SingleInstrumentEntityRecorder {
+ val Histogram = "histogram"
+ val MinMaxCounter = "min-max-counter"
+ val Gauge = "gauge"
+ val Counter = "counter"
+
+ val AllCategories = List("histogram", "gauge", "counter", "min-max-counter")
+}
+
+/**
+ * Entity recorder for a single Counter instrument.
+ */
+case class CounterRecorder(key: MetricKey, instrument: Counter) extends SingleInstrumentEntityRecorder
+
+/**
+ * Entity recorder for a single Histogram instrument.
+ */
+case class HistogramRecorder(key: MetricKey, instrument: Histogram) extends SingleInstrumentEntityRecorder
+
+/**
+ * Entity recorder for a single MinMaxCounter instrument.
+ */
+case class MinMaxCounterRecorder(key: MetricKey, instrument: MinMaxCounter) extends SingleInstrumentEntityRecorder
+
+/**
+ * Entity recorder for a single Gauge instrument.
+ */
+case class GaugeRecorder(key: MetricKey, instrument: Gauge) extends SingleInstrumentEntityRecorder
+
+/**
+ * Base class with plenty of utility methods to facilitate the creation of [[EntityRecorder]] implementations.
+ * It is not required to use this base class for defining a custom [[EntityRecorder]], but it is certainly
+ * the most convenient way to do it and the preferred approach throughout the Kamon codebase.
+ */
+abstract class GenericEntityRecorder(instrumentFactory: InstrumentFactory) extends EntityRecorder {
+ import kamon.util.TriemapAtomicGetOrElseUpdate.Syntax
+
+ private val _instruments = TrieMap.empty[MetricKey, Instrument]
+ private def register[T <: Instrument](key: MetricKey, instrument: ⇒ T): T =
+ _instruments.atomicGetOrElseUpdate(key, instrument, _.cleanup).asInstanceOf[T]
+
+ protected def histogram(name: String): Histogram =
+ register(HistogramKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createHistogram(name))
+
+ protected def histogram(name: String, dynamicRange: DynamicRange): Histogram =
+ register(HistogramKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createHistogram(name, Some(dynamicRange)))
+
+ protected def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram =
+ register(HistogramKey(name, unitOfMeasurement), instrumentFactory.createHistogram(name))
+
+ protected def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram =
+ register(HistogramKey(name, unitOfMeasurement), instrumentFactory.createHistogram(name, Some(dynamicRange)))
+
+ protected def removeHistogram(name: String): Unit =
+ _instruments.remove(HistogramKey(name, UnitOfMeasurement.Unknown))
+
+ protected def removeHistogram(name: String, unitOfMeasurement: UnitOfMeasurement): Unit =
+ _instruments.remove(HistogramKey(name, unitOfMeasurement))
+
+ protected def minMaxCounter(name: String): MinMaxCounter =
+ register(MinMaxCounterKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createMinMaxCounter(name))
+
+ protected def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter =
+ register(MinMaxCounterKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange)))
+
+ protected def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter =
+ register(MinMaxCounterKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createMinMaxCounter(name, refreshInterval = Some(refreshInterval)))
+
+ protected def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name))
+
+ protected def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter =
+ register(MinMaxCounterKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange), Some(refreshInterval)))
+
+ protected def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange)))
+
+ protected def minMaxCounter(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name, refreshInterval = Some(refreshInterval)))
+
+ protected def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ register(MinMaxCounterKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange), Some(refreshInterval)))
+
+ protected def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter =
+ register(key, instrumentFactory.createMinMaxCounter(key.name))
+
+ protected def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange): MinMaxCounter =
+ register(key, instrumentFactory.createMinMaxCounter(key.name, Some(dynamicRange)))
+
+ protected def minMaxCounter(key: MinMaxCounterKey, refreshInterval: FiniteDuration): MinMaxCounter =
+ register(key, instrumentFactory.createMinMaxCounter(key.name, refreshInterval = Some(refreshInterval)))
+
+ protected def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter =
+ register(key, instrumentFactory.createMinMaxCounter(key.name, Some(dynamicRange), Some(refreshInterval)))
+
+ protected def removeMinMaxCounter(name: String): Unit =
+ _instruments.remove(MinMaxCounterKey(name, UnitOfMeasurement.Unknown))
+
+ protected def removeMinMaxCounter(key: MinMaxCounterKey): Unit =
+ _instruments.remove(key)
+
+ protected def gauge(name: String, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createGauge(name, valueCollector = valueCollector))
+
+ protected def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createGauge(name, Some(dynamicRange), valueCollector = valueCollector))
+
+ protected def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createGauge(name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector))
+
+ protected def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, valueCollector = valueCollector))
+
+ protected def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createGauge(name, Some(dynamicRange), Some(refreshInterval), valueCollector = valueCollector))
+
+ protected def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, Some(dynamicRange), valueCollector = valueCollector))
+
+ protected def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createGauge(name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector))
+
+ protected def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge =
+ register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, Some(dynamicRange), Some(refreshInterval), valueCollector))
+
+ protected def gauge(key: GaugeKey, valueCollector: CurrentValueCollector): Gauge =
+ register(key, instrumentFactory.createGauge(key.name, valueCollector = valueCollector))
+
+ protected def gauge(key: GaugeKey, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge =
+ register(key, instrumentFactory.createGauge(key.name, Some(dynamicRange), valueCollector = valueCollector))
+
+ protected def gauge(key: GaugeKey, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ register(key, instrumentFactory.createGauge(key.name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector))
+
+ protected def gauge(key: GaugeKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge =
+ register(key, instrumentFactory.createGauge(key.name, Some(dynamicRange), Some(refreshInterval), valueCollector = valueCollector))
+
+ protected def removeGauge(name: String): Unit =
+ _instruments.remove(GaugeKey(name, UnitOfMeasurement.Unknown))
+
+ protected def removeGauge(key: GaugeKey): Unit =
+ _instruments.remove(key)
+
+ protected def counter(name: String): Counter =
+ register(CounterKey(name, UnitOfMeasurement.Unknown), instrumentFactory.createCounter())
+
+ protected def counter(name: String, unitOfMeasurement: UnitOfMeasurement): Counter =
+ register(CounterKey(name, unitOfMeasurement), instrumentFactory.createCounter())
+
+ protected def counter(key: CounterKey): Counter =
+ register(key, instrumentFactory.createCounter())
+
+ protected def removeCounter(name: String): Unit =
+ _instruments.remove(CounterKey(name, UnitOfMeasurement.Unknown))
+
+ protected def removeCounter(key: CounterKey): Unit =
+ _instruments.remove(key)
+
+ def collect(collectionContext: CollectionContext): EntitySnapshot = {
+ val snapshots = Map.newBuilder[MetricKey, InstrumentSnapshot]
+ _instruments.foreach {
+ case (key, instrument) ⇒ snapshots += key → instrument.collect(collectionContext)
+ }
+
+ new DefaultEntitySnapshot(snapshots.result())
+ }
+
+ def cleanup: Unit = _instruments.values.foreach(_.cleanup)
+} \ No newline at end of file
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/EntitySnapshot.scala b/kamon-core/src/legacy-main/scala/kamon/metric/EntitySnapshot.scala
new file mode 100644
index 00000000..16edecd8
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/EntitySnapshot.scala
@@ -0,0 +1,63 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import kamon.metric.instrument.{Counter, Histogram, CollectionContext, InstrumentSnapshot}
+import kamon.util.MapMerge
+import scala.reflect.ClassTag
+
+trait EntitySnapshot {
+ def metrics: Map[MetricKey, InstrumentSnapshot]
+ def merge(that: EntitySnapshot, collectionContext: CollectionContext): EntitySnapshot
+
+ def histogram(name: String): Option[Histogram.Snapshot] =
+ find[HistogramKey, Histogram.Snapshot](name)
+
+ def minMaxCounter(name: String): Option[Histogram.Snapshot] =
+ find[MinMaxCounterKey, Histogram.Snapshot](name)
+
+ def gauge(name: String): Option[Histogram.Snapshot] =
+ find[GaugeKey, Histogram.Snapshot](name)
+
+ def counter(name: String): Option[Counter.Snapshot] =
+ find[CounterKey, Counter.Snapshot](name)
+
+ def histograms: Map[HistogramKey, Histogram.Snapshot] =
+ filterByType[HistogramKey, Histogram.Snapshot]
+
+ def minMaxCounters: Map[MinMaxCounterKey, Histogram.Snapshot] =
+ filterByType[MinMaxCounterKey, Histogram.Snapshot]
+
+ def gauges: Map[GaugeKey, Histogram.Snapshot] =
+ filterByType[GaugeKey, Histogram.Snapshot]
+
+ def counters: Map[CounterKey, Counter.Snapshot] =
+ filterByType[CounterKey, Counter.Snapshot]
+
+ private def filterByType[K <: MetricKey, V <: InstrumentSnapshot](implicit keyCT: ClassTag[K]): Map[K, V] =
+ metrics.collect { case (k, v) if keyCT.runtimeClass.isInstance(k) ⇒ (k.asInstanceOf[K], v.asInstanceOf[V]) }
+
+ private def find[K <: MetricKey, V <: InstrumentSnapshot](name: String)(implicit keyCT: ClassTag[K]) =
+ metrics.find { case (k, v) ⇒ keyCT.runtimeClass.isInstance(k) && k.name == name } map (_._2.asInstanceOf[V])
+}
+
+class DefaultEntitySnapshot(val metrics: Map[MetricKey, InstrumentSnapshot]) extends EntitySnapshot {
+ import MapMerge.Syntax
+
+ override def merge(that: EntitySnapshot, collectionContext: CollectionContext): EntitySnapshot =
+ new DefaultEntitySnapshot(metrics.merge(that.metrics, (l, r) ⇒ l.merge(r, collectionContext)))
+} \ No newline at end of file
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/MetricKey.scala b/kamon-core/src/legacy-main/scala/kamon/metric/MetricKey.scala
new file mode 100644
index 00000000..0d4e0163
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/MetricKey.scala
@@ -0,0 +1,47 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import kamon.metric.instrument.UnitOfMeasurement
+
+/**
+ * MetricKeys are used to identify a given metric in entity recorders and snapshots.
+ */
+sealed trait MetricKey {
+ def name: String
+ def unitOfMeasurement: UnitOfMeasurement
+}
+
+/**
+ * MetricKey for all Histogram-based metrics.
+ */
+private[kamon] case class HistogramKey(name: String, unitOfMeasurement: UnitOfMeasurement) extends MetricKey
+
+/**
+ * MetricKey for all MinMaxCounter-based metrics.
+ */
+private[kamon] case class MinMaxCounterKey(name: String, unitOfMeasurement: UnitOfMeasurement) extends MetricKey
+
+/**
+ * MetricKey for all Gauge-based metrics.
+ */
+private[kamon] case class GaugeKey(name: String, unitOfMeasurement: UnitOfMeasurement) extends MetricKey
+
+/**
+ * MetricKey for all Counter-based metrics.
+ */
+private[kamon] case class CounterKey(name: String, unitOfMeasurement: UnitOfMeasurement) extends MetricKey
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/MetricScaleDecorator.scala b/kamon-core/src/legacy-main/scala/kamon/metric/MetricScaleDecorator.scala
new file mode 100644
index 00000000..06de65ef
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/MetricScaleDecorator.scala
@@ -0,0 +1,57 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import akka.actor.{Actor, ActorRef, Props}
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
+import kamon.metric.instrument._
+
+/**
+ * Can be used as a decorator to scale TickMetricSnapshot messages to given `timeUnits` and/or `memoryUnits`
+ * before forwarding to original receiver
+ * @param timeUnits Optional time units to scale time metrics to
+ * @param memoryUnits Optional memory units to scale memory metrics to
+ * @param receiver Receiver of scaled metrics snapshot, usually a backend sender
+ */
+class MetricScaleDecorator(timeUnits: Option[Time], memoryUnits: Option[Memory], receiver: ActorRef) extends Actor {
+ require(
+ timeUnits.isDefined || memoryUnits.isDefined,
+ "Use MetricScaleDecorator only when any of units is defined"
+ )
+
+ override def receive: Receive = {
+ case tick: TickMetricSnapshot ⇒
+ val scaled = tick.copy(metrics = tick.metrics.mapValues { entitySnapshot ⇒
+ new DefaultEntitySnapshot(entitySnapshot.metrics.map {
+ case (metricKey, metricSnapshot) ⇒
+ val scaledSnapshot = (metricKey.unitOfMeasurement, timeUnits, memoryUnits) match {
+ case (time: Time, Some(to), _) ⇒ metricSnapshot.scale(time, to)
+ case (memory: Memory, _, Some(to)) ⇒ metricSnapshot.scale(memory, to)
+ case _ ⇒ metricSnapshot
+ }
+ metricKey → scaledSnapshot
+ })
+ })
+ receiver forward scaled
+ }
+}
+
+object MetricScaleDecorator {
+ def props(timeUnits: Option[Time], memoryUnits: Option[Memory], receiver: ActorRef): Props =
+ Props(new MetricScaleDecorator(timeUnits, memoryUnits, receiver))
+}
+
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/MetricsModule.scala b/kamon-core/src/legacy-main/scala/kamon/metric/MetricsModule.scala
new file mode 100755
index 00000000..7c85bb02
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/MetricsModule.scala
@@ -0,0 +1,394 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import java.util.Map.Entry
+
+import akka.actor._
+import com.typesafe.config.{Config, ConfigValue, ConfigValueType}
+import kamon.metric.SubscriptionsDispatcher.{Subscribe, Unsubscribe}
+import kamon.metric.instrument.Gauge.CurrentValueCollector
+import kamon.metric.instrument.Histogram.DynamicRange
+import kamon.metric.instrument._
+import kamon.util.LazyActorRef
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration.FiniteDuration
+
+case class EntityRegistration[T <: EntityRecorder](entity: Entity, recorder: T)
+
+trait MetricsModule {
+ def settings: MetricsSettings
+
+ def shouldTrack(entity: Entity): Boolean
+
+ def shouldTrack(entityName: String, category: String): Boolean =
+ shouldTrack(Entity(entityName, category))
+
+ //
+ // Histograms registration and removal
+ //
+
+ def histogram(name: String): Histogram =
+ registerHistogram(name)
+
+ def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram =
+ registerHistogram(name, unitOfMeasurement = Some(unitOfMeasurement))
+
+ def histogram(name: String, dynamicRange: DynamicRange): Histogram =
+ registerHistogram(name, dynamicRange = Some(dynamicRange))
+
+ def histogram(name: String, unitOfMeasurement: UnitOfMeasurement, dynamicRange: DynamicRange): Histogram =
+ registerHistogram(name, unitOfMeasurement = Some(unitOfMeasurement), dynamicRange = Some(dynamicRange))
+
+ def histogram(name: String, tags: Map[String, String]): Histogram =
+ registerHistogram(name, tags)
+
+ def histogram(name: String, tags: Map[String, String], unitOfMeasurement: UnitOfMeasurement): Histogram =
+ registerHistogram(name, tags, Some(unitOfMeasurement))
+
+ def histogram(name: String, tags: Map[String, String], dynamicRange: DynamicRange): Histogram =
+ registerHistogram(name, tags, dynamicRange = Some(dynamicRange))
+
+ def histogram(name: String, tags: Map[String, String], unitOfMeasurement: UnitOfMeasurement, dynamicRange: DynamicRange): Histogram =
+ registerHistogram(name, tags, Some(unitOfMeasurement), Some(dynamicRange))
+
+ def removeHistogram(name: String): Boolean =
+ removeHistogram(name, Map.empty)
+
+ def registerHistogram(name: String, tags: Map[String, String] = Map.empty, unitOfMeasurement: Option[UnitOfMeasurement] = None,
+ dynamicRange: Option[DynamicRange] = None): Histogram
+
+ def removeHistogram(name: String, tags: Map[String, String]): Boolean
+
+ //
+ // MinMaxCounter registration and removal
+ //
+
+ def minMaxCounter(name: String): MinMaxCounter =
+ registerMinMaxCounter(name)
+
+ def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ registerMinMaxCounter(name, unitOfMeasurement = Some(unitOfMeasurement))
+
+ def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter =
+ registerMinMaxCounter(name, dynamicRange = Some(dynamicRange))
+
+ def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter =
+ registerMinMaxCounter(name, refreshInterval = Some(refreshInterval))
+
+ def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter =
+ registerMinMaxCounter(name, dynamicRange = Some(dynamicRange), refreshInterval = Some(refreshInterval))
+
+ def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement, dynamicRange: DynamicRange): MinMaxCounter =
+ registerMinMaxCounter(name, unitOfMeasurement = Some(unitOfMeasurement), dynamicRange = Some(dynamicRange))
+
+ def minMaxCounter(name: String, tags: Map[String, String]): MinMaxCounter =
+ registerMinMaxCounter(name, tags)
+
+ def minMaxCounter(name: String, tags: Map[String, String], unitOfMeasurement: UnitOfMeasurement): MinMaxCounter =
+ registerMinMaxCounter(name, tags, Some(unitOfMeasurement))
+
+ def minMaxCounter(name: String, tags: Map[String, String], dynamicRange: DynamicRange): MinMaxCounter =
+ registerMinMaxCounter(name, tags, dynamicRange = Some(dynamicRange))
+
+ def minMaxCounter(name: String, tags: Map[String, String], unitOfMeasurement: UnitOfMeasurement, dynamicRange: DynamicRange): MinMaxCounter =
+ registerMinMaxCounter(name, tags, Some(unitOfMeasurement), Some(dynamicRange))
+
+ def removeMinMaxCounter(name: String): Boolean =
+ removeMinMaxCounter(name, Map.empty)
+
+ def removeMinMaxCounter(name: String, tags: Map[String, String]): Boolean
+
+ def registerMinMaxCounter(name: String, tags: Map[String, String] = Map.empty, unitOfMeasurement: Option[UnitOfMeasurement] = None,
+ dynamicRange: Option[DynamicRange] = None, refreshInterval: Option[FiniteDuration] = None): MinMaxCounter
+
+ //
+ // Gauge registration and removal
+ //
+
+ def gauge(name: String)(valueCollector: CurrentValueCollector): Gauge =
+ registerGauge(name, valueCollector)
+
+ def gauge(name: String, unitOfMeasurement: UnitOfMeasurement)(valueCollector: CurrentValueCollector): Gauge =
+ registerGauge(name, valueCollector, unitOfMeasurement = Some(unitOfMeasurement))
+
+ def gauge(name: String, dynamicRange: DynamicRange)(valueCollector: CurrentValueCollector): Gauge =
+ registerGauge(name, valueCollector, dynamicRange = Some(dynamicRange))
+
+ def gauge(name: String, refreshInterval: FiniteDuration)(valueCollector: CurrentValueCollector): Gauge =
+ registerGauge(name, valueCollector, refreshInterval = Some(refreshInterval))
+
+ def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration)(valueCollector: CurrentValueCollector): Gauge =
+ registerGauge(name, valueCollector, dynamicRange = Some(dynamicRange), refreshInterval = Some(refreshInterval))
+
+ def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, dynamicRange: DynamicRange)(valueCollector: CurrentValueCollector): Gauge =
+ registerGauge(name, valueCollector, unitOfMeasurement = Some(unitOfMeasurement), dynamicRange = Some(dynamicRange))
+
+ def gauge(name: String, tags: Map[String, String])(valueCollector: CurrentValueCollector): Gauge =
+ registerGauge(name, valueCollector, tags)
+
+ def gauge(name: String, tags: Map[String, String], unitOfMeasurement: UnitOfMeasurement)(valueCollector: CurrentValueCollector): Gauge =
+ registerGauge(name, valueCollector, tags, Some(unitOfMeasurement))
+
+ def gauge(name: String, tags: Map[String, String], dynamicRange: DynamicRange)(valueCollector: CurrentValueCollector): Gauge =
+ registerGauge(name, valueCollector, tags, dynamicRange = Some(dynamicRange))
+
+ def gauge(name: String, tags: Map[String, String], unitOfMeasurement: UnitOfMeasurement, dynamicRange: DynamicRange)(valueCollector: CurrentValueCollector): Gauge =
+ registerGauge(name, valueCollector, tags, Some(unitOfMeasurement), Some(dynamicRange))
+
+ def removeGauge(name: String): Boolean =
+ removeGauge(name, Map.empty)
+
+ def removeGauge(name: String, tags: Map[String, String]): Boolean
+
+ def registerGauge(name: String, valueCollector: CurrentValueCollector, tags: Map[String, String] = Map.empty,
+ unitOfMeasurement: Option[UnitOfMeasurement] = None, dynamicRange: Option[DynamicRange] = None,
+ refreshInterval: Option[FiniteDuration] = None): Gauge
+
+ //
+ // Counters registration and removal
+ //
+
+ def counter(name: String): Counter =
+ registerCounter(name)
+
+ def counter(name: String, unitOfMeasurement: UnitOfMeasurement): Counter =
+ registerCounter(name, unitOfMeasurement = Some(unitOfMeasurement))
+
+ def counter(name: String, tags: Map[String, String]): Counter =
+ registerCounter(name, tags)
+
+ def counter(name: String, tags: Map[String, String], unitOfMeasurement: UnitOfMeasurement): Counter =
+ registerCounter(name, tags, Some(unitOfMeasurement))
+
+ def removeCounter(name: String): Boolean =
+ removeCounter(name, Map.empty)
+
+ def removeCounter(name: String, tags: Map[String, String]): Boolean
+
+ def registerCounter(name: String, tags: Map[String, String] = Map.empty, unitOfMeasurement: Option[UnitOfMeasurement] = None,
+ dynamicRange: Option[DynamicRange] = None): Counter
+
+ //
+ // Entities registration and removal
+ //
+
+ def entity[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], name: String): T =
+ entity(recorderFactory, Entity(name, recorderFactory.category))
+
+ def entity[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], name: String, tags: Map[String, String]): T =
+ entity(recorderFactory, Entity(name, recorderFactory.category, tags))
+
+ def removeEntity(name: String, category: String): Boolean =
+ removeEntity(Entity(name, category, Map.empty))
+
+ def removeEntity(name: String, category: String, tags: Map[String, String]): Boolean =
+ removeEntity(Entity(name, category, tags))
+
+ def removeEntity(entity: Entity): Boolean
+
+ def entity[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entity: Entity): T
+
+ def find(name: String, category: String): Option[EntityRecorder] =
+ find(Entity(name, category))
+
+ def find(name: String, category: String, tags: Map[String, String]): Option[EntityRecorder] =
+ find(Entity(name, category, tags))
+
+ def find(entity: Entity): Option[EntityRecorder]
+
+ def subscribe(filter: SubscriptionFilter, subscriber: ActorRef): Unit =
+ subscribe(filter, subscriber, permanently = true)
+
+ def subscribe(category: String, selection: String, subscriber: ActorRef, permanently: Boolean): Unit =
+ subscribe(SubscriptionFilter(category, selection), subscriber, permanently)
+
+ def subscribe(category: String, selection: String, subscriber: ActorRef): Unit =
+ subscribe(SubscriptionFilter(category, selection), subscriber, permanently = true)
+
+ def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanently: Boolean): Unit
+
+ def unsubscribe(subscriber: ActorRef): Unit
+
+ def buildDefaultCollectionContext: CollectionContext
+
+ def instrumentFactory(category: String): InstrumentFactory
+}
+
+private[kamon] class MetricsModuleImpl(config: Config) extends MetricsModule {
+ import kamon.util.TriemapAtomicGetOrElseUpdate.Syntax
+
+ private val _trackedEntities = TrieMap.empty[Entity, EntityRecorder]
+ private val _subscriptions = new LazyActorRef
+
+ @volatile var settings = MetricsSettings(config)
+
+ val defaultTags: Map[String, String] = if (config.hasPath("kamon.default-tags")) {
+ config.getConfig("kamon.default-tags").resolve().entrySet().asScala
+ .collect {
+ case e: Entry[String, ConfigValue] if e.getValue.valueType() == ConfigValueType.STRING =>
+ (e.getKey, e.getValue.unwrapped().asInstanceOf[String])
+ case e: Entry[String, ConfigValue] if e.getValue.valueType() == ConfigValueType.NUMBER =>
+ (e.getKey, e.getValue.unwrapped().asInstanceOf[Int].toString)
+ case e: Entry[String, ConfigValue] if e.getValue.valueType() == ConfigValueType.BOOLEAN =>
+ (e.getKey, e.getValue.unwrapped().asInstanceOf[Boolean].toString)
+ }.toMap
+ }
+ else {
+ Map.empty
+ }
+
+ def shouldTrack(entity: Entity): Boolean =
+ settings.entityFilters.get(entity.category).map {
+ filter ⇒ filter.accept(entity.name)
+
+ } getOrElse (settings.trackUnmatchedEntities)
+
+ def registerHistogram(name: String, tags: Map[String, String], unitOfMeasurement: Option[UnitOfMeasurement],
+ dynamicRange: Option[DynamicRange]): Histogram = {
+
+ val histogramEntity = Entity(name, SingleInstrumentEntityRecorder.Histogram, tags ++ defaultTags)
+ val recorder = _trackedEntities.atomicGetOrElseUpdate(histogramEntity, {
+ val factory = instrumentFactory(histogramEntity.category)
+ HistogramRecorder(
+ HistogramKey(histogramEntity.category, unitOfMeasurement.getOrElse(UnitOfMeasurement.Unknown)),
+ factory.createHistogram(name, dynamicRange)
+ )
+ }, _.cleanup)
+
+ recorder.asInstanceOf[HistogramRecorder].instrument
+ }
+
+ def removeHistogram(name: String, tags: Map[String, String]): Boolean =
+ _trackedEntities.remove(Entity(name, SingleInstrumentEntityRecorder.Histogram, tags ++ defaultTags)).isDefined
+
+ def registerMinMaxCounter(name: String, tags: Map[String, String], unitOfMeasurement: Option[UnitOfMeasurement], dynamicRange: Option[DynamicRange],
+ refreshInterval: Option[FiniteDuration]): MinMaxCounter = {
+
+ val minMaxCounterEntity = Entity(name, SingleInstrumentEntityRecorder.MinMaxCounter, tags ++ defaultTags)
+ val recorder = _trackedEntities.atomicGetOrElseUpdate(minMaxCounterEntity, {
+ val factory = instrumentFactory(minMaxCounterEntity.category)
+ MinMaxCounterRecorder(
+ MinMaxCounterKey(minMaxCounterEntity.category, unitOfMeasurement.getOrElse(UnitOfMeasurement.Unknown)),
+ factory.createMinMaxCounter(name, dynamicRange, refreshInterval)
+ )
+ }, _.cleanup)
+
+ recorder.asInstanceOf[MinMaxCounterRecorder].instrument
+ }
+
+ def removeMinMaxCounter(name: String, tags: Map[String, String]): Boolean =
+ _trackedEntities.remove(Entity(name, SingleInstrumentEntityRecorder.MinMaxCounter, tags ++ defaultTags)).isDefined
+
+ def registerGauge(name: String, valueCollector: CurrentValueCollector, tags: Map[String, String] = Map.empty,
+ unitOfMeasurement: Option[UnitOfMeasurement] = None, dynamicRange: Option[DynamicRange] = None,
+ refreshInterval: Option[FiniteDuration] = None): Gauge = {
+
+ val gaugeEntity = Entity(name, SingleInstrumentEntityRecorder.Gauge, tags ++ defaultTags)
+ val recorder = _trackedEntities.atomicGetOrElseUpdate(gaugeEntity, {
+ val factory = instrumentFactory(gaugeEntity.category)
+ GaugeRecorder(
+ GaugeKey(gaugeEntity.category, unitOfMeasurement.getOrElse(UnitOfMeasurement.Unknown)),
+ factory.createGauge(name, dynamicRange, refreshInterval, valueCollector)
+ )
+ }, _.cleanup)
+
+ recorder.asInstanceOf[GaugeRecorder].instrument
+ }
+
+ def removeGauge(name: String, tags: Map[String, String]): Boolean =
+ _trackedEntities.remove(Entity(name, SingleInstrumentEntityRecorder.Gauge, tags ++ defaultTags)).isDefined
+
+ def registerCounter(name: String, tags: Map[String, String] = Map.empty, unitOfMeasurement: Option[UnitOfMeasurement] = None,
+ dynamicRange: Option[DynamicRange] = None): Counter = {
+
+ val counterEntity = Entity(name, SingleInstrumentEntityRecorder.Counter, tags ++ defaultTags)
+ val recorder = _trackedEntities.atomicGetOrElseUpdate(counterEntity, {
+ val factory = instrumentFactory(counterEntity.category)
+ CounterRecorder(
+ CounterKey(counterEntity.category, unitOfMeasurement.getOrElse(UnitOfMeasurement.Unknown)),
+ factory.createCounter()
+ )
+ }, _.cleanup)
+
+ recorder.asInstanceOf[CounterRecorder].instrument
+ }
+
+ def removeCounter(name: String, tags: Map[String, String]): Boolean =
+ _trackedEntities.remove(Entity(name, SingleInstrumentEntityRecorder.Counter, tags ++ defaultTags)).isDefined
+
+ def entity[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entity: Entity): T = {
+ _trackedEntities.atomicGetOrElseUpdate(entity.copy(tags = entity.tags ++ defaultTags), {
+ recorderFactory.createRecorder(instrumentFactory(recorderFactory.category))
+ }, _.cleanup).asInstanceOf[T]
+ }
+
+ def removeEntity(entity: Entity): Boolean = {
+ val removedEntity = _trackedEntities.remove(entity.copy(tags = entity.tags ++ defaultTags))
+ removedEntity.foreach(_.cleanup)
+ removedEntity.isDefined
+ }
+
+ def find(entity: Entity): Option[EntityRecorder] =
+ _trackedEntities.get(entity.copy(tags = entity.tags ++ defaultTags))
+
+ def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit =
+ _subscriptions.tell(Subscribe(filter, subscriber, permanent))
+
+ def unsubscribe(subscriber: ActorRef): Unit =
+ _subscriptions.tell(Unsubscribe(subscriber))
+
+ def buildDefaultCollectionContext: CollectionContext =
+ CollectionContext(settings.defaultCollectionContextBufferSize)
+
+ def instrumentFactory(category: String): InstrumentFactory =
+ settings.instrumentFactories.getOrElse(category, settings.defaultInstrumentFactory)
+
+ private[kamon] def collectSnapshots(collectionContext: CollectionContext): Map[Entity, EntitySnapshot] = {
+ val builder = Map.newBuilder[Entity, EntitySnapshot]
+ _trackedEntities.foreach {
+ case (identity, recorder) ⇒ builder += ((identity, recorder.collect(collectionContext)))
+ }
+
+ builder.result()
+ }
+
+ /**
+ * Metrics Extension initialization.
+ */
+ private var _system: ActorSystem = null
+ private lazy val _start = {
+ _subscriptions.point(_system.actorOf(SubscriptionsDispatcher.props(settings.tickInterval, this), "metrics"))
+ settings.pointScheduler(DefaultRefreshScheduler(_system.scheduler, _system.dispatcher))
+ }
+
+ def start(system: ActorSystem, newConfig: Config): Unit = synchronized {
+ settings = MetricsSettings(newConfig)
+ _system = system
+ _start
+ _system = null
+ }
+}
+
+private[kamon] object MetricsModuleImpl {
+
+ def apply(config: Config) =
+ new MetricsModuleImpl(config)
+}
+
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/MetricsSettings.scala b/kamon-core/src/legacy-main/scala/kamon/metric/MetricsSettings.scala
new file mode 100644
index 00000000..592e8f67
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/MetricsSettings.scala
@@ -0,0 +1,123 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import com.typesafe.config.Config
+import kamon.metric.instrument._
+import kamon.util.PathFilter
+import kamon.util.GlobPathFilter
+import kamon.util.RegexPathFilter
+
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * Configuration settings for the Metrics extension, as read from the `kamon.metric` configuration key.
+ */
+case class MetricsSettings(
+ tickInterval: FiniteDuration,
+ defaultCollectionContextBufferSize: Int,
+ trackUnmatchedEntities: Boolean,
+ entityFilters: Map[String, EntityFilter],
+ instrumentFactories: Map[String, InstrumentFactory],
+ defaultInstrumentFactory: InstrumentFactory,
+ refreshScheduler: RefreshScheduler
+) {
+
+ private[kamon] def pointScheduler(targetScheduler: RefreshScheduler): Unit = refreshScheduler match {
+ case lrs: LazyRefreshScheduler ⇒ lrs.point(targetScheduler)
+ case others ⇒
+ }
+}
+
+/**
+ *
+ */
+case class EntityFilter(includes: List[PathFilter], excludes: List[PathFilter]) {
+ def accept(name: String): Boolean =
+ includes.exists(_.accept(name)) && !excludes.exists(_.accept(name))
+}
+
+object MetricsSettings {
+ import kamon.util.ConfigTools.Syntax
+
+ def apply(config: Config): MetricsSettings = {
+ val metricConfig = config.getConfig("kamon.metric")
+
+ val tickInterval = metricConfig.getFiniteDuration("tick-interval")
+ val collectBufferSize = metricConfig.getInt("default-collection-context-buffer-size")
+ val trackUnmatchedEntities = metricConfig.getBoolean("track-unmatched-entities")
+ val entityFilters = loadFilters(metricConfig.getConfig("filters"))
+ val defaultInstrumentSettings = DefaultInstrumentSettings.fromConfig(metricConfig.getConfig("default-instrument-settings"))
+
+ val refreshScheduler = new LazyRefreshScheduler
+ val instrumentFactories = loadInstrumentFactories(metricConfig.getConfig("instrument-settings"), defaultInstrumentSettings, refreshScheduler)
+ val defaultInstrumentFactory = new InstrumentFactory(Map.empty, defaultInstrumentSettings, refreshScheduler)
+
+ MetricsSettings(tickInterval, collectBufferSize, trackUnmatchedEntities, entityFilters, instrumentFactories,
+ defaultInstrumentFactory, refreshScheduler)
+ }
+
+ /**
+ * Load all the default filters configured under the `kamon.metric.filters` configuration key. All filters are
+ * defined with the entity category as a sub-key of the `kamon.metric.filters` key and two sub-keys to it: includes
+ * and excludes with lists of string glob or regex patterns as values ('asRegex' defaults to false). Example:
+ *
+ * {{{
+ *
+ * kamon.metrics.filters {
+ * actor {
+ * includes = ["user/test-actor", "user/service/worker-*"]
+ * excludes = ["user/IO-*"]
+ * asRegex = false
+ * }
+ * }
+ *
+ * }}}
+ *
+ * @return a Map from category name to corresponding entity filter.
+ */
+ def loadFilters(filtersConfig: Config): Map[String, EntityFilter] = {
+ import scala.collection.JavaConverters._
+
+ filtersConfig.firstLevelKeys map { category: String ⇒
+ val asRegex = if (filtersConfig.hasPath(s"$category.asRegex")) filtersConfig.getBoolean(s"$category.asRegex") else false
+ val includes = filtersConfig.getStringList(s"$category.includes").asScala.map(inc ⇒
+ if (asRegex) RegexPathFilter(inc) else new GlobPathFilter(inc)).toList
+ val excludes = filtersConfig.getStringList(s"$category.excludes").asScala.map(exc ⇒
+ if (asRegex) RegexPathFilter(exc) else new GlobPathFilter(exc)).toList
+
+ (category, EntityFilter(includes, excludes))
+ } toMap
+ }
+
+ /**
+ * Load any custom configuration settings defined under the `kamon.metric.instrument-settings` configuration key and
+ * create InstrumentFactories for them.
+ *
+ * @return a Map from category name to InstrumentFactory.
+ */
+ def loadInstrumentFactories(instrumentSettings: Config, defaults: DefaultInstrumentSettings, refreshScheduler: RefreshScheduler): Map[String, InstrumentFactory] = {
+ instrumentSettings.firstLevelKeys.map { category ⇒
+ val categoryConfig = instrumentSettings.getConfig(category)
+ val customSettings = categoryConfig.firstLevelKeys.map { instrumentName ⇒
+ (instrumentName, InstrumentCustomSettings.fromConfig(categoryConfig.getConfig(instrumentName)))
+ } toMap
+
+ (category, new InstrumentFactory(customSettings, defaults, refreshScheduler))
+ } toMap
+ }
+}
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/SubscriptionsDispatcher.scala b/kamon-core/src/legacy-main/scala/kamon/metric/SubscriptionsDispatcher.scala
new file mode 100644
index 00000000..09bf58ad
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/SubscriptionsDispatcher.scala
@@ -0,0 +1,116 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import akka.actor._
+import kamon.metric.SubscriptionsDispatcher._
+import kamon.util.{MilliTimestamp, GlobPathFilter}
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * Manages subscriptions to metrics and dispatch snapshots on every tick to all subscribers.
+ */
+private[kamon] class SubscriptionsDispatcher(interval: FiniteDuration, metricsExtension: MetricsModuleImpl) extends Actor {
+ var lastTick = MilliTimestamp.now
+ var oneShotSubscriptions = Map.empty[ActorRef, SubscriptionFilter]
+ var permanentSubscriptions = Map.empty[ActorRef, SubscriptionFilter]
+ val tickSchedule = context.system.scheduler.schedule(interval, interval, self, Tick)(context.dispatcher)
+ val collectionContext = metricsExtension.buildDefaultCollectionContext
+
+ def receive = {
+ case Tick ⇒ processTick()
+ case Subscribe(filter, subscriber, permanently) ⇒ subscribe(filter, subscriber, permanently)
+ case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber)
+ case Terminated(subscriber) ⇒ unsubscribe(subscriber)
+ }
+
+ def processTick(): Unit =
+ dispatch(metricsExtension.collectSnapshots(collectionContext))
+
+ def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit = {
+ def addSubscription(storage: Map[ActorRef, SubscriptionFilter]): Map[ActorRef, SubscriptionFilter] =
+ storage.updated(subscriber, storage.getOrElse(subscriber, SubscriptionFilter.Empty).combine(filter))
+
+ context.watch(subscriber)
+
+ if (permanent)
+ permanentSubscriptions = addSubscription(permanentSubscriptions)
+ else
+ oneShotSubscriptions = addSubscription(oneShotSubscriptions)
+ }
+
+ def unsubscribe(subscriber: ActorRef): Unit = {
+ permanentSubscriptions = permanentSubscriptions - subscriber
+ oneShotSubscriptions = oneShotSubscriptions - subscriber
+ }
+
+ def dispatch(snapshots: Map[Entity, EntitySnapshot]): Unit = {
+ val currentTick = MilliTimestamp.now
+
+ dispatchSelections(lastTick, currentTick, permanentSubscriptions, snapshots)
+ dispatchSelections(lastTick, currentTick, oneShotSubscriptions, snapshots)
+
+ lastTick = currentTick
+ oneShotSubscriptions = Map.empty[ActorRef, SubscriptionFilter]
+ }
+
+ def dispatchSelections(lastTick: MilliTimestamp, currentTick: MilliTimestamp, subscriptions: Map[ActorRef, SubscriptionFilter],
+ snapshots: Map[Entity, EntitySnapshot]): Unit = {
+
+ for ((subscriber, filter) ← subscriptions) {
+ val selection = snapshots.filter(group ⇒ filter.accept(group._1))
+ val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection)
+
+ subscriber ! tickMetrics
+ }
+ }
+}
+
+object SubscriptionsDispatcher {
+ def props(interval: FiniteDuration, metricsExtension: MetricsModuleImpl): Props =
+ Props(new SubscriptionsDispatcher(interval, metricsExtension))
+
+ case object Tick
+ case class Unsubscribe(subscriber: ActorRef)
+ case class Subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanently: Boolean = false)
+ case class TickMetricSnapshot(from: MilliTimestamp, to: MilliTimestamp, metrics: Map[Entity, EntitySnapshot])
+
+}
+
+trait SubscriptionFilter { self ⇒
+
+ def accept(entity: Entity): Boolean
+
+ final def combine(that: SubscriptionFilter): SubscriptionFilter = new SubscriptionFilter {
+ override def accept(entity: Entity): Boolean = self.accept(entity) || that.accept(entity)
+ }
+}
+
+object SubscriptionFilter {
+ val Empty = new SubscriptionFilter {
+ def accept(entity: Entity): Boolean = false
+ }
+
+ def apply(category: String, name: String): SubscriptionFilter = new SubscriptionFilter {
+ val categoryPattern = new GlobPathFilter(category)
+ val namePattern = new GlobPathFilter(name)
+
+ def accept(entity: Entity): Boolean = {
+ categoryPattern.accept(entity.category) && namePattern.accept(entity.name)
+ }
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/TickMetricSnapshotBuffer.scala b/kamon-core/src/legacy-main/scala/kamon/metric/TickMetricSnapshotBuffer.scala
new file mode 100644
index 00000000..22557974
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/TickMetricSnapshotBuffer.scala
@@ -0,0 +1,65 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import akka.actor.{Props, Actor, ActorRef}
+import kamon.Kamon
+import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
+import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer
+import kamon.metric.instrument.CollectionContext
+import kamon.util.MapMerge
+
+import scala.concurrent.duration.FiniteDuration
+
+class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor {
+ import MapMerge.Syntax
+
+ val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher)
+ val collectionContext: CollectionContext = Kamon.metrics.buildDefaultCollectionContext
+
+ def receive = empty
+
+ def empty: Actor.Receive = {
+ case tick: TickMetricSnapshot ⇒ context become (buffering(tick))
+ case FlushBuffer ⇒ // Nothing to flush.
+ }
+
+ def buffering(buffered: TickMetricSnapshot): Actor.Receive = {
+ case TickMetricSnapshot(_, to, tickMetrics) ⇒
+ val combinedMetrics = buffered.metrics.merge(tickMetrics, (l, r) ⇒ l.merge(r, collectionContext))
+ val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics)
+
+ context become (buffering(combinedSnapshot))
+
+ case FlushBuffer ⇒
+ receiver ! buffered
+ context become (empty)
+
+ }
+
+ override def postStop(): Unit = {
+ flushSchedule.cancel()
+ super.postStop()
+ }
+}
+
+object TickMetricSnapshotBuffer {
+ case object FlushBuffer
+
+ def props(flushInterval: FiniteDuration, receiver: ActorRef): Props =
+ Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver))
+}
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/legacy-main/scala/kamon/metric/TraceMetrics.scala
new file mode 100644
index 00000000..eaeebb97
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/TraceMetrics.scala
@@ -0,0 +1,53 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2016 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric
+
+import kamon.metric.instrument.{Time, InstrumentFactory}
+
+class TraceMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) {
+
+ /**
+ * Records blah blah
+ */
+ val elapsedTime = histogram("elapsed-time", unitOfMeasurement = Time.Nanoseconds)
+ val errors = counter("errors")
+}
+
+object TraceMetrics extends EntityRecorderFactory[TraceMetrics] {
+ def category: String = "trace"
+ def createRecorder(instrumentFactory: InstrumentFactory): TraceMetrics = new TraceMetrics(instrumentFactory)
+
+ // Java API.
+ def factory: EntityRecorderFactory[TraceMetrics] = this
+}
+
+class SegmentMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) {
+
+ /**
+ * Records blah blah
+ */
+ val elapsedTime = histogram("elapsed-time", unitOfMeasurement = Time.Nanoseconds)
+ val errors = counter("errors")
+}
+
+object SegmentMetrics extends EntityRecorderFactory[SegmentMetrics] {
+ def category: String = "trace-segment"
+ def createRecorder(instrumentFactory: InstrumentFactory): SegmentMetrics = new SegmentMetrics(instrumentFactory)
+
+ // Java API.
+ def factory: EntityRecorderFactory[SegmentMetrics] = this
+} \ No newline at end of file
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/Counter.scala
new file mode 100644
index 00000000..b7ab60de
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/Counter.scala
@@ -0,0 +1,65 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric.instrument
+
+import kamon.jsr166.LongAdder
+
+trait Counter extends Instrument {
+ type SnapshotType = Counter.Snapshot
+
+ def increment(): Unit
+ def increment(times: Long): Unit
+}
+
+object Counter {
+
+ def apply(): Counter = new LongAdderCounter
+ def create(): Counter = apply()
+
+ trait Snapshot extends InstrumentSnapshot {
+ def count: Long
+ def merge(that: InstrumentSnapshot, context: CollectionContext): Counter.Snapshot
+ def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Counter.Snapshot
+ }
+}
+
+class LongAdderCounter extends Counter {
+ private val counter = new LongAdder
+
+ def increment(): Unit = counter.increment()
+
+ def increment(times: Long): Unit = {
+ if (times < 0)
+ throw new UnsupportedOperationException("Counters cannot be decremented")
+ counter.add(times)
+ }
+
+ def collect(context: CollectionContext): Counter.Snapshot = CounterSnapshot(counter.sumAndReset())
+
+ def cleanup: Unit = {}
+}
+
+case class CounterSnapshot(count: Long) extends Counter.Snapshot {
+ def merge(that: InstrumentSnapshot, context: CollectionContext): Counter.Snapshot = that match {
+ case CounterSnapshot(thatCount) ⇒ CounterSnapshot(count + thatCount)
+ case other ⇒ sys.error(s"Cannot merge a CounterSnapshot with the incompatible [${other.getClass.getName}] type.")
+ }
+
+ override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Counter.Snapshot =
+ CounterSnapshot(from.tryScale(to)(count).toLong)
+
+} \ No newline at end of file
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/Gauge.scala
new file mode 100644
index 00000000..39571d3d
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/Gauge.scala
@@ -0,0 +1,120 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric.instrument
+
+import java.util.concurrent.atomic.{AtomicLong, AtomicLongFieldUpdater, AtomicReference}
+
+import akka.actor.Cancellable
+import kamon.metric.instrument.Gauge.CurrentValueCollector
+import kamon.metric.instrument.Histogram.DynamicRange
+
+import scala.concurrent.duration.FiniteDuration
+
+trait Gauge extends Instrument {
+ type SnapshotType = Histogram.Snapshot
+
+ def record(value: Long): Unit
+ def record(value: Long, count: Long): Unit
+ def refreshValue(): Unit
+}
+
+object Gauge {
+
+ def apply(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler, valueCollector: CurrentValueCollector): Gauge = {
+ val underlyingHistogram = Histogram(dynamicRange)
+ val gauge = new HistogramBackedGauge(underlyingHistogram, valueCollector)
+ val refreshValuesSchedule = scheduler.schedule(refreshInterval, () ⇒ {
+ gauge.refreshValue()
+ })
+
+ gauge.automaticValueCollectorSchedule.set(refreshValuesSchedule)
+ gauge
+ }
+
+ def create(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler, valueCollector: CurrentValueCollector): Gauge =
+ apply(dynamicRange, refreshInterval, scheduler, valueCollector)
+
+ trait CurrentValueCollector {
+ def currentValue: Long
+ }
+
+ implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector {
+ def currentValue: Long = f.apply()
+ }
+
+ implicit def callByNameLongAsCurrentValueCollector(f: ⇒ Long): CurrentValueCollector = new CurrentValueCollector {
+ def currentValue: Long = f
+ }
+}
+
+/**
+ * Helper for cases in which a gauge shouldn't store the current value of a observed value but the difference between
+ * the current observed value and the previously observed value. Should only be used if the observed value is always
+ * increasing or staying steady, but is never able to decrease.
+ *
+ * Note: The first time a value is collected, this wrapper will always return zero, afterwards, the difference between
+ * the current value and the last value will be returned.
+ */
+class DifferentialValueCollector(wrappedValueCollector: CurrentValueCollector) extends CurrentValueCollector {
+ @volatile private var _readAtLeastOnce = false
+ private val _lastObservedValue = new AtomicLong(0)
+
+ def currentValue: Long = {
+ if (_readAtLeastOnce) {
+ val wrappedCurrent = wrappedValueCollector.currentValue
+ val diff = wrappedCurrent - _lastObservedValue.getAndSet(wrappedCurrent)
+
+ if (diff >= 0) diff else 0L
+
+ } else {
+ _lastObservedValue.set(wrappedValueCollector.currentValue)
+ _readAtLeastOnce = true
+ 0L
+ }
+
+ }
+}
+
+object DifferentialValueCollector {
+ def apply(wrappedValueCollector: CurrentValueCollector): CurrentValueCollector =
+ new DifferentialValueCollector(wrappedValueCollector)
+
+ def apply(wrappedValueCollector: ⇒ Long): CurrentValueCollector =
+ new DifferentialValueCollector(new CurrentValueCollector {
+ def currentValue: Long = wrappedValueCollector
+ })
+}
+
+class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector: Gauge.CurrentValueCollector) extends Gauge {
+ private[kamon] val automaticValueCollectorSchedule = new AtomicReference[Cancellable]()
+
+ def record(value: Long): Unit = underlyingHistogram.record(value)
+
+ def record(value: Long, count: Long): Unit = underlyingHistogram.record(value, count)
+
+ def collect(context: CollectionContext): Histogram.Snapshot = underlyingHistogram.collect(context)
+
+ def cleanup: Unit = {
+ if (automaticValueCollectorSchedule.get() != null)
+ automaticValueCollectorSchedule.get().cancel()
+ }
+
+ def refreshValue(): Unit =
+ underlyingHistogram.record(currentValueCollector.currentValue)
+
+}
+
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/Histogram.scala
new file mode 100644
index 00000000..399f0880
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/Histogram.scala
@@ -0,0 +1,331 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric.instrument
+
+import java.nio.LongBuffer
+
+import kamon.metric.instrument.Histogram.{DynamicRange, Snapshot}
+import kamon.util.logger.LazyLogger
+import org.HdrHistogram.ModifiedAtomicHistogram
+
+trait Histogram extends Instrument {
+ type SnapshotType = Histogram.Snapshot
+
+ def record(value: Long): Unit
+ def record(value: Long, count: Long): Unit
+}
+
+object Histogram {
+
+ /**
+ * Scala API:
+ *
+ * Create a new High Dynamic Range Histogram ([[kamon.metric.instrument.HdrHistogram]]) using the given
+ * [[kamon.metric.instrument.Histogram.DynamicRange]].
+ */
+ def apply(dynamicRange: DynamicRange): Histogram = new HdrHistogram(dynamicRange)
+
+ /**
+ * Java API:
+ *
+ * Create a new High Dynamic Range Histogram ([[kamon.metric.instrument.HdrHistogram]]) using the given
+ * [[kamon.metric.instrument.Histogram.DynamicRange]].
+ */
+ def create(dynamicRange: DynamicRange): Histogram = apply(dynamicRange)
+
+ /**
+ * DynamicRange is a configuration object used to supply range and precision configuration to a
+ * [[kamon.metric.instrument.HdrHistogram]]. See the [[http://hdrhistogram.github.io/HdrHistogram/ HdrHistogram website]]
+ * for more details on how it works and the effects of these configuration values.
+ *
+ * @param lowestDiscernibleValue
+ * The lowest value that can be discerned (distinguished from 0) by the histogram.Must be a positive integer that
+ * is >= 1. May be internally rounded down to nearest power of 2.
+ * @param highestTrackableValue
+ * The highest value to be tracked by the histogram. Must be a positive integer that is >= (2 * lowestDiscernibleValue).
+ * Must not be larger than (Long.MAX_VALUE/2).
+ * @param precision
+ * The number of significant decimal digits to which the histogram will maintain value resolution and separation.
+ * Must be a non-negative integer between 1 and 3.
+ */
+ case class DynamicRange(lowestDiscernibleValue: Long, highestTrackableValue: Long, precision: Int)
+
+ trait Record {
+ def level: Long
+ def count: Long
+
+ private[kamon] def rawCompactRecord: Long
+ }
+
+ case class MutableRecord(var level: Long, var count: Long) extends Record {
+ var rawCompactRecord: Long = 0L
+ }
+
+ trait Snapshot extends InstrumentSnapshot {
+
+ def isEmpty: Boolean = numberOfMeasurements == 0
+ def numberOfMeasurements: Long
+ def min: Long
+ def max: Long
+ def sum: Long
+ def percentile(percentile: Double): Long
+ def recordsIterator: Iterator[Record]
+ def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot
+ def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot
+
+ override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Histogram.Snapshot =
+ new ScaledSnapshot(from, to, this)
+ }
+
+ class ScaledSnapshot(from: UnitOfMeasurement, to: UnitOfMeasurement, snapshot: Snapshot) extends Snapshot {
+ private def doScale(v: Long) = from.tryScale(to)(v).toLong
+ override def numberOfMeasurements: Long = snapshot.numberOfMeasurements
+
+ override def max: Long = doScale(snapshot.max)
+
+ override def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot = snapshot.merge(that, context)
+
+ override def merge(that: Snapshot, context: CollectionContext): Snapshot = snapshot.merge(that, context)
+
+ override def percentile(percentile: Double): Long = doScale(snapshot.percentile(percentile))
+
+ override def min: Long = doScale(snapshot.min)
+
+ override def sum: Long = doScale(snapshot.sum)
+
+ override def recordsIterator: Iterator[Record] = {
+ snapshot.recordsIterator.map(record ⇒ new Record {
+ override def count: Long = record.count
+
+ override def level: Long = doScale(record.level)
+
+ override private[kamon] def rawCompactRecord: Long = record.rawCompactRecord
+ })
+ }
+
+ override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Histogram.Snapshot =
+ if (this.from == from && this.to == to) this else super.scale(from, to)
+ }
+
+ object Snapshot {
+ val empty = new Snapshot {
+ override def min: Long = 0L
+ override def max: Long = 0L
+ override def sum: Long = 0L
+ override def percentile(percentile: Double): Long = 0L
+ override def recordsIterator: Iterator[Record] = Iterator.empty
+ override def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot = that
+ override def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = that
+ override def numberOfMeasurements: Long = 0L
+ override def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): Histogram.Snapshot = this
+ }
+ }
+}
+
+object HdrHistogram {
+ private val log = LazyLogger(classOf[HdrHistogram])
+}
+
+/**
+ * This implementation is meant to be used for real time data collection where data snapshots are taken often over time.
+ * The collect(..) operation extracts all the recorded values from the histogram and resets the counts, but still
+ * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken.
+ */
+class HdrHistogram(dynamicRange: DynamicRange) extends ModifiedAtomicHistogram(
+ dynamicRange.lowestDiscernibleValue,
+ dynamicRange.highestTrackableValue, dynamicRange.precision
+) with Histogram {
+ import HdrHistogram.log
+
+ def record(value: Long): Unit = tryRecord(value, 1L)
+
+ def record(value: Long, count: Long): Unit = tryRecord(value, count)
+
+ private def tryRecord(value: Long, count: Long): Unit = {
+ try {
+ recordValueWithCount(value, count)
+ } catch {
+ case anyException: Throwable ⇒
+ log.warn(s"Failed to store value $value in HdrHistogram, please review your range configuration.", anyException)
+ }
+ }
+
+ def collect(context: CollectionContext): Histogram.Snapshot = {
+ import context.buffer
+ buffer.clear()
+ val nrOfMeasurements = writeSnapshotTo(buffer)
+
+ buffer.flip()
+
+ val measurementsArray = Array.ofDim[Long](buffer.limit())
+ buffer.get(measurementsArray, 0, measurementsArray.length)
+ new CompactHdrSnapshot(nrOfMeasurements, measurementsArray, protectedUnitMagnitude(), protectedSubBucketHalfCount(), protectedSubBucketHalfCountMagnitude())
+ }
+
+ def getCounts = countsArray().length()
+
+ def cleanup: Unit = {}
+
+ private def writeSnapshotTo(buffer: LongBuffer): Long = {
+ val counts = countsArray()
+ val countsLength = counts.length()
+
+ var nrOfMeasurements = 0L
+ var index = 0L
+ while (index < countsLength) {
+ val countAtIndex = counts.getAndSet(index.toInt, 0L)
+
+ if (countAtIndex > 0) {
+ buffer.put(CompactHdrSnapshot.compactRecord(index, countAtIndex))
+ nrOfMeasurements += countAtIndex
+ }
+
+ index += 1
+ }
+ nrOfMeasurements
+ }
+}
+
+case class CompactHdrSnapshot(numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int,
+ subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Histogram.Snapshot {
+
+ def min: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(0))
+ def max: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(compactRecords.length - 1))
+ def sum: Long = recordsIterator.foldLeft(0L)((a, r) ⇒ a + (r.count * r.level))
+
+ def percentile(p: Double): Long = {
+ val records = recordsIterator
+ val threshold = numberOfMeasurements * (p / 100D)
+ var countToCurrentLevel = 0L
+ var percentileLevel = 0L
+
+ while (countToCurrentLevel < threshold && records.hasNext) {
+ val record = records.next()
+ countToCurrentLevel += record.count
+ percentileLevel = record.level
+ }
+
+ percentileLevel
+ }
+
+ def merge(that: Histogram.Snapshot, context: CollectionContext): Snapshot =
+ merge(that.asInstanceOf[InstrumentSnapshot], context)
+
+ def merge(that: InstrumentSnapshot, context: CollectionContext): Histogram.Snapshot = that match {
+ case thatSnapshot: CompactHdrSnapshot ⇒
+ if (thatSnapshot.isEmpty) this else if (this.isEmpty) thatSnapshot else {
+ import context.buffer
+ buffer.clear()
+
+ val selfIterator = recordsIterator
+ val thatIterator = thatSnapshot.recordsIterator
+ var thatCurrentRecord: Histogram.Record = null
+ var mergedNumberOfMeasurements = 0L
+
+ def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null
+ def addToBuffer(compactRecord: Long): Unit = {
+ mergedNumberOfMeasurements += countFromCompactRecord(compactRecord)
+ buffer.put(compactRecord)
+ }
+
+ while (selfIterator.hasNext) {
+ val selfCurrentRecord = selfIterator.next()
+
+ // Advance that to no further than the level of selfCurrentRecord
+ thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord
+ while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) {
+ addToBuffer(thatCurrentRecord.rawCompactRecord)
+ thatCurrentRecord = nextOrNull(thatIterator)
+ }
+
+ // Include the current record of self and optionally merge if has the same level as thatCurrentRecord
+ if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) {
+ addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord))
+ thatCurrentRecord = nextOrNull(thatIterator)
+ } else {
+ addToBuffer(selfCurrentRecord.rawCompactRecord)
+ }
+ }
+
+ // Include everything that might have been left from that
+ if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord)
+ while (thatIterator.hasNext) {
+ addToBuffer(thatIterator.next().rawCompactRecord)
+ }
+
+ buffer.flip()
+ val compactRecords = Array.ofDim[Long](buffer.limit())
+ buffer.get(compactRecords)
+
+ new CompactHdrSnapshot(mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude)
+ }
+
+ case other ⇒
+ sys.error(s"Cannot merge a CompactHdrSnapshot with the incompatible [${other.getClass.getName}] type.")
+
+ }
+
+ @inline private def mergeCompactRecords(left: Long, right: Long): Long = {
+ val index = left >> 48
+ val leftCount = countFromCompactRecord(left)
+ val rightCount = countFromCompactRecord(right)
+
+ CompactHdrSnapshot.compactRecord(index, leftCount + rightCount)
+ }
+
+ @inline private def levelFromCompactRecord(compactRecord: Long): Long = {
+ val countsArrayIndex = (compactRecord >> 48).toInt
+ var bucketIndex: Int = (countsArrayIndex >> subBucketHalfCountMagnitude) - 1
+ var subBucketIndex: Int = (countsArrayIndex & (subBucketHalfCount - 1)) + subBucketHalfCount
+ if (bucketIndex < 0) {
+ subBucketIndex -= subBucketHalfCount
+ bucketIndex = 0
+ }
+
+ subBucketIndex.toLong << (bucketIndex + unitMagnitude)
+ }
+
+ @inline private def countFromCompactRecord(compactRecord: Long): Long =
+ compactRecord & CompactHdrSnapshot.CompactRecordCountMask
+
+ def recordsIterator: Iterator[Histogram.Record] = new Iterator[Histogram.Record] {
+ var currentIndex = 0
+ val mutableRecord = Histogram.MutableRecord(0, 0)
+
+ override def hasNext: Boolean = currentIndex < compactRecords.length
+
+ override def next(): Histogram.Record = {
+ if (hasNext) {
+ val measurement = compactRecords(currentIndex)
+ mutableRecord.rawCompactRecord = measurement
+ mutableRecord.level = levelFromCompactRecord(measurement)
+ mutableRecord.count = countFromCompactRecord(measurement)
+ currentIndex += 1
+
+ mutableRecord
+ } else {
+ throw new IllegalStateException("The iterator has already been consumed.")
+ }
+ }
+ }
+}
+
+object CompactHdrSnapshot {
+ val CompactRecordCountMask = 0xFFFFFFFFFFFFL
+
+ def compactRecord(index: Long, count: Long): Long = (index << 48) | count
+}
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/instrument/Instrument.scala b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/Instrument.scala
new file mode 100644
index 00000000..2c4b4319
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/Instrument.scala
@@ -0,0 +1,51 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric.instrument
+
+import java.nio.LongBuffer
+
+private[kamon] trait Instrument {
+ type SnapshotType <: InstrumentSnapshot
+
+ def collect(context: CollectionContext): SnapshotType
+ def cleanup: Unit
+}
+
+trait InstrumentSnapshot {
+ def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot
+
+ def scale(from: UnitOfMeasurement, to: UnitOfMeasurement): InstrumentSnapshot
+}
+
+trait CollectionContext {
+ def buffer: LongBuffer
+}
+
+object CollectionContext {
+ def apply(longBufferSize: Int): CollectionContext = new CollectionContext {
+ val buffer: LongBuffer = LongBuffer.allocate(longBufferSize)
+ }
+}
+
+sealed trait InstrumentType
+
+object InstrumentTypes {
+ case object Histogram extends InstrumentType
+ case object MinMaxCounter extends InstrumentType
+ case object Gauge extends InstrumentType
+ case object Counter extends InstrumentType
+}
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/instrument/InstrumentFactory.scala b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/InstrumentFactory.scala
new file mode 100644
index 00000000..7c0201f7
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/InstrumentFactory.scala
@@ -0,0 +1,51 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric.instrument
+
+import kamon.metric.instrument.Gauge.CurrentValueCollector
+import kamon.metric.instrument.Histogram.DynamicRange
+
+import scala.concurrent.duration.FiniteDuration
+
+case class InstrumentFactory(configurations: Map[String, InstrumentCustomSettings], defaults: DefaultInstrumentSettings, scheduler: RefreshScheduler) {
+
+ private def resolveSettings(instrumentName: String, codeSettings: Option[InstrumentSettings], default: InstrumentSettings): InstrumentSettings = {
+ configurations.get(instrumentName).flatMap { customSettings ⇒
+ codeSettings.map(cs ⇒ customSettings.combine(cs)) orElse (Some(customSettings.combine(default)))
+
+ } getOrElse (codeSettings.getOrElse(default))
+ }
+
+ def createHistogram(name: String, dynamicRange: Option[DynamicRange] = None): Histogram = {
+ val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, None)), defaults.histogram)
+ Histogram(settings.dynamicRange)
+ }
+
+ def createMinMaxCounter(name: String, dynamicRange: Option[DynamicRange] = None, refreshInterval: Option[FiniteDuration] = None): MinMaxCounter = {
+ val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, refreshInterval)), defaults.minMaxCounter)
+ MinMaxCounter(settings.dynamicRange, settings.refreshInterval.get, scheduler)
+ }
+
+ def createGauge(name: String, dynamicRange: Option[DynamicRange] = None, refreshInterval: Option[FiniteDuration] = None,
+ valueCollector: CurrentValueCollector): Gauge = {
+
+ val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, refreshInterval)), defaults.gauge)
+ Gauge(settings.dynamicRange, settings.refreshInterval.get, scheduler, valueCollector)
+ }
+
+ def createCounter(): Counter = Counter()
+} \ No newline at end of file
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/instrument/InstrumentSettings.scala b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/InstrumentSettings.scala
new file mode 100644
index 00000000..e4d6f547
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/InstrumentSettings.scala
@@ -0,0 +1,73 @@
+package kamon.metric.instrument
+
+import com.typesafe.config.Config
+import kamon.metric.instrument.Histogram.DynamicRange
+
+import scala.concurrent.duration.FiniteDuration
+
+case class InstrumentCustomSettings(lowestDiscernibleValue: Option[Long], highestTrackableValue: Option[Long],
+ precision: Option[Int], refreshInterval: Option[FiniteDuration]) {
+
+ def combine(that: InstrumentSettings): InstrumentSettings =
+ InstrumentSettings(
+ DynamicRange(
+ lowestDiscernibleValue.getOrElse(that.dynamicRange.lowestDiscernibleValue),
+ highestTrackableValue.getOrElse(that.dynamicRange.highestTrackableValue),
+ precision.getOrElse(that.dynamicRange.precision)
+ ),
+ refreshInterval.orElse(that.refreshInterval)
+ )
+}
+
+object InstrumentCustomSettings {
+ import kamon.util.ConfigTools.Syntax
+
+ def fromConfig(config: Config): InstrumentCustomSettings =
+ InstrumentCustomSettings(
+ if (config.hasPath("lowest-discernible-value")) Some(config.getLong("lowest-discernible-value")) else None,
+ if (config.hasPath("highest-trackable-value")) Some(config.getLong("highest-trackable-value")) else None,
+ if (config.hasPath("precision")) Some(InstrumentSettings.parsePrecision(config.getString("precision"))) else None,
+ if (config.hasPath("refresh-interval")) Some(config.getFiniteDuration("refresh-interval")) else None
+ )
+
+}
+
+case class InstrumentSettings(dynamicRange: DynamicRange, refreshInterval: Option[FiniteDuration])
+
+object InstrumentSettings {
+
+ def readDynamicRange(config: Config): DynamicRange =
+ DynamicRange(
+ config.getLong("lowest-discernible-value"),
+ config.getLong("highest-trackable-value"),
+ parsePrecision(config.getString("precision"))
+ )
+
+ def parsePrecision(stringValue: String): Int = stringValue match {
+ case "low" ⇒ 1
+ case "normal" ⇒ 2
+ case "fine" ⇒ 3
+ case other ⇒ sys.error(s"Invalid precision configuration [$other] found, valid options are: [low|normal|fine].")
+ }
+}
+
+case class DefaultInstrumentSettings(histogram: InstrumentSettings, minMaxCounter: InstrumentSettings, gauge: InstrumentSettings)
+
+object DefaultInstrumentSettings {
+
+ def fromConfig(config: Config): DefaultInstrumentSettings = {
+ import kamon.util.ConfigTools.Syntax
+
+ val histogramSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("histogram")), None)
+ val minMaxCounterSettings = InstrumentSettings(
+ InstrumentSettings.readDynamicRange(config.getConfig("min-max-counter")),
+ Some(config.getFiniteDuration("min-max-counter.refresh-interval"))
+ )
+ val gaugeSettings = InstrumentSettings(
+ InstrumentSettings.readDynamicRange(config.getConfig("gauge")),
+ Some(config.getFiniteDuration("gauge.refresh-interval"))
+ )
+
+ DefaultInstrumentSettings(histogramSettings, minMaxCounterSettings, gaugeSettings)
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/MinMaxCounter.scala
new file mode 100644
index 00000000..76fc2c2a
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/MinMaxCounter.scala
@@ -0,0 +1,105 @@
+package kamon.metric.instrument
+
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+import java.lang.Math.abs
+import java.util.concurrent.atomic.AtomicReference
+import akka.actor.Cancellable
+import kamon.jsr166.LongMaxUpdater
+import kamon.metric.instrument.Histogram.DynamicRange
+import kamon.util.PaddedAtomicLong
+import scala.concurrent.duration.FiniteDuration
+
+trait MinMaxCounter extends Instrument {
+ override type SnapshotType = Histogram.Snapshot
+
+ def increment(): Unit
+ def increment(times: Long): Unit
+ def decrement(): Unit
+ def decrement(times: Long): Unit
+ def refreshValues(): Unit
+}
+
+object MinMaxCounter {
+
+ def apply(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler): MinMaxCounter = {
+ val underlyingHistogram = Histogram(dynamicRange)
+ val minMaxCounter = new PaddedMinMaxCounter(underlyingHistogram)
+ val refreshValuesSchedule = scheduler.schedule(refreshInterval, () ⇒ {
+ minMaxCounter.refreshValues()
+ })
+
+ minMaxCounter.refreshValuesSchedule.set(refreshValuesSchedule)
+ minMaxCounter
+ }
+
+ def create(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler): MinMaxCounter =
+ apply(dynamicRange, refreshInterval, scheduler)
+
+}
+
+class PaddedMinMaxCounter(underlyingHistogram: Histogram) extends MinMaxCounter {
+ private val min = new LongMaxUpdater(0L)
+ private val max = new LongMaxUpdater(0L)
+ private val sum = new PaddedAtomicLong
+ val refreshValuesSchedule = new AtomicReference[Cancellable]()
+
+ def increment(): Unit = increment(1L)
+
+ def increment(times: Long): Unit = {
+ val currentValue = sum.addAndGet(times)
+ max.update(currentValue)
+ }
+
+ def decrement(): Unit = decrement(1L)
+
+ def decrement(times: Long): Unit = {
+ val currentValue = sum.addAndGet(-times)
+ min.update(-currentValue)
+ }
+
+ def collect(context: CollectionContext): Histogram.Snapshot = {
+ refreshValues()
+ underlyingHistogram.collect(context)
+ }
+
+ def cleanup: Unit = {
+ if (refreshValuesSchedule.get() != null)
+ refreshValuesSchedule.get().cancel()
+ }
+
+ def refreshValues(): Unit = {
+ val currentValue = {
+ val value = sum.get()
+ if (value <= 0) 0 else value
+ }
+
+ val currentMin = {
+ val rawMin = min.maxThenReset(-currentValue)
+ if (rawMin >= 0)
+ 0
+ else
+ abs(rawMin)
+ }
+
+ val currentMax = max.maxThenReset(currentValue)
+
+ underlyingHistogram.record(currentValue)
+ underlyingHistogram.record(currentMin)
+ underlyingHistogram.record(currentMax)
+ }
+}
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/instrument/ModifiedAtomicHistogram.scala b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/ModifiedAtomicHistogram.scala
new file mode 100644
index 00000000..eb01d114
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/ModifiedAtomicHistogram.scala
@@ -0,0 +1,31 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package org.HdrHistogram
+
+import java.util.concurrent.atomic.AtomicLongArray
+
+abstract class ModifiedAtomicHistogram(low: Long, high: Long, precision: Int)
+ extends AtomicHistogram(low, high, precision) { self ⇒
+
+ override def incrementTotalCount(): Unit = {}
+ override def addToTotalCount(value: Long): Unit = {}
+
+ def countsArray(): AtomicLongArray = counts
+ def protectedUnitMagnitude(): Int = unitMagnitude
+ def protectedSubBucketHalfCount(): Int = subBucketHalfCount
+ def protectedSubBucketHalfCountMagnitude(): Int = subBucketHalfCountMagnitude
+} \ No newline at end of file
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/instrument/RefreshScheduler.scala b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/RefreshScheduler.scala
new file mode 100644
index 00000000..6bc02dc3
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/RefreshScheduler.scala
@@ -0,0 +1,115 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric.instrument
+
+import akka.actor.{Scheduler, Cancellable}
+import org.HdrHistogram.WriterReaderPhaser
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.FiniteDuration
+
+trait RefreshScheduler {
+ def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable
+}
+
+/**
+ * Default implementation of RefreshScheduler that simply uses an [[akka.actor.Scheduler]] to schedule tasks to be run
+ * in the provided ExecutionContext.
+ */
+class DefaultRefreshScheduler(scheduler: Scheduler, dispatcher: ExecutionContext) extends RefreshScheduler {
+ def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable =
+ scheduler.schedule(interval, interval)(refresh.apply())(dispatcher)
+}
+
+object DefaultRefreshScheduler {
+ def apply(scheduler: Scheduler, dispatcher: ExecutionContext): RefreshScheduler =
+ new DefaultRefreshScheduler(scheduler, dispatcher)
+
+ def create(scheduler: Scheduler, dispatcher: ExecutionContext): RefreshScheduler =
+ apply(scheduler, dispatcher)
+}
+
+/**
+ * RefreshScheduler implementation that accumulates all the scheduled actions until it is pointed to another refresh
+ * scheduler. Once it is pointed, all subsequent calls to `schedule` will immediately be scheduled in the pointed
+ * scheduler.
+ */
+class LazyRefreshScheduler extends RefreshScheduler {
+ private val _schedulerPhaser = new WriterReaderPhaser
+ private val _backlog = new TrieMap[(FiniteDuration, () ⇒ Unit), RepointableCancellable]()
+ @volatile private var _target: Option[RefreshScheduler] = None
+
+ def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = {
+ val criticalEnter = _schedulerPhaser.writerCriticalSectionEnter()
+ try {
+ _target.map { scheduler ⇒
+ scheduler.schedule(interval, refresh)
+
+ } getOrElse {
+ val entry = (interval, refresh)
+ val cancellable = new RepointableCancellable(entry)
+
+ _backlog.put(entry, cancellable)
+ cancellable
+ }
+
+ } finally {
+ _schedulerPhaser.writerCriticalSectionExit(criticalEnter)
+ }
+ }
+
+ def point(target: RefreshScheduler): Unit = try {
+ _schedulerPhaser.readerLock()
+
+ if (_target.isEmpty) {
+ _target = Some(target)
+ _schedulerPhaser.flipPhase(10000L)
+ _backlog.dropWhile {
+ case ((interval, refresh), repointableCancellable) ⇒
+ repointableCancellable.point(target.schedule(interval, refresh))
+ true
+ }
+ } else sys.error("A LazyRefreshScheduler cannot be pointed more than once.")
+ } finally { _schedulerPhaser.readerUnlock() }
+
+ class RepointableCancellable(entry: (FiniteDuration, () ⇒ Unit)) extends Cancellable {
+ private var _isCancelled = false
+ private var _cancellable: Option[Cancellable] = None
+
+ def isCancelled: Boolean = synchronized {
+ _cancellable.map(_.isCancelled).getOrElse(_isCancelled)
+ }
+
+ def cancel(): Boolean = synchronized {
+ _isCancelled = true
+ _cancellable.map(_.cancel()).getOrElse(_backlog.remove(entry).nonEmpty)
+ }
+
+ def point(cancellable: Cancellable): Unit = synchronized {
+ if (_cancellable.isEmpty) {
+ _cancellable = Some(cancellable)
+
+ if (_isCancelled)
+ cancellable.cancel()
+
+ } else sys.error("A RepointableCancellable cannot be pointed more than once.")
+
+ }
+ }
+}
+
diff --git a/kamon-core/src/legacy-main/scala/kamon/metric/instrument/UnitOfMeasurement.scala b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/UnitOfMeasurement.scala
new file mode 100644
index 00000000..5952b906
--- /dev/null
+++ b/kamon-core/src/legacy-main/scala/kamon/metric/instrument/UnitOfMeasurement.scala
@@ -0,0 +1,109 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2015 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.metric.instrument
+
+/**
+ * A UnitOfMeasurement implementation describes the magnitude of a quantity being measured, such as Time and computer
+ * Memory space. Kamon uses UnitOfMeasurement implementations just as a informative companion to metrics inside entity
+ * recorders and might be used to scale certain kinds of measurements in metric backends.
+ */
+trait UnitOfMeasurement {
+ type U <: UnitOfMeasurement
+
+ def name: String
+ def label: String
+ def scale(toUnit: U)(value: Double): Double = value
+
+ def tryScale(toUnit: UnitOfMeasurement)(value: Double): Double =
+ if (canScale(toUnit)) scale(toUnit.asInstanceOf[U])(value)
+ else throw new IllegalArgumentException(s"Can't scale different types of units `$name` and `${toUnit.name}`")
+
+ protected def canScale(toUnit: UnitOfMeasurement): Boolean
+
+}
+
+object UnitOfMeasurement {
+ case object Unknown extends UnitOfMeasurement {
+ override type U = Unknown.type
+ val name = "unknown"
+ val label = "unknown"
+
+ override protected def canScale(toUnit: UnitOfMeasurement): Boolean = UnitOfMeasurement.isUnknown(toUnit)
+ }
+
+ def isUnknown(uom: UnitOfMeasurement): Boolean =
+ uom == Unknown
+
+ def isTime(uom: UnitOfMeasurement): Boolean =
+ uom.isInstanceOf[Time]
+
+ def isMemory(uom: UnitOfMeasurement): Boolean =
+ uom.isInstanceOf[Memory]
+
+}
+
+/**
+ * UnitOfMeasurement representing time.
+ */
+case class Time(factor: Double, label: String) extends UnitOfMeasurement {
+ override type U = Time
+ val name = "time"
+
+ override def scale(toUnit: Time)(value: Double): Double =
+ (value * factor) / toUnit.factor
+
+ override protected def canScale(toUnit: UnitOfMeasurement): Boolean = UnitOfMeasurement.isTime(toUnit)
+}
+
+object Time {
+ val Nanoseconds = Time(1E-9, "n")
+ val Microseconds = Time(1E-6, "µs")
+ val Milliseconds = Time(1E-3, "ms")
+ val Seconds = Time(1, "s")
+
+ val units = List(Nanoseconds, Microseconds, Milliseconds, Seconds)
+
+ def apply(time: String): Time = units.find(_.label.toLowerCase == time.toLowerCase) getOrElse {
+ throw new IllegalArgumentException(s"Can't recognize time unit '$time'")
+ }
+}
+
+/**
+ * UnitOfMeasurement representing computer memory space.
+ */
+case class Memory(factor: Double, label: String) extends UnitOfMeasurement {
+ override type U = Memory
+ val name = "bytes"
+
+ override def scale(toUnit: Memory)(value: Double): Double =
+ (value * factor) / toUnit.factor
+
+ override protected def canScale(toUnit: UnitOfMeasurement): Boolean = UnitOfMeasurement.isMemory(toUnit)
+}
+
+object Memory {
+ val Bytes = Memory(1, "b")
+ val KiloBytes = Memory(1024, "Kb")
+ val MegaBytes = Memory(1024 * 1024, "Mb")
+ val GigaBytes = Memory(1024 * 1024 * 1024, "Gb")
+
+ val units = List(Bytes, KiloBytes, MegaBytes, GigaBytes)
+
+ def apply(memory: String): Memory = units.find(_.label.toLowerCase == memory.toLowerCase) getOrElse {
+ throw new IllegalArgumentException(s"Can't recognize memory unit '$memory'")
+ }
+}