diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric')
22 files changed, 1321 insertions, 707 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/Entity.scala b/kamon-core/src/main/scala/kamon/metric/Entity.scala new file mode 100644 index 00000000..962626e0 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Entity.scala @@ -0,0 +1,52 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +/** + * Identify a `thing` that is being monitored by Kamon. A [[kamon.metric.Entity]] is used to identify tracked `things` + * in both the metrics recording and reporting sides. Only the name and category fields are used with determining + * equality between two entities. + * + * // TODO: Find a better word for `thing`. + */ +class Entity(val name: String, val category: String, val metadata: Map[String, String]) { + + override def equals(o: Any): Boolean = { + if (this eq o.asInstanceOf[AnyRef]) + true + else if ((o.asInstanceOf[AnyRef] eq null) || !o.isInstanceOf[Entity]) + false + else { + val thatAsEntity = o.asInstanceOf[Entity] + category == thatAsEntity.category && name == thatAsEntity.name + } + } + + override def hashCode: Int = { + var result: Int = name.hashCode + result = 31 * result + category.hashCode + return result + } +} + +object Entity { + def apply(name: String, category: String): Entity = + apply(name, category, Map.empty) + + def apply(name: String, category: String, metadata: Map[String, String]): Entity = + new Entity(name, category, metadata) +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala b/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala deleted file mode 100644 index 3761f5a5..00000000 --- a/kamon-core/src/main/scala/kamon/metric/EntityMetrics.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import java.nio.{ LongBuffer } -import akka.actor.ActorSystem -import com.typesafe.config.Config - -trait MetricGroupCategory { - def name: String -} - -trait MetricGroupIdentity { - def name: String - def category: MetricGroupCategory -} - -trait MetricIdentity { - def name: String -} - -trait CollectionContext { - def buffer: LongBuffer -} - -object CollectionContext { - def apply(longBufferSize: Int): CollectionContext = new CollectionContext { - val buffer: LongBuffer = LongBuffer.allocate(longBufferSize) - } -} - -trait MetricGroupRecorder { - def collect(context: CollectionContext): MetricGroupSnapshot - def cleanup: Unit -} - -trait MetricSnapshot { - type SnapshotType - - def merge(that: SnapshotType, context: CollectionContext): SnapshotType -} - -trait MetricGroupSnapshot { - type GroupSnapshotType - - def metrics: Map[MetricIdentity, MetricSnapshot] - def merge(that: GroupSnapshotType, context: CollectionContext): GroupSnapshotType -} - -private[kamon] trait MetricRecorder { - type SnapshotType <: MetricSnapshot - - def collect(context: CollectionContext): SnapshotType - def cleanup: Unit -} - -trait MetricGroupFactory { - type GroupRecorder <: MetricGroupRecorder - def create(config: Config, system: ActorSystem): GroupRecorder -} - diff --git a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala new file mode 100644 index 00000000..7a1972f0 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala @@ -0,0 +1,157 @@ +package kamon.metric + +import kamon.metric.instrument.Gauge.CurrentValueCollector +import kamon.metric.instrument.Histogram.DynamicRange +import kamon.metric.instrument._ + +import scala.collection.concurrent.TrieMap +import scala.concurrent.duration.FiniteDuration + +trait EntityRecorder { + def collect(collectionContext: CollectionContext): EntitySnapshot + def cleanup: Unit +} + +trait EntityRecorderFactory[T <: EntityRecorder] { + def category: String + def createRecorder(instrumentFactory: InstrumentFactory): T +} + +abstract class GenericEntityRecorder(instrumentFactory: InstrumentFactory) extends EntityRecorder { + import kamon.util.TriemapAtomicGetOrElseUpdate.Syntax + + private val _instruments = TrieMap.empty[MetricKey, Instrument] + private def register[T <: Instrument](key: MetricKey, instrument: ⇒ T): T = + _instruments.atomicGetOrElseUpdate(key, instrument).asInstanceOf[T] + + protected def histogram(name: String): Histogram = + register(HistogramKey(name), instrumentFactory.createHistogram(name)) + + protected def histogram(name: String, dynamicRange: DynamicRange): Histogram = + register(HistogramKey(name), instrumentFactory.createHistogram(name, Some(dynamicRange))) + + protected def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram = + register(HistogramKey(name, unitOfMeasurement), instrumentFactory.createHistogram(name)) + + protected def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram = + register(HistogramKey(name, unitOfMeasurement), instrumentFactory.createHistogram(name, Some(dynamicRange))) + + protected def histogram(key: HistogramKey): Histogram = + register(key, instrumentFactory.createHistogram(key.name)) + + protected def histogram(key: HistogramKey, dynamicRange: DynamicRange): Histogram = + register(key, instrumentFactory.createHistogram(key.name, Some(dynamicRange))) + + protected def removeHistogram(name: String): Unit = + _instruments.remove(HistogramKey(name)) + + protected def removeHistogram(key: HistogramKey): Unit = + _instruments.remove(key) + + protected def minMaxCounter(name: String): MinMaxCounter = + register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name)) + + protected def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter = + register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange))) + + protected def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter = + register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, refreshInterval = Some(refreshInterval))) + + protected def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name)) + + protected def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter = + register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange), Some(refreshInterval))) + + protected def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange))) + + protected def minMaxCounter(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + register(MinMaxCounterKey(name, unitOfMeasurement), instrumentFactory.createMinMaxCounter(name, refreshInterval = Some(refreshInterval))) + + protected def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + register(MinMaxCounterKey(name), instrumentFactory.createMinMaxCounter(name, Some(dynamicRange), Some(refreshInterval))) + + protected def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter = + register(key, instrumentFactory.createMinMaxCounter(key.name)) + + protected def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange): MinMaxCounter = + register(key, instrumentFactory.createMinMaxCounter(key.name, Some(dynamicRange))) + + protected def minMaxCounter(key: MinMaxCounterKey, refreshInterval: FiniteDuration): MinMaxCounter = + register(key, instrumentFactory.createMinMaxCounter(key.name, refreshInterval = Some(refreshInterval))) + + protected def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter = + register(key, instrumentFactory.createMinMaxCounter(key.name, Some(dynamicRange), Some(refreshInterval))) + + protected def removeMinMaxCounter(name: String): Unit = + _instruments.remove(MinMaxCounterKey(name)) + + protected def removeMinMaxCounter(key: MinMaxCounterKey): Unit = + _instruments.remove(key) + + protected def gauge(name: String, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name), instrumentFactory.createGauge(name, valueCollector = valueCollector)) + + protected def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name), instrumentFactory.createGauge(name, Some(dynamicRange), valueCollector = valueCollector)) + + protected def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name), instrumentFactory.createGauge(name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector)) + + protected def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, valueCollector = valueCollector)) + + protected def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name), instrumentFactory.createGauge(name, Some(dynamicRange), Some(refreshInterval), valueCollector = valueCollector)) + + protected def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, Some(dynamicRange), valueCollector = valueCollector)) + + protected def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name), instrumentFactory.createGauge(name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector)) + + protected def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + register(GaugeKey(name, unitOfMeasurement), instrumentFactory.createGauge(name, Some(dynamicRange), Some(refreshInterval), valueCollector)) + + protected def gauge(key: GaugeKey, valueCollector: CurrentValueCollector): Gauge = + register(key, instrumentFactory.createGauge(key.name, valueCollector = valueCollector)) + + protected def gauge(key: GaugeKey, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge = + register(key, instrumentFactory.createGauge(key.name, Some(dynamicRange), valueCollector = valueCollector)) + + protected def gauge(key: GaugeKey, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + register(key, instrumentFactory.createGauge(key.name, refreshInterval = Some(refreshInterval), valueCollector = valueCollector)) + + protected def gauge(key: GaugeKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + register(key, instrumentFactory.createGauge(key.name, Some(dynamicRange), Some(refreshInterval), valueCollector = valueCollector)) + + protected def removeGauge(name: String): Unit = + _instruments.remove(GaugeKey(name)) + + protected def removeGauge(key: GaugeKey): Unit = + _instruments.remove(key) + + protected def counter(name: String): Counter = + register(CounterKey(name), instrumentFactory.createCounter()) + + protected def counter(key: CounterKey): Counter = + register(key, instrumentFactory.createCounter()) + + protected def removeCounter(name: String): Unit = + _instruments.remove(CounterKey(name)) + + protected def removeCounter(key: CounterKey): Unit = + _instruments.remove(key) + + def collect(collectionContext: CollectionContext): EntitySnapshot = { + val snapshots = Map.newBuilder[MetricKey, InstrumentSnapshot] + _instruments.foreach { + case (key, instrument) ⇒ snapshots += key -> instrument.collect(collectionContext) + } + + new DefaultEntitySnapshot(snapshots.result()) + } + + def cleanup: Unit = _instruments.values.foreach(_.cleanup) +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala b/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala new file mode 100644 index 00000000..17c8f4c5 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala @@ -0,0 +1,47 @@ +package kamon.metric + +import kamon.metric.instrument.{ Counter, Histogram, CollectionContext, InstrumentSnapshot } +import kamon.util.MapMerge +import scala.reflect.ClassTag + +trait EntitySnapshot { + def metrics: Map[MetricKey, InstrumentSnapshot] + def merge(that: EntitySnapshot, collectionContext: CollectionContext): EntitySnapshot + + def histogram(name: String): Option[Histogram.Snapshot] = + find[HistogramKey, Histogram.Snapshot](name) + + def minMaxCounter(name: String): Option[Histogram.Snapshot] = + find[MinMaxCounterKey, Histogram.Snapshot](name) + + def gauge(name: String): Option[Histogram.Snapshot] = + find[GaugeKey, Histogram.Snapshot](name) + + def counter(name: String): Option[Counter.Snapshot] = + find[CounterKey, Counter.Snapshot](name) + + def histograms: Map[HistogramKey, Histogram.Snapshot] = + filterByType[HistogramKey, Histogram.Snapshot] + + def minMaxCounters: Map[MinMaxCounterKey, Histogram.Snapshot] = + filterByType[MinMaxCounterKey, Histogram.Snapshot] + + def gauges: Map[GaugeKey, Histogram.Snapshot] = + filterByType[GaugeKey, Histogram.Snapshot] + + def counters: Map[CounterKey, Counter.Snapshot] = + filterByType[CounterKey, Counter.Snapshot] + + private def filterByType[K <: MetricKey, V <: InstrumentSnapshot](implicit keyCT: ClassTag[K]): Map[K, V] = + metrics.collect { case (k, v) if keyCT.runtimeClass.isInstance(k) ⇒ (k.asInstanceOf[K], v.asInstanceOf[V]) } + + private def find[K <: MetricKey, V <: InstrumentSnapshot](name: String)(implicit keyCT: ClassTag[K]) = + metrics.find { case (k, v) ⇒ keyCT.runtimeClass.isInstance(k) && k.name == name } map (_._2.asInstanceOf[V]) +} + +class DefaultEntitySnapshot(val metrics: Map[MetricKey, InstrumentSnapshot]) extends EntitySnapshot { + import MapMerge.Syntax + + override def merge(that: EntitySnapshot, collectionContext: CollectionContext): EntitySnapshot = + new DefaultEntitySnapshot(metrics.merge(that.metrics, (l, r) ⇒ l.merge(r, collectionContext))) +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/MetricKey.scala b/kamon-core/src/main/scala/kamon/metric/MetricKey.scala new file mode 100644 index 00000000..a17972df --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricKey.scala @@ -0,0 +1,153 @@ +package kamon.metric + +import kamon.metric.instrument.{ InstrumentTypes, InstrumentType, UnitOfMeasurement } + +/** + * MetricKeys are used to identify a given metric in entity recorders and snapshots. MetricKeys can be used to encode + * additional metadata for a metric being recorded, as well as the unit of measurement of the data being recorder. + */ +sealed trait MetricKey { + def name: String + def unitOfMeasurement: UnitOfMeasurement + def instrumentType: InstrumentType + def metadata: Map[String, String] +} + +// Wish that there was a shorter way to describe the operations bellow, but apparently there is no way to generalize all +// the apply/create versions that would produce the desired return types when used from Java. + +/** + * MetricKey for all Histogram-based metrics. + */ +case class HistogramKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey { + val instrumentType = InstrumentTypes.Histogram +} + +object HistogramKey { + def apply(name: String): HistogramKey = + apply(name, UnitOfMeasurement.Unknown) + + def apply(name: String, unitOfMeasurement: UnitOfMeasurement): HistogramKey = + apply(name, unitOfMeasurement, Map.empty) + + def apply(name: String, metadata: Map[String, String]): HistogramKey = + apply(name, UnitOfMeasurement.Unknown, Map.empty) + + /** + * Java friendly versions: + */ + + def create(name: String): HistogramKey = + apply(name, UnitOfMeasurement.Unknown) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement): HistogramKey = + apply(name, unitOfMeasurement) + + def create(name: String, metadata: Map[String, String]): HistogramKey = + apply(name, metadata) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): HistogramKey = + apply(name, unitOfMeasurement, metadata) +} + +/** + * MetricKey for all MinMaxCounter-based metrics. + */ +case class MinMaxCounterKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey { + val instrumentType = InstrumentTypes.MinMaxCounter +} + +object MinMaxCounterKey { + def apply(name: String): MinMaxCounterKey = + apply(name, UnitOfMeasurement.Unknown) + + def apply(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounterKey = + apply(name, unitOfMeasurement, Map.empty) + + def apply(name: String, metadata: Map[String, String]): MinMaxCounterKey = + apply(name, UnitOfMeasurement.Unknown, Map.empty) + + /** + * Java friendly versions: + */ + + def create(name: String): MinMaxCounterKey = + apply(name, UnitOfMeasurement.Unknown) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounterKey = + apply(name, unitOfMeasurement) + + def create(name: String, metadata: Map[String, String]): MinMaxCounterKey = + apply(name, metadata) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): MinMaxCounterKey = + apply(name, unitOfMeasurement, metadata) +} + +/** + * MetricKey for all Gauge-based metrics. + */ +case class GaugeKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey { + val instrumentType = InstrumentTypes.Gauge +} + +object GaugeKey { + def apply(name: String): GaugeKey = + apply(name, UnitOfMeasurement.Unknown) + + def apply(name: String, unitOfMeasurement: UnitOfMeasurement): GaugeKey = + apply(name, unitOfMeasurement, Map.empty) + + def apply(name: String, metadata: Map[String, String]): GaugeKey = + apply(name, UnitOfMeasurement.Unknown, Map.empty) + + /** + * Java friendly versions: + */ + + def create(name: String): GaugeKey = + apply(name, UnitOfMeasurement.Unknown) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement): GaugeKey = + apply(name, unitOfMeasurement) + + def create(name: String, metadata: Map[String, String]): GaugeKey = + apply(name, metadata) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): GaugeKey = + apply(name, unitOfMeasurement, metadata) +} + +/** + * MetricKey for all Counter-based metrics. + */ +case class CounterKey(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]) extends MetricKey { + val instrumentType = InstrumentTypes.Counter +} + +object CounterKey { + def apply(name: String): CounterKey = + apply(name, UnitOfMeasurement.Unknown) + + def apply(name: String, unitOfMeasurement: UnitOfMeasurement): CounterKey = + apply(name, unitOfMeasurement, Map.empty) + + def apply(name: String, metadata: Map[String, String]): CounterKey = + apply(name, UnitOfMeasurement.Unknown, Map.empty) + + /** + * Java friendly versions: + */ + + def create(name: String): CounterKey = + apply(name, UnitOfMeasurement.Unknown) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement): CounterKey = + apply(name, unitOfMeasurement) + + def create(name: String, metadata: Map[String, String]): CounterKey = + apply(name, metadata) + + def create(name: String, unitOfMeasurement: UnitOfMeasurement, metadata: Map[String, String]): CounterKey = + apply(name, unitOfMeasurement, metadata) +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala index ed55ab06..b738eeb9 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtension.scala @@ -16,91 +16,119 @@ package kamon.metric -import akka.event.Logging.Error -import akka.event.EventStream +import akka.actor +import kamon.metric.SubscriptionsDispatcher.{ Unsubscribe, Subscribe } +import kamon.{ ModuleSupervisor, Kamon } +import kamon.metric.instrument.{ InstrumentFactory, CollectionContext } import scala.collection.concurrent.TrieMap import akka.actor._ -import com.typesafe.config.Config -import kamon.util.GlobPathFilter -import kamon.Kamon -import akka.actor -import kamon.metric.Metrics.MetricGroupFilter -import kamon.metric.Subscriptions.{ Unsubscribe, Subscribe } -import java.util.concurrent.TimeUnit +import kamon.util.{ FastDispatch, TriemapAtomicGetOrElseUpdate } -class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { - import Metrics.AtomicGetOrElseUpdateForTriemap +object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { + override def get(system: ActorSystem): MetricsExtension = super.get(system) + def lookup(): ExtensionId[_ <: actor.Extension] = Metrics + def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtensionImpl(system) +} - val metricsExtConfig = system.settings.config.getConfig("kamon.metrics") - printInitializationMessage(system.eventStream, metricsExtConfig.getBoolean("disable-aspectj-weaver-missing-error")) +case class EntityRegistration[T <: EntityRecorder](entity: Entity, recorder: T) - /** Configured Dispatchers */ - val metricSubscriptionsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.metric-subscriptions")) - val gaugeRecordingsDispatcher = system.dispatchers.lookup(metricsExtConfig.getString("dispatchers.gauge-recordings")) +trait MetricsExtension extends Kamon.Extension { + def settings: MetricsExtensionSettings + def shouldTrack(entity: Entity): Boolean + def shouldTrack(entityName: String, category: String): Boolean = + shouldTrack(Entity(entityName, category)) - /** Configuration Settings */ - val gaugeRecordingInterval = metricsExtConfig.getDuration("gauge-recording-interval", TimeUnit.MILLISECONDS) + def register[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entityName: String): Option[EntityRegistration[T]] + def register[T <: EntityRecorder](entity: Entity, recorder: T): EntityRegistration[T] + def unregister(entity: Entity): Unit - val storage = TrieMap[MetricGroupIdentity, MetricGroupRecorder]() - val filters = loadFilters(metricsExtConfig) - lazy val subscriptions = system.actorOf(Props[Subscriptions], "kamon-metrics-subscriptions") + def find(entity: Entity): Option[EntityRecorder] + def find(name: String, category: String): Option[EntityRecorder] - def register(identity: MetricGroupIdentity, factory: MetricGroupFactory): Option[factory.GroupRecorder] = { - if (shouldTrack(identity)) - Some(storage.atomicGetOrElseUpdate(identity, factory.create(metricsExtConfig, system)).asInstanceOf[factory.GroupRecorder]) - else - None - } + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef): Unit = + subscribe(filter, subscriber, permanently = false) - def unregister(identity: MetricGroupIdentity): Unit = { - storage.remove(identity).map(_.cleanup) - } + def subscribe(category: String, selection: String, subscriber: ActorRef, permanently: Boolean): Unit = + subscribe(SubscriptionFilter(category, selection), subscriber, permanently) - def subscribe[C <: MetricGroupCategory](category: C, selection: String, subscriber: ActorRef, permanently: Boolean = false): Unit = - subscriptions.tell(Subscribe(category, selection, subscriber, permanently), subscriber) + def subscribe(category: String, selection: String, subscriber: ActorRef): Unit = + subscribe(SubscriptionFilter(category, selection), subscriber, permanently = false) - def unsubscribe(subscriber: ActorRef): Unit = - subscriptions.tell(Unsubscribe(subscriber), subscriber) + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanently: Boolean): Unit - def scheduleGaugeRecorder(body: ⇒ Unit): Cancellable = { - import scala.concurrent.duration._ + def unsubscribe(subscriber: ActorRef): Unit + def buildDefaultCollectionContext: CollectionContext + def instrumentFactory(category: String): InstrumentFactory +} - system.scheduler.schedule(gaugeRecordingInterval milliseconds, gaugeRecordingInterval milliseconds) { - body - }(gaugeRecordingsDispatcher) - } +class MetricsExtensionImpl(system: ExtendedActorSystem) extends MetricsExtension { + import FastDispatch.Syntax - private def shouldTrack(identity: MetricGroupIdentity): Boolean = { - filters.get(identity.category.name).map(filter ⇒ filter.accept(identity.name)).getOrElse(true) - } + val settings = MetricsExtensionSettings(system) - def loadFilters(config: Config): Map[String, MetricGroupFilter] = { - import scala.collection.JavaConverters._ + private val _trackedEntities = TrieMap.empty[Entity, EntityRecorder] + private val _collectionContext = buildDefaultCollectionContext + private val _metricsCollectionDispatcher = system.dispatchers.lookup(settings.metricCollectionDispatcher) + private val _subscriptions = ModuleSupervisor.get(system).createModule("subscriptions-dispatcher", + SubscriptionsDispatcher.props(settings.tickInterval, collectSnapshots).withDispatcher(settings.metricCollectionDispatcher)) - val filters = config.getObjectList("filters").asScala + def shouldTrack(entity: Entity): Boolean = + settings.entityFilters.get(entity.category).map { + filter ⇒ filter.accept(entity.name) - val allFilters = - for ( - filter ← filters; - entry ← filter.entrySet().asScala - ) yield { - val key = entry.getKey - val keyBasedConfig = entry.getValue.atKey(key) + } getOrElse (settings.trackUnmatchedEntities) - val includes = keyBasedConfig.getStringList(s"$key.includes").asScala.map(inc ⇒ new GlobPathFilter(inc)).toList - val excludes = keyBasedConfig.getStringList(s"$key.excludes").asScala.map(exc ⇒ new GlobPathFilter(exc)).toList + def register[T <: EntityRecorder](recorderFactory: EntityRecorderFactory[T], entityName: String): Option[EntityRegistration[T]] = { + import TriemapAtomicGetOrElseUpdate.Syntax + val entity = Entity(entityName, recorderFactory.category) - (key, MetricGroupFilter(includes, excludes)) - } + if (shouldTrack(entity)) { + val instrumentFactory = settings.instrumentFactories.get(recorderFactory.category).getOrElse(settings.defaultInstrumentFactory) + val recorder = _trackedEntities.atomicGetOrElseUpdate(entity, recorderFactory.createRecorder(instrumentFactory)).asInstanceOf[T] + Some(EntityRegistration(entity, recorder)) + } else None + } - allFilters.toMap + def register[T <: EntityRecorder](entity: Entity, recorder: T): EntityRegistration[T] = { + import TriemapAtomicGetOrElseUpdate.Syntax + EntityRegistration(entity, _trackedEntities.atomicGetOrElseUpdate(entity, recorder).asInstanceOf[T]) } + def unregister(entity: Entity): Unit = + _trackedEntities.remove(entity).map(_.cleanup) + + def find(entity: Entity): Option[EntityRecorder] = + _trackedEntities.get(entity) + + def find(name: String, category: String): Option[EntityRecorder] = + find(Entity(name, category)) + + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit = + _subscriptions.fastDispatch(Subscribe(filter, subscriber, permanent))(_metricsCollectionDispatcher) + + def unsubscribe(subscriber: ActorRef): Unit = + _subscriptions.fastDispatch(Unsubscribe(subscriber))(_metricsCollectionDispatcher) + def buildDefaultCollectionContext: CollectionContext = - CollectionContext(metricsExtConfig.getInt("default-collection-context-buffer-size")) + CollectionContext(settings.defaultCollectionContextBufferSize) + + def instrumentFactory(category: String): InstrumentFactory = + settings.instrumentFactories.getOrElse(category, settings.defaultInstrumentFactory) + + /** + * Collect and dispatch. + */ + private def collectSnapshots(): Map[Entity, EntitySnapshot] = { + val builder = Map.newBuilder[Entity, EntitySnapshot] + _trackedEntities.foreach { + case (identity, recorder) ⇒ builder += ((identity, recorder.collect(_collectionContext))) + } - def printInitializationMessage(eventStream: EventStream, disableWeaverMissingError: Boolean): Unit = { + builder.result() + } + + /* def printInitializationMessage(eventStream: EventStream, disableWeaverMissingError: Boolean): Unit = { if (!disableWeaverMissingError) { val weaverMissingMessage = """ @@ -123,22 +151,6 @@ class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { eventStream.publish(Error("MetricsExtension", classOf[MetricsExtension], weaverMissingMessage)) } - } + }*/ } -object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = Metrics - def createExtension(system: ExtendedActorSystem): MetricsExtension = new MetricsExtension(system) - - case class MetricGroupFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { - def accept(name: String): Boolean = includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) - } - - implicit class AtomicGetOrElseUpdateForTriemap[K, V](trieMap: TrieMap[K, V]) { - def atomicGetOrElseUpdate(key: K, op: ⇒ V): V = - trieMap.get(key) match { - case Some(v) ⇒ v - case None ⇒ val d = op; trieMap.putIfAbsent(key, d).getOrElse(d) - } - } -} diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala b/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala new file mode 100644 index 00000000..ca1db850 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/MetricsExtensionSettings.scala @@ -0,0 +1,100 @@ +package kamon.metric + +import akka.actor.ExtendedActorSystem +import com.typesafe.config.Config +import kamon.metric.instrument.{ RefreshScheduler, InstrumentFactory, DefaultInstrumentSettings, InstrumentCustomSettings } +import kamon.util.GlobPathFilter + +import scala.concurrent.duration.FiniteDuration + +/** + * Configuration settings for the Metrics extension, as read from the `kamon.metric` configuration key. + */ +case class MetricsExtensionSettings( + tickInterval: FiniteDuration, + defaultCollectionContextBufferSize: Int, + trackUnmatchedEntities: Boolean, + entityFilters: Map[String, EntityFilter], + instrumentFactories: Map[String, InstrumentFactory], + defaultInstrumentFactory: InstrumentFactory, + metricCollectionDispatcher: String, + refreshSchedulerDispatcher: String, + refreshScheduler: RefreshScheduler) + +/** + * + */ +case class EntityFilter(includes: List[GlobPathFilter], excludes: List[GlobPathFilter]) { + def accept(name: String): Boolean = + includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) +} + +object MetricsExtensionSettings { + import kamon.util.ConfigTools.Syntax + import scala.concurrent.duration._ + + def apply(system: ExtendedActorSystem): MetricsExtensionSettings = { + val metricConfig = system.settings.config.getConfig("kamon.metric") + + val tickInterval = metricConfig.getFiniteDuration("tick-interval") + val collectBufferSize = metricConfig.getInt("default-collection-context-buffer-size") + val trackUnmatchedEntities = metricConfig.getBoolean("track-unmatched-entities") + val entityFilters = loadFilters(metricConfig.getConfig("filters")) + val defaultInstrumentSettings = DefaultInstrumentSettings.fromConfig(metricConfig.getConfig("default-instrument-settings")) + val metricCollectionDispatcher = metricConfig.getString("dispatchers.metric-collection") + val refreshSchedulerDispatcher = metricConfig.getString("dispatchers.refresh-scheduler") + + val refreshScheduler = RefreshScheduler(system.scheduler, system.dispatchers.lookup(refreshSchedulerDispatcher)) + val instrumentFactories = loadInstrumentFactories(metricConfig.getConfig("instrument-settings"), defaultInstrumentSettings, refreshScheduler) + val defaultInstrumentFactory = new InstrumentFactory(Map.empty, defaultInstrumentSettings, refreshScheduler) + + MetricsExtensionSettings(tickInterval, collectBufferSize, trackUnmatchedEntities, entityFilters, instrumentFactories, + defaultInstrumentFactory, metricCollectionDispatcher, refreshSchedulerDispatcher, refreshScheduler) + } + + /** + * Load all the default filters configured under the `kamon.metric.filters` configuration key. All filters are + * defined with the entity category as a sub-key of the `kamon.metric.filters` key and two sub-keys to it: includes + * and excludes with lists of string glob patterns as values. Example: + * + * {{{ + * + * kamon.metrics.filters { + * actor { + * includes = ["user/test-actor", "user/service/worker-*"] + * excludes = ["user/IO-*"] + * } + * } + * + * }}} + * + * @return a Map from category name to corresponding entity filter. + */ + def loadFilters(filtersConfig: Config): Map[String, EntityFilter] = { + import scala.collection.JavaConverters._ + + filtersConfig.firstLevelKeys map { category: String ⇒ + val includes = filtersConfig.getStringList(s"$category.includes").asScala.map(inc ⇒ new GlobPathFilter(inc)).toList + val excludes = filtersConfig.getStringList(s"$category.excludes").asScala.map(exc ⇒ new GlobPathFilter(exc)).toList + + (category, EntityFilter(includes, excludes)) + } toMap + } + + /** + * Load any custom configuration settings defined under the `kamon.metric.instrument-settings` configuration key and + * create InstrumentFactories for them. + * + * @return a Map from category name to InstrumentFactory. + */ + def loadInstrumentFactories(instrumentSettings: Config, defaults: DefaultInstrumentSettings, refreshScheduler: RefreshScheduler): Map[String, InstrumentFactory] = { + instrumentSettings.firstLevelKeys.map { category ⇒ + val categoryConfig = instrumentSettings.getConfig(category) + val customSettings = categoryConfig.firstLevelKeys.map { instrumentName ⇒ + (instrumentName, InstrumentCustomSettings.fromConfig(categoryConfig.getConfig(instrumentName))) + } toMap + + (category, new InstrumentFactory(customSettings, defaults, refreshScheduler)) + } toMap + } +} diff --git a/kamon-core/src/main/scala/kamon/metric/Scale.scala b/kamon-core/src/main/scala/kamon/metric/Scale.scala deleted file mode 100644 index 2f27c1a3..00000000 --- a/kamon-core/src/main/scala/kamon/metric/Scale.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -class Scale(val numericValue: Double) extends AnyVal - -object Scale { - val Nano = new Scale(1E-9) - val Micro = new Scale(1E-6) - val Milli = new Scale(1E-3) - val Unit = new Scale(1) - val Kilo = new Scale(1E3) - val Mega = new Scale(1E6) - val Giga = new Scale(1E9) - - def convert(fromUnit: Scale, toUnit: Scale, value: Long): Double = (value * fromUnit.numericValue) / toUnit.numericValue -} diff --git a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala b/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala deleted file mode 100644 index a22e1c21..00000000 --- a/kamon-core/src/main/scala/kamon/metric/Subscriptions.scala +++ /dev/null @@ -1,173 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.metric - -import akka.actor._ -import kamon.metric.Subscriptions._ -import kamon.util.GlobPathFilter -import scala.concurrent.duration.{ FiniteDuration, Duration } -import java.util.concurrent.TimeUnit -import kamon.{ MilliTimestamp, Kamon } -import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer - -class Subscriptions extends Actor { - import context.system - - val flushMetricsSchedule = scheduleFlushMessage() - val collectionContext = Kamon(Metrics).buildDefaultCollectionContext - - var lastTick: MilliTimestamp = MilliTimestamp.now - var oneShotSubscriptions: Map[ActorRef, MetricSelectionFilter] = Map.empty - var permanentSubscriptions: Map[ActorRef, MetricSelectionFilter] = Map.empty - - def receive = { - case Subscribe(category, selection, subscriber, permanent) ⇒ subscribe(category, selection, subscriber, permanent) - case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber) - case Terminated(subscriber) ⇒ unsubscribe(subscriber) - case FlushMetrics ⇒ flush() - } - - def subscribe(category: MetricGroupCategory, selection: String, subscriber: ActorRef, permanent: Boolean): Unit = { - context.watch(subscriber) - val newFilter: MetricSelectionFilter = GroupAndPatternFilter(category, new GlobPathFilter(selection)) - - if (permanent) { - permanentSubscriptions = permanentSubscriptions.updated(subscriber, newFilter combine { - permanentSubscriptions.getOrElse(subscriber, MetricSelectionFilter.empty) - }) - } else { - oneShotSubscriptions = oneShotSubscriptions.updated(subscriber, newFilter combine { - oneShotSubscriptions.getOrElse(subscriber, MetricSelectionFilter.empty) - }) - } - } - - def unsubscribe(subscriber: ActorRef): Unit = { - if (permanentSubscriptions.contains(subscriber)) - permanentSubscriptions = permanentSubscriptions - subscriber - - if (oneShotSubscriptions.contains(subscriber)) - oneShotSubscriptions = oneShotSubscriptions - subscriber - } - - def flush(): Unit = { - val currentTick = MilliTimestamp.now - val snapshots = collectAll() - - dispatchSelectedMetrics(lastTick, currentTick, permanentSubscriptions, snapshots) - dispatchSelectedMetrics(lastTick, currentTick, oneShotSubscriptions, snapshots) - - lastTick = currentTick - oneShotSubscriptions = Map.empty - } - - def collectAll(): Map[MetricGroupIdentity, MetricGroupSnapshot] = { - val allMetrics = Kamon(Metrics).storage - val builder = Map.newBuilder[MetricGroupIdentity, MetricGroupSnapshot] - - allMetrics.foreach { - case (identity, recorder) ⇒ builder += ((identity, recorder.collect(collectionContext))) - } - - builder.result() - } - - def dispatchSelectedMetrics(lastTick: MilliTimestamp, currentTick: MilliTimestamp, subscriptions: Map[ActorRef, MetricSelectionFilter], - snapshots: Map[MetricGroupIdentity, MetricGroupSnapshot]): Unit = { - - for ((subscriber, filter) ← subscriptions) { - val selection = snapshots.filter(group ⇒ filter.accept(group._1)) - val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) - - subscriber ! tickMetrics - } - } - - def scheduleFlushMessage(): Cancellable = { - val config = context.system.settings.config - val tickInterval = Duration(config.getDuration("kamon.metrics.tick-interval", TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS) - context.system.scheduler.schedule(tickInterval, tickInterval, self, FlushMetrics)(context.dispatcher) - } -} - -object Subscriptions { - case object FlushMetrics - case class Unsubscribe(subscriber: ActorRef) - case class Subscribe(category: MetricGroupCategory, selection: String, subscriber: ActorRef, permanently: Boolean = false) - case class TickMetricSnapshot(from: MilliTimestamp, to: MilliTimestamp, metrics: Map[MetricGroupIdentity, MetricGroupSnapshot]) - - trait MetricSelectionFilter { - def accept(identity: MetricGroupIdentity): Boolean - } - - object MetricSelectionFilter { - val empty = new MetricSelectionFilter { - def accept(identity: MetricGroupIdentity): Boolean = false - } - - implicit class CombinableMetricSelectionFilter(msf: MetricSelectionFilter) { - def combine(that: MetricSelectionFilter): MetricSelectionFilter = new MetricSelectionFilter { - def accept(identity: MetricGroupIdentity): Boolean = msf.accept(identity) || that.accept(identity) - } - } - } - - case class GroupAndPatternFilter(category: MetricGroupCategory, globFilter: GlobPathFilter) extends MetricSelectionFilter { - def accept(identity: MetricGroupIdentity): Boolean = { - category.equals(identity.category) && globFilter.accept(identity.name) - } - } -} - -class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { - val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) - val collectionContext = Kamon(Metrics)(context.system).buildDefaultCollectionContext - - def receive = empty - - def empty: Actor.Receive = { - case tick: TickMetricSnapshot ⇒ context become (buffering(tick)) - case FlushBuffer ⇒ // Nothing to flush. - } - - def buffering(buffered: TickMetricSnapshot): Actor.Receive = { - case TickMetricSnapshot(_, to, tickMetrics) ⇒ - val combinedMetrics = combineMaps(buffered.metrics, tickMetrics)(mergeMetricGroup) - val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics) - - context become (buffering(combinedSnapshot)) - - case FlushBuffer ⇒ - receiver ! buffered - context become (empty) - - } - - override def postStop(): Unit = { - flushSchedule.cancel() - super.postStop() - } - - def mergeMetricGroup(left: MetricGroupSnapshot, right: MetricGroupSnapshot) = left.merge(right.asInstanceOf[left.GroupSnapshotType], collectionContext).asInstanceOf[MetricGroupSnapshot] // ??? //Combined(combineMaps(left.metrics, right.metrics)((l, r) ⇒ l.merge(r, collectionContext))) -} - -object TickMetricSnapshotBuffer { - case object FlushBuffer - - def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = - Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) -} diff --git a/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala b/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala new file mode 100644 index 00000000..f616be35 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/SubscriptionsDispatcher.scala @@ -0,0 +1,115 @@ +/* + * ========================================================================================= + * Copyright © 2013 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.metric + +import akka.actor._ +import kamon.metric.SubscriptionsDispatcher._ +import kamon.util.{ MilliTimestamp, GlobPathFilter } +import scala.concurrent.duration.FiniteDuration + +/** + * Manages subscriptions to metrics and dispatch snapshots on every tick to all subscribers. + */ +private[kamon] class SubscriptionsDispatcher(interval: FiniteDuration, collector: () ⇒ Map[Entity, EntitySnapshot]) extends Actor { + var lastTick = MilliTimestamp.now + var oneShotSubscriptions = Map.empty[ActorRef, SubscriptionFilter] + var permanentSubscriptions = Map.empty[ActorRef, SubscriptionFilter] + val tickSchedule = context.system.scheduler.schedule(interval, interval, self, Tick)(context.dispatcher) + + def receive = { + case Tick ⇒ processTick() + case Subscribe(filter, subscriber, permanently) ⇒ subscribe(filter, subscriber, permanently) + case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber) + case Terminated(subscriber) ⇒ unsubscribe(subscriber) + } + + def processTick(): Unit = + dispatch(collector()) + + def subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanent: Boolean): Unit = { + def addSubscription(storage: Map[ActorRef, SubscriptionFilter]): Map[ActorRef, SubscriptionFilter] = + storage.updated(subscriber, storage.getOrElse(subscriber, SubscriptionFilter.Empty).combine(filter)) + + context.watch(subscriber) + + if (permanent) + permanentSubscriptions = addSubscription(permanentSubscriptions) + else + oneShotSubscriptions = addSubscription(oneShotSubscriptions) + } + + def unsubscribe(subscriber: ActorRef): Unit = { + permanentSubscriptions = permanentSubscriptions - subscriber + oneShotSubscriptions = oneShotSubscriptions - subscriber + } + + def dispatch(snapshots: Map[Entity, EntitySnapshot]): Unit = { + val currentTick = MilliTimestamp.now + + dispatchSelections(lastTick, currentTick, permanentSubscriptions, snapshots) + dispatchSelections(lastTick, currentTick, oneShotSubscriptions, snapshots) + + lastTick = currentTick + oneShotSubscriptions = Map.empty[ActorRef, SubscriptionFilter] + } + + def dispatchSelections(lastTick: MilliTimestamp, currentTick: MilliTimestamp, subscriptions: Map[ActorRef, SubscriptionFilter], + snapshots: Map[Entity, EntitySnapshot]): Unit = { + + for ((subscriber, filter) ← subscriptions) { + val selection = snapshots.filter(group ⇒ filter.accept(group._1)) + val tickMetrics = TickMetricSnapshot(lastTick, currentTick, selection) + + subscriber ! tickMetrics + } + } +} + +object SubscriptionsDispatcher { + def props(interval: FiniteDuration, collector: () ⇒ Map[Entity, EntitySnapshot]): Props = + Props(new SubscriptionsDispatcher(interval, collector)) + + case object Tick + case class Unsubscribe(subscriber: ActorRef) + case class Subscribe(filter: SubscriptionFilter, subscriber: ActorRef, permanently: Boolean = false) + case class TickMetricSnapshot(from: MilliTimestamp, to: MilliTimestamp, metrics: Map[Entity, EntitySnapshot]) + +} + +trait SubscriptionFilter { self ⇒ + + def accept(entity: Entity): Boolean + + final def combine(that: SubscriptionFilter): SubscriptionFilter = new SubscriptionFilter { + override def accept(entity: Entity): Boolean = self.accept(entity) || that.accept(entity) + } +} + +object SubscriptionFilter { + val Empty = new SubscriptionFilter { + def accept(entity: Entity): Boolean = false + } + + def apply(category: String, name: String): SubscriptionFilter = new SubscriptionFilter { + val categoryPattern = new GlobPathFilter(category) + val namePattern = new GlobPathFilter(name) + + def accept(entity: Entity): Boolean = { + categoryPattern.accept(entity.category) && namePattern.accept(entity.name) + } + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala b/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala new file mode 100644 index 00000000..b9127118 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/TickMetricSnapshotBuffer.scala @@ -0,0 +1,49 @@ +package kamon.metric + +import akka.actor.{ Props, Actor, ActorRef } +import kamon.Kamon +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric.TickMetricSnapshotBuffer.FlushBuffer +import kamon.metric.instrument.CollectionContext +import kamon.util.MapMerge + +import scala.concurrent.duration.FiniteDuration + +class TickMetricSnapshotBuffer(flushInterval: FiniteDuration, receiver: ActorRef) extends Actor { + import MapMerge.Syntax + + val flushSchedule = context.system.scheduler.schedule(flushInterval, flushInterval, self, FlushBuffer)(context.dispatcher) + val collectionContext: CollectionContext = Kamon(Metrics)(context.system).buildDefaultCollectionContext + + def receive = empty + + def empty: Actor.Receive = { + case tick: TickMetricSnapshot ⇒ context become (buffering(tick)) + case FlushBuffer ⇒ // Nothing to flush. + } + + def buffering(buffered: TickMetricSnapshot): Actor.Receive = { + case TickMetricSnapshot(_, to, tickMetrics) ⇒ + val combinedMetrics = buffered.metrics.merge(tickMetrics, (l, r) ⇒ l.merge(r, collectionContext)) + val combinedSnapshot = TickMetricSnapshot(buffered.from, to, combinedMetrics) + + context become (buffering(combinedSnapshot)) + + case FlushBuffer ⇒ + receiver ! buffered + context become (empty) + + } + + override def postStop(): Unit = { + flushSchedule.cancel() + super.postStop() + } +} + +object TickMetricSnapshotBuffer { + case object FlushBuffer + + def props(flushInterval: FiniteDuration, receiver: ActorRef): Props = + Props[TickMetricSnapshotBuffer](new TickMetricSnapshotBuffer(flushInterval, receiver)) +} diff --git a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala index eaad6e0d..3da9c1d4 100644 --- a/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/TraceMetrics.scala @@ -16,67 +16,29 @@ package kamon.metric -import akka.actor.ActorSystem -import kamon.metric.instrument.{ Histogram } +import kamon.metric.instrument.{ Time, InstrumentFactory, Histogram } -import scala.collection.concurrent.TrieMap -import com.typesafe.config.Config +class TraceMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + import TraceMetrics.segmentKey -case class TraceMetrics(name: String) extends MetricGroupIdentity { - val category = TraceMetrics -} - -object TraceMetrics extends MetricGroupCategory { - import Metrics.AtomicGetOrElseUpdateForTriemap - - val name = "trace" - - case object ElapsedTime extends MetricIdentity { val name = "elapsed-time" } - - case class TraceMetricRecorder(elapsedTime: Histogram, private val segmentRecorderFactory: () ⇒ Histogram) - extends MetricGroupRecorder { - - val segments = TrieMap[MetricIdentity, Histogram]() - - def segmentRecorder(segmentIdentity: MetricIdentity): Histogram = - segments.atomicGetOrElseUpdate(segmentIdentity, segmentRecorderFactory.apply()) - - def collect(context: CollectionContext): TraceMetricsSnapshot = - TraceMetricsSnapshot( - elapsedTime.collect(context), - segments.map { case (identity, recorder) ⇒ (identity, recorder.collect(context)) }.toMap) + /** + * Records blah blah + */ + val ElapsedTime = histogram("elapsed-time", unitOfMeasurement = Time.Nanoseconds) - def cleanup: Unit = {} - } - - case class TraceMetricsSnapshot(elapsedTime: Histogram.Snapshot, segments: Map[MetricIdentity, Histogram.Snapshot]) - extends MetricGroupSnapshot { - - type GroupSnapshotType = TraceMetricsSnapshot - - def merge(that: TraceMetricsSnapshot, context: CollectionContext): TraceMetricsSnapshot = - TraceMetricsSnapshot(elapsedTime.merge(that.elapsedTime, context), combineMaps(segments, that.segments)((l, r) ⇒ l.merge(r, context))) - - def metrics: Map[MetricIdentity, MetricSnapshot] = segments + (ElapsedTime -> elapsedTime) - } - - val Factory = TraceMetricGroupFactory + /** + * Records Blah Blah. + * + */ + def segment(name: String, category: String, library: String): Histogram = + histogram(segmentKey(name, category, library)) } -case object TraceMetricGroupFactory extends MetricGroupFactory { - - import TraceMetrics._ - - type GroupRecorder = TraceMetricRecorder - - def create(config: Config, system: ActorSystem): TraceMetricRecorder = { - val settings = config.getConfig("precision.trace") - val elapsedTimeConfig = settings.getConfig("elapsed-time") - val segmentConfig = settings.getConfig("segment") +object TraceMetrics extends EntityRecorderFactory[TraceMetrics] { + def category: String = "trace" + def createRecorder(instrumentFactory: InstrumentFactory): TraceMetrics = new TraceMetrics(instrumentFactory) - new TraceMetricRecorder( - Histogram.fromConfig(elapsedTimeConfig, Scale.Nano), - () ⇒ Histogram.fromConfig(segmentConfig, Scale.Nano)) - } + def segmentKey(name: String, category: String, library: String): HistogramKey = + HistogramKey(name, Time.Nanoseconds, Map("category" -> category, "library" -> library)) }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala index b7ac1ac5..5e1a7629 100644 --- a/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/UserMetrics.scala @@ -1,189 +1,193 @@ package kamon.metric import akka.actor -import akka.actor.{ ExtendedActorSystem, ExtensionIdProvider, ExtensionId } +import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionIdProvider, ExtensionId } import kamon.Kamon -import kamon.metric.instrument.{ Gauge, MinMaxCounter, Counter, Histogram } +import kamon.metric.instrument.Gauge.CurrentValueCollector +import kamon.metric.instrument.Histogram.DynamicRange +import kamon.metric.instrument._ import scala.concurrent.duration.FiniteDuration -class UserMetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { - import Metrics.AtomicGetOrElseUpdateForTriemap - import UserMetrics._ - - lazy val metricsExtension = Kamon(Metrics)(system) - val precisionConfig = system.settings.config.getConfig("kamon.metrics.precision") - - val defaultHistogramPrecisionConfig = precisionConfig.getConfig("default-histogram-precision") - val defaultMinMaxCounterPrecisionConfig = precisionConfig.getConfig("default-min-max-counter-precision") - val defaultGaugePrecisionConfig = precisionConfig.getConfig("default-gauge-precision") +object UserMetrics extends ExtensionId[UserMetricsExtension] with ExtensionIdProvider { + override def get(system: ActorSystem): UserMetricsExtension = super.get(system) + def lookup(): ExtensionId[_ <: actor.Extension] = UserMetrics + def createExtension(system: ExtendedActorSystem): UserMetricsExtension = { + val metricsExtension = Metrics.get(system) + val instrumentFactory = metricsExtension.instrumentFactory(entity.category) + val userMetricsExtension = new UserMetricsExtensionImpl(instrumentFactory) - def registerHistogram(name: String, precision: Histogram.Precision, highestTrackableValue: Long): Histogram = { - metricsExtension.storage.atomicGetOrElseUpdate(UserHistogram(name), { - UserHistogramRecorder(Histogram(highestTrackableValue, precision, Scale.Unit)) - }).asInstanceOf[UserHistogramRecorder].histogram + metricsExtension.register(entity, userMetricsExtension).recorder } - def registerHistogram(name: String): Histogram = { - metricsExtension.storage.atomicGetOrElseUpdate(UserHistogram(name), { - UserHistogramRecorder(Histogram.fromConfig(defaultHistogramPrecisionConfig)) - }).asInstanceOf[UserHistogramRecorder].histogram - } + val entity = Entity("user-metric", "user-metric") +} - def registerCounter(name: String): Counter = { - metricsExtension.storage.atomicGetOrElseUpdate(UserCounter(name), { - UserCounterRecorder(Counter()) - }).asInstanceOf[UserCounterRecorder].counter - } +trait UserMetricsExtension extends Kamon.Extension { + def histogram(name: String): Histogram + def histogram(name: String, dynamicRange: DynamicRange): Histogram + def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram + def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram + def histogram(key: HistogramKey): Histogram + def histogram(key: HistogramKey, dynamicRange: DynamicRange): Histogram + def removeHistogram(name: String): Unit + def removeHistogram(key: HistogramKey): Unit + + def minMaxCounter(name: String): MinMaxCounter + def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter + def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter + def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter + def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter + def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter + def minMaxCounter(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter + def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter + def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter + def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange): MinMaxCounter + def minMaxCounter(key: MinMaxCounterKey, refreshInterval: FiniteDuration): MinMaxCounter + def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter + def removeMinMaxCounter(name: String): Unit + def removeMinMaxCounter(key: MinMaxCounterKey): Unit + + def gauge(name: String, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge + def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge + def gauge(key: GaugeKey, valueCollector: CurrentValueCollector): Gauge + def gauge(key: GaugeKey, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge + def gauge(key: GaugeKey, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge + def gauge(key: GaugeKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge + def removeGauge(name: String): Unit + def removeGauge(key: GaugeKey): Unit + + def counter(name: String): Counter + def counter(key: CounterKey): Counter + def removeCounter(name: String): Unit + def removeCounter(key: CounterKey): Unit - def registerMinMaxCounter(name: String, precision: Histogram.Precision, highestTrackableValue: Long, - refreshInterval: FiniteDuration): MinMaxCounter = { - metricsExtension.storage.atomicGetOrElseUpdate(UserMinMaxCounter(name), { - UserMinMaxCounterRecorder(MinMaxCounter(highestTrackableValue, precision, Scale.Unit, refreshInterval, system)) - }).asInstanceOf[UserMinMaxCounterRecorder].minMaxCounter - } +} - def registerMinMaxCounter(name: String): MinMaxCounter = { - metricsExtension.storage.atomicGetOrElseUpdate(UserMinMaxCounter(name), { - UserMinMaxCounterRecorder(MinMaxCounter.fromConfig(defaultMinMaxCounterPrecisionConfig, system)) - }).asInstanceOf[UserMinMaxCounterRecorder].minMaxCounter - } +class UserMetricsExtensionImpl(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) with UserMetricsExtension { + override def histogram(name: String): Histogram = + super.histogram(name) - def registerGauge(name: String)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = { - metricsExtension.storage.atomicGetOrElseUpdate(UserGauge(name), { - UserGaugeRecorder(Gauge.fromConfig(defaultGaugePrecisionConfig, system)(currentValueCollector)) - }).asInstanceOf[UserGaugeRecorder].gauge - } + override def histogram(name: String, dynamicRange: DynamicRange): Histogram = + super.histogram(name, dynamicRange) - def registerGauge(name: String, precision: Histogram.Precision, highestTrackableValue: Long, - refreshInterval: FiniteDuration)(currentValueCollector: Gauge.CurrentValueCollector): Gauge = { - metricsExtension.storage.atomicGetOrElseUpdate(UserGauge(name), { - UserGaugeRecorder(Gauge(precision, highestTrackableValue, Scale.Unit, refreshInterval, system)(currentValueCollector)) - }).asInstanceOf[UserGaugeRecorder].gauge - } + override def histogram(name: String, unitOfMeasurement: UnitOfMeasurement): Histogram = + super.histogram(name, unitOfMeasurement) - def removeHistogram(name: String): Unit = - metricsExtension.unregister(UserHistogram(name)) + override def histogram(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): Histogram = + super.histogram(name, dynamicRange, unitOfMeasurement) - def removeCounter(name: String): Unit = - metricsExtension.unregister(UserCounter(name)) + override def histogram(key: HistogramKey): Histogram = + super.histogram(key) - def removeMinMaxCounter(name: String): Unit = - metricsExtension.unregister(UserMinMaxCounter(name)) + override def histogram(key: HistogramKey, dynamicRange: DynamicRange): Histogram = + super.histogram(key, dynamicRange) - def removeGauge(name: String): Unit = - metricsExtension.unregister(UserGauge(name)) -} + override def removeHistogram(name: String): Unit = + super.removeHistogram(name) -object UserMetrics extends ExtensionId[UserMetricsExtension] with ExtensionIdProvider { - def lookup(): ExtensionId[_ <: actor.Extension] = Metrics + override def removeHistogram(key: HistogramKey): Unit = + super.removeHistogram(key) - def createExtension(system: ExtendedActorSystem): UserMetricsExtension = new UserMetricsExtension(system) + override def minMaxCounter(name: String): MinMaxCounter = + super.minMaxCounter(name) - sealed trait UserMetricGroup - // - // Histograms - // + override def minMaxCounter(name: String, dynamicRange: DynamicRange): MinMaxCounter = + super.minMaxCounter(name, dynamicRange) - case class UserHistogram(name: String) extends MetricGroupIdentity with UserMetricGroup { - val category = UserHistograms - } + override def minMaxCounter(name: String, refreshInterval: FiniteDuration): MinMaxCounter = + super.minMaxCounter(name, refreshInterval) - case class UserHistogramRecorder(histogram: Histogram) extends MetricGroupRecorder { - def collect(context: CollectionContext): MetricGroupSnapshot = - UserHistogramSnapshot(histogram.collect(context)) + override def minMaxCounter(name: String, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + super.minMaxCounter(name, unitOfMeasurement) - def cleanup: Unit = histogram.cleanup - } + override def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter = + super.minMaxCounter(name, dynamicRange, refreshInterval) - case class UserHistogramSnapshot(histogramSnapshot: Histogram.Snapshot) extends MetricGroupSnapshot { - type GroupSnapshotType = UserHistogramSnapshot + override def minMaxCounter(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + super.minMaxCounter(name, dynamicRange, unitOfMeasurement) - def merge(that: UserHistogramSnapshot, context: CollectionContext): UserHistogramSnapshot = - UserHistogramSnapshot(that.histogramSnapshot.merge(histogramSnapshot, context)) + override def minMaxCounter(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + super.minMaxCounter(name, refreshInterval, unitOfMeasurement) - def metrics: Map[MetricIdentity, MetricSnapshot] = Map((RecordedValues, histogramSnapshot)) - } + override def minMaxCounter(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement): MinMaxCounter = + super.minMaxCounter(name, dynamicRange, refreshInterval, unitOfMeasurement) - // - // Counters - // + override def minMaxCounter(key: MinMaxCounterKey): MinMaxCounter = + super.minMaxCounter(key) - case class UserCounter(name: String) extends MetricGroupIdentity with UserMetricGroup { - val category = UserCounters - } + override def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange): MinMaxCounter = + super.minMaxCounter(key, dynamicRange) - case class UserCounterRecorder(counter: Counter) extends MetricGroupRecorder { - def collect(context: CollectionContext): MetricGroupSnapshot = - UserCounterSnapshot(counter.collect(context)) + override def minMaxCounter(key: MinMaxCounterKey, refreshInterval: FiniteDuration): MinMaxCounter = + super.minMaxCounter(key, refreshInterval) - def cleanup: Unit = counter.cleanup - } + override def minMaxCounter(key: MinMaxCounterKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration): MinMaxCounter = + super.minMaxCounter(key, dynamicRange, refreshInterval) - case class UserCounterSnapshot(counterSnapshot: Counter.Snapshot) extends MetricGroupSnapshot { - type GroupSnapshotType = UserCounterSnapshot + override def removeMinMaxCounter(name: String): Unit = + super.removeMinMaxCounter(name) - def merge(that: UserCounterSnapshot, context: CollectionContext): UserCounterSnapshot = - UserCounterSnapshot(that.counterSnapshot.merge(counterSnapshot, context)) + override def removeMinMaxCounter(key: MinMaxCounterKey): Unit = + super.removeMinMaxCounter(key) - def metrics: Map[MetricIdentity, MetricSnapshot] = Map((Count, counterSnapshot)) - } + override def gauge(name: String, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, valueCollector) - // - // MinMaxCounters - // + override def gauge(name: String, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, dynamicRange, valueCollector) - case class UserMinMaxCounter(name: String) extends MetricGroupIdentity with UserMetricGroup { - val category = UserMinMaxCounters - } + override def gauge(name: String, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, refreshInterval, valueCollector) - case class UserMinMaxCounterRecorder(minMaxCounter: MinMaxCounter) extends MetricGroupRecorder { - def collect(context: CollectionContext): MetricGroupSnapshot = - UserMinMaxCounterSnapshot(minMaxCounter.collect(context)) + override def gauge(name: String, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, unitOfMeasurement, valueCollector) - def cleanup: Unit = minMaxCounter.cleanup - } + override def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, dynamicRange, refreshInterval, valueCollector) - case class UserMinMaxCounterSnapshot(minMaxCounterSnapshot: Histogram.Snapshot) extends MetricGroupSnapshot { - type GroupSnapshotType = UserMinMaxCounterSnapshot + override def gauge(name: String, dynamicRange: DynamicRange, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, dynamicRange, unitOfMeasurement, valueCollector) - def merge(that: UserMinMaxCounterSnapshot, context: CollectionContext): UserMinMaxCounterSnapshot = - UserMinMaxCounterSnapshot(that.minMaxCounterSnapshot.merge(minMaxCounterSnapshot, context)) + override def gauge(name: String, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, refreshInterval, unitOfMeasurement, valueCollector) - def metrics: Map[MetricIdentity, MetricSnapshot] = Map((RecordedValues, minMaxCounterSnapshot)) - } - - // - // Gauges - // + override def gauge(name: String, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, unitOfMeasurement: UnitOfMeasurement, valueCollector: CurrentValueCollector): Gauge = + super.gauge(name, dynamicRange, refreshInterval, unitOfMeasurement, valueCollector) - case class UserGauge(name: String) extends MetricGroupIdentity with UserMetricGroup { - val category = UserGauges - } + override def gauge(key: GaugeKey, valueCollector: CurrentValueCollector): Gauge = + super.gauge(key, valueCollector) - case class UserGaugeRecorder(gauge: Gauge) extends MetricGroupRecorder { - def collect(context: CollectionContext): MetricGroupSnapshot = - UserGaugeSnapshot(gauge.collect(context)) + override def gauge(key: GaugeKey, dynamicRange: DynamicRange, valueCollector: CurrentValueCollector): Gauge = + super.gauge(key, dynamicRange, valueCollector) - def cleanup: Unit = gauge.cleanup - } + override def gauge(key: GaugeKey, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + super.gauge(key, refreshInterval, valueCollector) - case class UserGaugeSnapshot(gaugeSnapshot: Histogram.Snapshot) extends MetricGroupSnapshot { - type GroupSnapshotType = UserGaugeSnapshot + override def gauge(key: GaugeKey, dynamicRange: DynamicRange, refreshInterval: FiniteDuration, valueCollector: CurrentValueCollector): Gauge = + super.gauge(key, dynamicRange, refreshInterval, valueCollector) - def merge(that: UserGaugeSnapshot, context: CollectionContext): UserGaugeSnapshot = - UserGaugeSnapshot(that.gaugeSnapshot.merge(gaugeSnapshot, context)) + override def removeGauge(name: String): Unit = + super.removeGauge(name) - def metrics: Map[MetricIdentity, MetricSnapshot] = Map((RecordedValues, gaugeSnapshot)) - } + override def removeGauge(key: GaugeKey): Unit = + super.removeGauge(key) - case object UserHistograms extends MetricGroupCategory { val name: String = "histogram" } - case object UserCounters extends MetricGroupCategory { val name: String = "counter" } - case object UserMinMaxCounters extends MetricGroupCategory { val name: String = "min-max-counter" } - case object UserGauges extends MetricGroupCategory { val name: String = "gauge" } + override def counter(name: String): Counter = + super.counter(name) - case object RecordedValues extends MetricIdentity { val name: String = "values" } - case object Count extends MetricIdentity { val name: String = "count" } + override def counter(key: CounterKey): Counter = + super.counter(key) -} + override def removeCounter(name: String): Unit = + super.removeCounter(name) + override def removeCounter(key: CounterKey): Unit = + super.removeCounter(key) +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/package.scala b/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala index 43166058..e79090a8 100644 --- a/kamon-core/src/main/scala/kamon/metric/package.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/AtomicHistogramFieldsAccessor.scala @@ -1,6 +1,6 @@ /* * ========================================================================================= - * Copyright © 2013 the kamon project <http://kamon.io/> + * Copyright © 2013-2014 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -14,21 +14,22 @@ * ========================================================================================= */ -package kamon +package org.HdrHistogram -import scala.annotation.tailrec -import com.typesafe.config.Config +import java.util.concurrent.atomic.{ AtomicLongArray, AtomicLongFieldUpdater } -package object metric { +trait AtomicHistogramFieldsAccessor { + self: AtomicHistogram ⇒ - @tailrec def combineMaps[K, V](left: Map[K, V], right: Map[K, V])(valueMerger: (V, V) ⇒ V): Map[K, V] = { - if (right.isEmpty) - left - else { - val (key, rightValue) = right.head - val value = left.get(key).map(valueMerger(_, rightValue)).getOrElse(rightValue) + def countsArray(): AtomicLongArray = self.counts - combineMaps(left.updated(key, value), right.tail)(valueMerger) - } - } + def unitMagnitude(): Int = self.unitMagnitude + + def subBucketHalfCount(): Int = self.subBucketHalfCount + + def subBucketHalfCountMagnitude(): Int = self.subBucketHalfCountMagnitude +} + +object AtomicHistogramFieldsAccessor { + def totalCountUpdater(): AtomicLongFieldUpdater[AtomicHistogram] = AtomicHistogram.totalCountUpdater } diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala index 0f29ba6f..c1b69cbe 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala @@ -17,9 +17,8 @@ package kamon.metric.instrument import kamon.jsr166.LongAdder -import kamon.metric.{ CollectionContext, MetricSnapshot, MetricRecorder } -trait Counter extends MetricRecorder { +trait Counter extends Instrument { type SnapshotType = Counter.Snapshot def increment(): Unit @@ -29,12 +28,11 @@ trait Counter extends MetricRecorder { object Counter { def apply(): Counter = new LongAdderCounter + def create(): Counter = apply() - trait Snapshot extends MetricSnapshot { - type SnapshotType = Counter.Snapshot - + trait Snapshot extends InstrumentSnapshot { def count: Long - def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot + def merge(that: InstrumentSnapshot, context: CollectionContext): Counter.Snapshot } } @@ -55,5 +53,8 @@ class LongAdderCounter extends Counter { } case class CounterSnapshot(count: Long) extends Counter.Snapshot { - def merge(that: Counter.Snapshot, context: CollectionContext): Counter.Snapshot = CounterSnapshot(count + that.count) + def merge(that: InstrumentSnapshot, context: CollectionContext): Counter.Snapshot = that match { + case CounterSnapshot(thatCount) ⇒ CounterSnapshot(count + thatCount) + case other ⇒ sys.error(s"Cannot merge a CounterSnapshot with the incompatible [${other.getClass.getName}] type.") + } }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala index efd7d78f..2341504c 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala @@ -1,70 +1,89 @@ package kamon.metric.instrument -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{ AtomicLong, AtomicLongFieldUpdater, AtomicReference } -import akka.actor.{ Cancellable, ActorSystem } -import com.typesafe.config.Config -import kamon.metric.{ CollectionContext, Scale, MetricRecorder } +import akka.actor.Cancellable +import kamon.metric.instrument.Gauge.CurrentValueCollector +import kamon.metric.instrument.Histogram.DynamicRange import scala.concurrent.duration.FiniteDuration -trait Gauge extends MetricRecorder { +trait Gauge extends Instrument { type SnapshotType = Histogram.Snapshot - def record(value: Long) - def record(value: Long, count: Long) + def record(value: Long): Unit + def record(value: Long, count: Long): Unit + def refreshValue(): Unit } object Gauge { - trait CurrentValueCollector { - def currentValue: Long - } - - def apply(precision: Histogram.Precision, highestTrackableValue: Long, scale: Scale, refreshInterval: FiniteDuration, - system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = { - - val underlyingHistogram = Histogram(highestTrackableValue, precision, scale) - val gauge = new HistogramBackedGauge(underlyingHistogram, currentValueCollector) - - val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) { + def apply(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler, valueCollector: CurrentValueCollector): Gauge = { + val underlyingHistogram = Histogram(dynamicRange) + val gauge = new HistogramBackedGauge(underlyingHistogram, valueCollector) + val refreshValuesSchedule = scheduler.schedule(refreshInterval, () ⇒ { gauge.refreshValue() - }(system.dispatcher) // TODO: Move this to Kamon dispatchers + }) - gauge.refreshValuesSchedule.set(refreshValuesSchedule) + gauge.automaticValueCollectorSchedule.set(refreshValuesSchedule) gauge } - def fromDefaultConfig(system: ActorSystem)(currentValueCollectorFunction: () ⇒ Long): Gauge = - fromDefaultConfig(system, functionZeroAsCurrentValueCollector(currentValueCollectorFunction)) + def create(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler, valueCollector: CurrentValueCollector): Gauge = + apply(dynamicRange, refreshInterval, scheduler, valueCollector) - def fromDefaultConfig(system: ActorSystem, currentValueCollector: CurrentValueCollector): Gauge = { - val config = system.settings.config.getConfig("kamon.metrics.precision.default-gauge-precision") - fromConfig(config, system)(currentValueCollector) + trait CurrentValueCollector { + def currentValue: Long } - def fromConfig(config: Config, system: ActorSystem, scale: Scale)(currentValueCollector: CurrentValueCollector): Gauge = { - import scala.concurrent.duration._ + implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector { + def currentValue: Long = f.apply() + } +} - val highest = config.getLong("highest-trackable-value") - val significantDigits = config.getInt("significant-value-digits") - val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS) +/** + * Helper for cases in which a gauge shouldn't store the current value of a observed value but the difference between + * the current observed value and the previously observed value. Should only be used if the observed value is always + * increasing or staying steady, but is never able to decrease. + * + * Note: The first time a value is collected, this wrapper will always return zero, afterwards, the difference between + * the current value and the last value will be returned. + */ +class DifferentialValueCollector(wrappedValueCollector: CurrentValueCollector) extends CurrentValueCollector { + @volatile private var _readAtLeastOnce = false + private val _lastObservedValue = new AtomicLong(0) + + def currentValue: Long = { + if (_readAtLeastOnce) { + val wrappedCurrent = wrappedValueCollector.currentValue + val d = wrappedCurrent - _lastObservedValue.getAndSet(wrappedCurrent) + + if (d < 0) + println("HUBO MENOR QUE CERO") + + d + + } else { + _lastObservedValue.set(wrappedValueCollector.currentValue) + _readAtLeastOnce = true + 0 + } - Gauge(Histogram.Precision(significantDigits), highest, scale, refreshInterval.millis, system)(currentValueCollector) } +} - def fromConfig(config: Config, system: ActorSystem)(currentValueCollector: CurrentValueCollector): Gauge = { - fromConfig(config, system, Scale.Unit)(currentValueCollector) - } +object DifferentialValueCollector { + def apply(wrappedValueCollector: CurrentValueCollector): CurrentValueCollector = + new DifferentialValueCollector(wrappedValueCollector) - implicit def functionZeroAsCurrentValueCollector(f: () ⇒ Long): CurrentValueCollector = new CurrentValueCollector { - def currentValue: Long = f.apply() - } + def apply(wrappedValueCollector: ⇒ Long): CurrentValueCollector = + new DifferentialValueCollector(new CurrentValueCollector { + def currentValue: Long = wrappedValueCollector + }) } class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector: Gauge.CurrentValueCollector) extends Gauge { - val refreshValuesSchedule = new AtomicReference[Cancellable]() + private[kamon] val automaticValueCollectorSchedule = new AtomicReference[Cancellable]() def record(value: Long): Unit = underlyingHistogram.record(value) @@ -73,10 +92,15 @@ class HistogramBackedGauge(underlyingHistogram: Histogram, currentValueCollector def collect(context: CollectionContext): Histogram.Snapshot = underlyingHistogram.collect(context) def cleanup: Unit = { - if (refreshValuesSchedule.get() != null) - refreshValuesSchedule.get().cancel() + if (automaticValueCollectorSchedule.get() != null) + automaticValueCollectorSchedule.get().cancel() } - def refreshValue(): Unit = underlyingHistogram.record(currentValueCollector.currentValue) + def refreshValue(): Unit = { + val a = currentValueCollector.currentValue + if (a < 0) + println("RECORDING FROM GAUGE => " + a + " - " + currentValueCollector.getClass) + underlyingHistogram.record(a) + } } diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala index bed75fc8..5c4c7f71 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala @@ -17,12 +17,11 @@ package kamon.metric.instrument import java.nio.LongBuffer -import com.typesafe.config.Config import org.HdrHistogram.AtomicHistogramFieldsAccessor +import kamon.metric.instrument.Histogram.{ Snapshot, DynamicRange } import org.HdrHistogram.AtomicHistogram -import kamon.metric._ -trait Histogram extends MetricRecorder { +trait Histogram extends Instrument { type SnapshotType = Histogram.Snapshot def record(value: Long) @@ -31,30 +30,40 @@ trait Histogram extends MetricRecorder { object Histogram { - def apply(highestTrackableValue: Long, precision: Precision, scale: Scale): Histogram = - new HdrHistogram(1L, highestTrackableValue, precision.significantDigits, scale) - - def fromConfig(config: Config): Histogram = { - fromConfig(config, Scale.Unit) - } - - def fromConfig(config: Config, scale: Scale): Histogram = { - val highest = config.getLong("highest-trackable-value") - val significantDigits = config.getInt("significant-value-digits") - - new HdrHistogram(1L, highest, significantDigits, scale) - } - - object HighestTrackableValue { - val OneHourInNanoseconds = 3600L * 1000L * 1000L * 1000L - } - - case class Precision(significantDigits: Int) - object Precision { - val Low = Precision(1) - val Normal = Precision(2) - val Fine = Precision(3) - } + /** + * Scala API: + * + * Create a new High Dynamic Range Histogram ([[kamon.metric.instrument.HdrHistogram]]) using the given + * [[kamon.metric.instrument.Histogram.DynamicRange]]. + */ + def apply(dynamicRange: DynamicRange): Histogram = new HdrHistogram(dynamicRange) + + /** + * Java API: + * + * Create a new High Dynamic Range Histogram ([[kamon.metric.instrument.HdrHistogram]]) using the given + * [[kamon.metric.instrument.Histogram.DynamicRange]]. + */ + def create(dynamicRange: DynamicRange): Histogram = apply(dynamicRange) + + /** + * DynamicRange is a configuration object used to supply range and precision configuration to a + * [[kamon.metric.instrument.HdrHistogram]]. See the [[http://hdrhistogram.github.io/HdrHistogram/ HdrHistogram website]] + * for more details on how it works and the effects of these configuration values. + * + * @param lowestDiscernibleValue + * The lowest value that can be discerned (distinguished from 0) by the histogram.Must be a positive integer that + * is >= 1. May be internally rounded down to nearest power of 2. + * + * @param highestTrackableValue + * The highest value to be tracked by the histogram. Must be a positive integer that is >= (2 * lowestDiscernibleValue). + * Must not be larger than (Long.MAX_VALUE/2). + * + * @param precision + * The number of significant decimal digits to which the histogram will maintain value resolution and separation. + * Must be a non-negative integer between 1 and 3. + */ + case class DynamicRange(lowestDiscernibleValue: Long, highestTrackableValue: Long, precision: Int) trait Record { def level: Long @@ -67,29 +76,28 @@ object Histogram { var rawCompactRecord: Long = 0L } - trait Snapshot extends MetricSnapshot { - type SnapshotType = Histogram.Snapshot + trait Snapshot extends InstrumentSnapshot { def isEmpty: Boolean = numberOfMeasurements == 0 - def scale: Scale def numberOfMeasurements: Long def min: Long def max: Long def sum: Long def percentile(percentile: Double): Long def recordsIterator: Iterator[Record] + def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot } object Snapshot { - def empty(targetScale: Scale) = new Snapshot { + val empty = new Snapshot { override def min: Long = 0L override def max: Long = 0L override def sum: Long = 0L override def percentile(percentile: Double): Long = 0L override def recordsIterator: Iterator[Record] = Iterator.empty - override def merge(that: Snapshot, context: CollectionContext): Snapshot = that - override def scale: Scale = targetScale + override def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot = that + override def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = that override def numberOfMeasurements: Long = 0L } } @@ -100,10 +108,8 @@ object Histogram { * The collect(..) operation extracts all the recorded values from the histogram and resets the counts, but still * leave it in a consistent state even in the case of concurrent modification while the snapshot is being taken. */ -class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, significantValueDigits: Int, scale: Scale = Scale.Unit) - extends AtomicHistogram(lowestTrackableValue, highestTrackableValue, significantValueDigits) - with Histogram with AtomicHistogramFieldsAccessor { - +class HdrHistogram(dynamicRange: DynamicRange) extends AtomicHistogram(dynamicRange.lowestDiscernibleValue, + dynamicRange.highestTrackableValue, dynamicRange.precision) with Histogram with AtomicHistogramFieldsAccessor { import AtomicHistogramFieldsAccessor.totalCountUpdater def record(value: Long): Unit = recordValue(value) @@ -119,7 +125,7 @@ class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, sign val measurementsArray = Array.ofDim[Long](buffer.limit()) buffer.get(measurementsArray, 0, measurementsArray.length) - new CompactHdrSnapshot(scale, nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude()) + new CompactHdrSnapshot(nrOfMeasurements, measurementsArray, unitMagnitude(), subBucketHalfCount(), subBucketHalfCountMagnitude()) } def getCounts = countsArray().length() @@ -160,7 +166,7 @@ class HdrHistogram(lowestTrackableValue: Long, highestTrackableValue: Long, sign } -case class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int, +case class CompactHdrSnapshot(val numberOfMeasurements: Long, compactRecords: Array[Long], unitMagnitude: Int, subBucketHalfCount: Int, subBucketHalfCountMagnitude: Int) extends Histogram.Snapshot { def min: Long = if (compactRecords.length == 0) 0 else levelFromCompactRecord(compactRecords(0)) @@ -182,53 +188,61 @@ case class CompactHdrSnapshot(val scale: Scale, val numberOfMeasurements: Long, percentileLevel } - def merge(that: Histogram.Snapshot, context: CollectionContext): Histogram.Snapshot = { - if (that.isEmpty) this else if (this.isEmpty) that else { - import context.buffer - buffer.clear() + def merge(that: Histogram.Snapshot, context: CollectionContext): Snapshot = + merge(that.asInstanceOf[InstrumentSnapshot], context) - val selfIterator = recordsIterator - val thatIterator = that.recordsIterator - var thatCurrentRecord: Histogram.Record = null - var mergedNumberOfMeasurements = 0L + def merge(that: InstrumentSnapshot, context: CollectionContext): Histogram.Snapshot = that match { + case thatSnapshot: CompactHdrSnapshot ⇒ + if (thatSnapshot.isEmpty) this else if (this.isEmpty) thatSnapshot else { + import context.buffer + buffer.clear() - def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null - def addToBuffer(compactRecord: Long): Unit = { - mergedNumberOfMeasurements += countFromCompactRecord(compactRecord) - buffer.put(compactRecord) - } + val selfIterator = recordsIterator + val thatIterator = thatSnapshot.recordsIterator + var thatCurrentRecord: Histogram.Record = null + var mergedNumberOfMeasurements = 0L - while (selfIterator.hasNext) { - val selfCurrentRecord = selfIterator.next() + def nextOrNull(iterator: Iterator[Histogram.Record]): Histogram.Record = if (iterator.hasNext) iterator.next() else null + def addToBuffer(compactRecord: Long): Unit = { + mergedNumberOfMeasurements += countFromCompactRecord(compactRecord) + buffer.put(compactRecord) + } - // Advance that to no further than the level of selfCurrentRecord - thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord - while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) { - addToBuffer(thatCurrentRecord.rawCompactRecord) - thatCurrentRecord = nextOrNull(thatIterator) + while (selfIterator.hasNext) { + val selfCurrentRecord = selfIterator.next() + + // Advance that to no further than the level of selfCurrentRecord + thatCurrentRecord = if (thatCurrentRecord == null) nextOrNull(thatIterator) else thatCurrentRecord + while (thatCurrentRecord != null && thatCurrentRecord.level < selfCurrentRecord.level) { + addToBuffer(thatCurrentRecord.rawCompactRecord) + thatCurrentRecord = nextOrNull(thatIterator) + } + + // Include the current record of self and optionally merge if has the same level as thatCurrentRecord + if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) { + addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord)) + thatCurrentRecord = nextOrNull(thatIterator) + } else { + addToBuffer(selfCurrentRecord.rawCompactRecord) + } } - // Include the current record of self and optionally merge if has the same level as thatCurrentRecord - if (thatCurrentRecord != null && thatCurrentRecord.level == selfCurrentRecord.level) { - addToBuffer(mergeCompactRecords(thatCurrentRecord.rawCompactRecord, selfCurrentRecord.rawCompactRecord)) - thatCurrentRecord = nextOrNull(thatIterator) - } else { - addToBuffer(selfCurrentRecord.rawCompactRecord) + // Include everything that might have been left from that + if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord) + while (thatIterator.hasNext) { + addToBuffer(thatIterator.next().rawCompactRecord) } - } - // Include everything that might have been left from that - if (thatCurrentRecord != null) addToBuffer(thatCurrentRecord.rawCompactRecord) - while (thatIterator.hasNext) { - addToBuffer(thatIterator.next().rawCompactRecord) + buffer.flip() + val compactRecords = Array.ofDim[Long](buffer.limit()) + buffer.get(compactRecords) + + new CompactHdrSnapshot(mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude) } - buffer.flip() - val compactRecords = Array.ofDim[Long](buffer.limit()) - buffer.get(compactRecords) + case other ⇒ + sys.error(s"Cannot merge a CompactHdrSnapshot with the incompatible [${other.getClass.getName}] type.") - new CompactHdrSnapshot(scale, mergedNumberOfMeasurements, compactRecords, unitMagnitude, subBucketHalfCount, subBucketHalfCountMagnitude) - } } @inline private def mergeCompactRecords(left: Long, right: Long): Long = { diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala new file mode 100644 index 00000000..8cacc767 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala @@ -0,0 +1,56 @@ +package kamon.metric.instrument + +import java.nio.LongBuffer + +import akka.actor.{ Scheduler, Cancellable } +import akka.dispatch.MessageDispatcher +import scala.concurrent.duration.FiniteDuration + +private[kamon] trait Instrument { + type SnapshotType <: InstrumentSnapshot + + def collect(context: CollectionContext): SnapshotType + def cleanup: Unit +} + +trait InstrumentSnapshot { + def merge(that: InstrumentSnapshot, context: CollectionContext): InstrumentSnapshot +} + +class InstrumentType private[kamon] (val id: Int) extends AnyVal +object InstrumentTypes { + val Histogram = new InstrumentType(1) + val MinMaxCounter = new InstrumentType(2) + val Gauge = new InstrumentType(3) + val Counter = new InstrumentType(4) +} + +trait CollectionContext { + def buffer: LongBuffer +} + +object CollectionContext { + def apply(longBufferSize: Int): CollectionContext = new CollectionContext { + val buffer: LongBuffer = LongBuffer.allocate(longBufferSize) + } +} + +trait RefreshScheduler { + def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable +} + +object RefreshScheduler { + val NoopScheduler = new RefreshScheduler { + def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = new Cancellable { + override def isCancelled: Boolean = true + override def cancel(): Boolean = true + } + } + + def apply(scheduler: Scheduler, dispatcher: MessageDispatcher): RefreshScheduler = new RefreshScheduler { + def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = + scheduler.schedule(interval, interval)(refresh.apply())(dispatcher) + } + + def create(scheduler: Scheduler, dispatcher: MessageDispatcher): RefreshScheduler = apply(scheduler, dispatcher) +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala new file mode 100644 index 00000000..9b0c85cb --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala @@ -0,0 +1,35 @@ +package kamon.metric.instrument + +import kamon.metric.instrument.Gauge.CurrentValueCollector +import kamon.metric.instrument.Histogram.DynamicRange + +import scala.concurrent.duration.FiniteDuration + +case class InstrumentFactory(configurations: Map[String, InstrumentCustomSettings], defaults: DefaultInstrumentSettings, scheduler: RefreshScheduler) { + + private def resolveSettings(instrumentName: String, codeSettings: Option[InstrumentSettings], default: InstrumentSettings): InstrumentSettings = { + configurations.get(instrumentName).flatMap { customSettings ⇒ + codeSettings.map(cs ⇒ customSettings.combine(cs)) orElse (Some(customSettings.combine(default))) + + } getOrElse (codeSettings.getOrElse(default)) + } + + def createHistogram(name: String, dynamicRange: Option[DynamicRange] = None): Histogram = { + val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, None)), defaults.histogram) + Histogram(settings.dynamicRange) + } + + def createMinMaxCounter(name: String, dynamicRange: Option[DynamicRange] = None, refreshInterval: Option[FiniteDuration] = None): MinMaxCounter = { + val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, refreshInterval)), defaults.minMaxCounter) + MinMaxCounter(settings.dynamicRange, settings.refreshInterval.get, scheduler) + } + + def createGauge(name: String, dynamicRange: Option[DynamicRange] = None, refreshInterval: Option[FiniteDuration] = None, + valueCollector: CurrentValueCollector): Gauge = { + + val settings = resolveSettings(name, dynamicRange.map(dr ⇒ InstrumentSettings(dr, refreshInterval)), defaults.gauge) + Gauge(settings.dynamicRange, settings.refreshInterval.get, scheduler, valueCollector) + } + + def createCounter(): Counter = Counter() +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala new file mode 100644 index 00000000..1446a25d --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSettings.scala @@ -0,0 +1,67 @@ +package kamon.metric.instrument + +import java.util.concurrent.TimeUnit + +import com.typesafe.config.Config +import kamon.metric.instrument.Histogram.DynamicRange + +import scala.concurrent.duration.FiniteDuration + +case class InstrumentCustomSettings(lowestDiscernibleValue: Option[Long], highestTrackableValue: Option[Long], + precision: Option[Int], refreshInterval: Option[FiniteDuration]) { + + def combine(that: InstrumentSettings): InstrumentSettings = + InstrumentSettings( + DynamicRange( + lowestDiscernibleValue.getOrElse(that.dynamicRange.lowestDiscernibleValue), + highestTrackableValue.getOrElse(that.dynamicRange.highestTrackableValue), + precision.getOrElse(that.dynamicRange.precision)), + refreshInterval.orElse(that.refreshInterval)) +} + +object InstrumentCustomSettings { + import scala.concurrent.duration._ + + def fromConfig(config: Config): InstrumentCustomSettings = + InstrumentCustomSettings( + if (config.hasPath("lowest-discernible-value")) Some(config.getLong("lowest-discernible-value")) else None, + if (config.hasPath("highest-trackable-value")) Some(config.getLong("highest-trackable-value")) else None, + if (config.hasPath("precision")) Some(InstrumentSettings.parsePrecision(config.getString("precision"))) else None, + if (config.hasPath("refresh-interval")) Some(config.getDuration("refresh-interval", TimeUnit.NANOSECONDS).nanos) else None) + +} + +case class InstrumentSettings(dynamicRange: DynamicRange, refreshInterval: Option[FiniteDuration]) + +object InstrumentSettings { + + def readDynamicRange(config: Config): DynamicRange = + DynamicRange( + config.getLong("lowest-discernible-value"), + config.getLong("highest-trackable-value"), + parsePrecision(config.getString("precision"))) + + def parsePrecision(stringValue: String): Int = stringValue match { + case "low" ⇒ 1 + case "normal" ⇒ 2 + case "fine" ⇒ 3 + case other ⇒ sys.error(s"Invalid precision configuration [$other] found, valid options are: [low|normal|fine].") + } +} + +case class DefaultInstrumentSettings(histogram: InstrumentSettings, minMaxCounter: InstrumentSettings, gauge: InstrumentSettings) + +object DefaultInstrumentSettings { + + def fromConfig(config: Config): DefaultInstrumentSettings = { + import scala.concurrent.duration._ + + val histogramSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("histogram")), None) + val minMaxCounterSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("min-max-counter")), + Some(config.getDuration("min-max-counter.refresh-interval", TimeUnit.NANOSECONDS).nanos)) + val gaugeSettings = InstrumentSettings(InstrumentSettings.readDynamicRange(config.getConfig("gauge")), + Some(config.getDuration("gauge.refresh-interval", TimeUnit.NANOSECONDS).nanos)) + + DefaultInstrumentSettings(histogramSettings, minMaxCounterSettings, gaugeSettings) + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala index 4882d2aa..0828c8a9 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala @@ -17,16 +17,14 @@ package kamon.metric.instrument */ import java.lang.Math.abs -import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference -import akka.actor.{ ActorSystem, Cancellable } -import com.typesafe.config.Config +import akka.actor.Cancellable import kamon.jsr166.LongMaxUpdater -import kamon.metric.{ Scale, MetricRecorder, CollectionContext } +import kamon.metric.instrument.Histogram.DynamicRange import kamon.util.PaddedAtomicLong import scala.concurrent.duration.FiniteDuration -trait MinMaxCounter extends MetricRecorder { +trait MinMaxCounter extends Instrument { override type SnapshotType = Histogram.Snapshot def increment(): Unit @@ -38,29 +36,20 @@ trait MinMaxCounter extends MetricRecorder { object MinMaxCounter { - def apply(highestTrackableValue: Long, precision: Histogram.Precision, scale: Scale, refreshInterval: FiniteDuration, - system: ActorSystem): MinMaxCounter = { - - val underlyingHistogram = Histogram(highestTrackableValue, precision, scale) + def apply(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler): MinMaxCounter = { + val underlyingHistogram = Histogram(dynamicRange) val minMaxCounter = new PaddedMinMaxCounter(underlyingHistogram) - - val refreshValuesSchedule = system.scheduler.schedule(refreshInterval, refreshInterval) { + val refreshValuesSchedule = scheduler.schedule(refreshInterval, () ⇒ { minMaxCounter.refreshValues() - }(system.dispatcher) // TODO: Move this to Kamon dispatchers + }) minMaxCounter.refreshValuesSchedule.set(refreshValuesSchedule) minMaxCounter } - def fromConfig(config: Config, system: ActorSystem): MinMaxCounter = { - import scala.concurrent.duration._ + def create(dynamicRange: DynamicRange, refreshInterval: FiniteDuration, scheduler: RefreshScheduler): MinMaxCounter = + apply(dynamicRange, refreshInterval, scheduler) - val highest = config.getLong("highest-trackable-value") - val significantDigits = config.getInt("significant-value-digits") - val refreshInterval = config.getDuration("refresh-interval", TimeUnit.MILLISECONDS) - - apply(highest, Histogram.Precision(significantDigits), Scale.Unit, refreshInterval.millis, system) - } } class PaddedMinMaxCounter(underlyingHistogram: Histogram) extends MinMaxCounter { diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala new file mode 100644 index 00000000..cf6b8b4c --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/UnitOfMeasurement.scala @@ -0,0 +1,55 @@ +package kamon.metric.instrument + +trait UnitOfMeasurement { + def name: String + def label: String + def factor: Double +} + +object UnitOfMeasurement { + case object Unknown extends UnitOfMeasurement { + val name = "unknown" + val label = "unknown" + val factor = 1D + } + + def isUnknown(uom: UnitOfMeasurement): Boolean = + uom == Unknown + + def isTime(uom: UnitOfMeasurement): Boolean = + uom.isInstanceOf[Time] + +} + +case class Time(factor: Double, label: String) extends UnitOfMeasurement { + val name = "time" + + /** + * Scale a value from this scale factor to a different scale factor. + * + * @param toUnit Time unit of the expected result. + * @param value Value to scale. + * @return Equivalent of value on the target time unit. + */ + def scale(toUnit: Time)(value: Long): Double = + (value * factor) / toUnit.factor +} + +object Time { + val Nanoseconds = Time(1E-9, "n") + val Microseconds = Time(1E-6, "µs") + val Milliseconds = Time(1E-3, "ms") + val Seconds = Time(1, "s") +} + +case class Memory(factor: Double, label: String) extends UnitOfMeasurement { + val name = "bytes" +} + +object Memory { + val Bytes = Memory(1, "b") + val KiloBytes = Memory(1024, "Kb") + val MegaBytes = Memory(1024E2, "Mb") + val GigaBytes = Memory(1024E3, "Gb") +} + |