From e8d3e612dcf0fa396a25920a23f108f6ab8c2e61 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sun, 11 Jun 2017 10:02:22 +0200 Subject: separate metrics from instruments and add default instrument for metrics --- kamon-core/src/main/scala/kamon/Kamon.scala | 20 ++- .../src/main/scala/kamon/ReporterRegistry.scala | 15 +- .../src/main/scala/kamon/metric/Accumulator.scala | 2 +- .../src/main/scala/kamon/metric/Counter.scala | 6 +- kamon-core/src/main/scala/kamon/metric/Gauge.scala | 6 +- .../src/main/scala/kamon/metric/Histogram.scala | 21 +-- .../scala/kamon/metric/HistogramExtension.scala | 4 +- .../scala/kamon/metric/InstrumentFactory.scala | 13 +- .../src/main/scala/kamon/metric/Metric.scala | 161 +++++++++++++++++++++ .../src/main/scala/kamon/metric/MetricLookup.scala | 54 +++---- .../main/scala/kamon/metric/MetricRegistry.scala | 61 ++++---- .../main/scala/kamon/metric/MinMaxCounter.scala | 8 +- .../src/main/scala/kamon/metric/TickSnapshot.scala | 2 +- kamon-core/src/main/scala/kamon/package.scala | 1 + kamon-core/src/main/scala/kamon/trace/Span.scala | 11 +- kamon-core/src/main/scala/kamon/trace/Tracer.scala | 2 +- 16 files changed, 271 insertions(+), 116 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/metric/Metric.scala (limited to 'kamon-core/src/main/scala') diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index a7df6a1b..50124e28 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -46,20 +46,18 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac } + override def histogram(name: String, unit: MeasurementUnit, dynamicRange: Option[DynamicRange]): HistogramMetric = + metricRegistry.histogram(name, unit, dynamicRange) - override def histogram(name: String, unit: MeasurementUnit, tags: Map[String, String], dynamicRange: - Option[DynamicRange]): Histogram = - metricRegistry.histogram(name, unit, tags, dynamicRange) + override def counter(name: String, unit: MeasurementUnit): CounterMetric = + metricRegistry.counter(name, unit) - override def counter(name: String, unit: MeasurementUnit, tags: Map[String, String]): Counter = - metricRegistry.counter(name, unit, tags) + override def gauge(name: String, unit: MeasurementUnit): GaugeMetric = + metricRegistry.gauge(name, unit) - override def gauge(name: String, unit: MeasurementUnit, tags: Map[String, String]): Gauge = - metricRegistry.gauge(name, unit, tags) - - override def minMaxCounter(name: String, unit: MeasurementUnit, tags: Map[String, String], sampleInterval: Option[Duration], - dynamicRange: Option[DynamicRange]): MinMaxCounter = - metricRegistry.minMaxCounter(name, unit, tags, dynamicRange, sampleInterval) + override def minMaxCounter(name: String, unit: MeasurementUnit, sampleInterval: Option[Duration], + dynamicRange: Option[DynamicRange]): MinMaxCounterMetric = + metricRegistry.minMaxCounter(name, unit, dynamicRange, sampleInterval) diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index 7ef9047d..27a4a6ea 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -65,12 +65,11 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry")) private val reporterCounter = new AtomicLong(0L) - private val metricReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() private val metricReporters = TrieMap[Long, MetricReporterEntry]() + private val metricReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() - private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() private val spanReporters = TrieMap[Long, SpanReporterEntry]() - + private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() reconfigure(initialConfig) @@ -90,7 +89,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con addSpanReporter(reporter, name) - private def addMetricReporter(reporter: MetricReporter, name: String): Registration = { + private def addMetricReporter(reporter: MetricReporter, name: String): Registration = synchronized { val executor = Executors.newSingleThreadExecutor(threadFactory(name)) val reporterEntry = new MetricReporterEntry( id = reporterCounter.getAndIncrement(), @@ -102,7 +101,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con createRegistration(reporterEntry.id, metricReporters) } - private def addSpanReporter(reporter: SpanReporter, name: String): Registration = { + private def addSpanReporter(reporter: SpanReporter, name: String): Registration = synchronized { val executor = Executors.newSingleThreadExecutor(threadFactory(name)) val reporterEntry = new SpanReporterEntry( id = reporterCounter.incrementAndGet(), @@ -117,7 +116,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con private def createRegistration(id: Long, target: TrieMap[Long, _]): Registration = new Registration { override def cancel(): Boolean = - metricReporters.remove(id).nonEmpty + target.remove(id).nonEmpty } override def stopAllReporters(): Future[Unit] = { @@ -152,7 +151,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con val currentSpanTicker = spanReporterTickerSchedule.get() if(currentSpanTicker != null) { - currentSpanTicker .cancel(true) + currentSpanTicker.cancel(true) } // Reconfigure all registered reporters @@ -177,6 +176,8 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con } } + + private[kamon] def reportSpan(span: Span.CompletedSpan): Unit = { spanReporters.foreach { case (_, reporterEntry) => if(reporterEntry.isActive) diff --git a/kamon-core/src/main/scala/kamon/metric/Accumulator.scala b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala index aae1756f..d960565f 100644 --- a/kamon-core/src/main/scala/kamon/metric/Accumulator.scala +++ b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala @@ -20,7 +20,7 @@ import kamon.util.MeasurementUnit class DistributionAccumulator(dynamicRange: DynamicRange) { private val accumulatorHistogram = new HdrHistogram("metric-distribution-accumulator", - tags = Map.empty, measurementUnit = MeasurementUnit.none, dynamicRange) + tags = Map.empty, unit = MeasurementUnit.none, dynamicRange) def add(distribution: Distribution): Unit = { diff --git a/kamon-core/src/main/scala/kamon/metric/Counter.scala b/kamon-core/src/main/scala/kamon/metric/Counter.scala index b5f8353c..e6585021 100644 --- a/kamon-core/src/main/scala/kamon/metric/Counter.scala +++ b/kamon-core/src/main/scala/kamon/metric/Counter.scala @@ -20,13 +20,13 @@ import kamon.jsr166.LongAdder import kamon.util.MeasurementUnit trait Counter { - def measurementUnit: MeasurementUnit + def unit: MeasurementUnit def increment(): Unit def increment(times: Long): Unit } -class LongAdderCounter(name: String, tags: Map[String, String], val measurementUnit: MeasurementUnit) +class LongAdderCounter(name: String, tags: Map[String, String], val unit: MeasurementUnit) extends SnapshotableCounter with StrictLogging { private val adder = new LongAdder() @@ -39,5 +39,5 @@ class LongAdderCounter(name: String, tags: Map[String, String], val measurementU else logger.warn(s"Ignored attempt to decrement counter [$name]") } - def snapshot(): MetricValue = MetricValue(name, tags, measurementUnit, adder.sumAndReset()) + def snapshot(): MetricValue = MetricValue(name, tags, unit, adder.sumAndReset()) } diff --git a/kamon-core/src/main/scala/kamon/metric/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/Gauge.scala index ee8ee3a8..6797dbfd 100644 --- a/kamon-core/src/main/scala/kamon/metric/Gauge.scala +++ b/kamon-core/src/main/scala/kamon/metric/Gauge.scala @@ -20,7 +20,7 @@ import java.util.concurrent.atomic.AtomicLong import kamon.util.MeasurementUnit trait Gauge { - def measurementUnit: MeasurementUnit + def unit: MeasurementUnit def increment(): Unit def increment(times: Long): Unit @@ -30,7 +30,7 @@ trait Gauge { } -class AtomicLongGauge(name: String, tags: Map[String, String], val measurementUnit: MeasurementUnit) +class AtomicLongGauge(name: String, tags: Map[String, String], val unit: MeasurementUnit) extends SnapshotableGauge { private val currentValue = new AtomicLong(0L) @@ -51,5 +51,5 @@ class AtomicLongGauge(name: String, tags: Map[String, String], val measurementUn currentValue.set(value) def snapshot(): MetricValue = - MetricValue(name, tags, measurementUnit, currentValue.get()) + MetricValue(name, tags, unit, currentValue.get()) } diff --git a/kamon-core/src/main/scala/kamon/metric/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/Histogram.scala index 12111b83..1af55479 100644 --- a/kamon-core/src/main/scala/kamon/metric/Histogram.scala +++ b/kamon-core/src/main/scala/kamon/metric/Histogram.scala @@ -13,25 +13,26 @@ * ========================================================================================= */ -package kamon.metric +package kamon +package metric import java.nio.ByteBuffer -import com.typesafe.scalalogging.StrictLogging import kamon.util.MeasurementUnit import org.HdrHistogram.{AtomicHistogramExtension, ZigZag} +import org.slf4j.LoggerFactory trait Histogram { + def unit: MeasurementUnit def dynamicRange: DynamicRange - def measurementUnit: MeasurementUnit def record(value: Long): Unit def record(value: Long, times: Long): Unit } -class HdrHistogram(name: String, tags: Map[String, String], val measurementUnit: MeasurementUnit, val dynamicRange: DynamicRange) - extends AtomicHistogramExtension(dynamicRange) with SnapshotableHistogram with StrictLogging { +private[kamon] class HdrHistogram(name: String, tags: Map[String, String], val unit: MeasurementUnit, val dynamicRange: DynamicRange) + extends AtomicHistogramExtension(dynamicRange) with Histogram { def record(value: Long): Unit = tryRecord(value, 1) @@ -44,12 +45,12 @@ class HdrHistogram(name: String, tags: Map[String, String], val measurementUnit: recordValueWithCount(value, count) } catch { case anyException: Throwable ⇒ - logger.warn(s"Failed to store value [$value] in histogram [$name]. You might need to change " + - "your dynamic range configuration for this instrument.", anyException) + HdrHistogram.logger.warn(s"Failed to store value [$value] in histogram [$name]. You might need to change " + + "your dynamic range configuration for this instrument.", anyException) } } - override def snapshot(): MetricDistribution = { + def snapshot(): MetricDistribution = { val buffer = HdrHistogram.tempSnapshotBuffer.get() val counts = countsArray() val countsLimit = counts.length() @@ -95,7 +96,7 @@ class HdrHistogram(name: String, tags: Map[String, String], val measurementUnit: val distribution = new ZigZagCountsDistribution(totalCount, minIndex, maxIndex, ByteBuffer.wrap(zigZagCounts), protectedUnitMagnitude(), protectedSubBucketHalfCount(), protectedSubBucketHalfCountMagnitude()) - MetricDistribution(name, tags, measurementUnit, dynamicRange, distribution) + MetricDistribution(name, tags, unit, dynamicRange, distribution) } private class ZigZagCountsDistribution(val count: Long, minIndex: Int, maxIndex: Int, zigZagCounts: ByteBuffer, @@ -199,6 +200,8 @@ class HdrHistogram(name: String, tags: Map[String, String], val measurementUnit: } object HdrHistogram { + private val logger = LoggerFactory.getLogger(classOf[HdrHistogram]) + // TODO: move this to some object pool might be better, or at private val tempSnapshotBuffer = new ThreadLocal[ByteBuffer] { override def initialValue(): ByteBuffer = ByteBuffer.allocate(33792) diff --git a/kamon-core/src/main/scala/kamon/metric/HistogramExtension.scala b/kamon-core/src/main/scala/kamon/metric/HistogramExtension.scala index 3bd211de..c77dc426 100644 --- a/kamon-core/src/main/scala/kamon/metric/HistogramExtension.scala +++ b/kamon-core/src/main/scala/kamon/metric/HistogramExtension.scala @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicLongArray import kamon.metric.DynamicRange /** - * Exposes package-private members of [[org.HdrHistogram.AtomicHistogram]]. + * Exposes package-private members of org.HdrHistogram.AtomicHistogram. */ abstract class AtomicHistogramExtension(dr: DynamicRange) extends AtomicHistogram(dr.lowestDiscernibleValue, dr.highestTrackableValue, dr.significantValueDigits) { @@ -36,7 +36,7 @@ abstract class AtomicHistogramExtension(dr: DynamicRange) } /** - * Exposes the package-private members of [[org.HdrHistogram.ZigZagEncoding]]. + * Exposes the package-private members of org.HdrHistogram.ZigZagEncoding. */ object ZigZag { def putLong(buffer: ByteBuffer, value: Long): Unit = diff --git a/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala b/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala index 4bd151d3..2eeb69f8 100644 --- a/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala +++ b/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala @@ -29,11 +29,11 @@ import scala.concurrent.duration._ private[kamon] class InstrumentFactory private (defaultHistogramDynamicRange: DynamicRange, defaultMMCounterDynamicRange: DynamicRange, defaultMMCounterSampleInterval: Duration, customSettings: Map[String, CustomInstrumentSettings]) { - def buildHistogram(dynamicRange: Option[DynamicRange])(name: String, tags: Map[String, String], unit: MeasurementUnit): SnapshotableHistogram = + def buildHistogram(dynamicRange: Option[DynamicRange])(name: String, tags: Map[String, String], unit: MeasurementUnit): HdrHistogram = new HdrHistogram(name, tags, unit, instrumentDynamicRange(name, dynamicRange.getOrElse(defaultHistogramDynamicRange))) def buildMinMaxCounter(dynamicRange: Option[DynamicRange], sampleInterval: Option[Duration]) - (name: String, tags: Map[String, String], unit: MeasurementUnit): SnapshotableMinMaxCounter = + (name: String, tags: Map[String, String], unit: MeasurementUnit): SimpleMinMaxCounter = new SimpleMinMaxCounter( name, tags, @@ -67,6 +67,15 @@ private[kamon] class InstrumentFactory private (defaultHistogramDynamicRange: Dy object InstrumentFactory { + case class InstrumentType(name: String) + + object InstrumentTypes { + val Histogram = InstrumentType("Histogram") + val MinMaxCounter = InstrumentType("MinMaxCounter") + val Counter = InstrumentType("Counter") + val Gauge = InstrumentType("Gauge") + } + def fromConfig(config: Config): InstrumentFactory = { val factoryConfig = config.getConfig("kamon.metric.instrument-factory") val histogramDynamicRange = readDynamicRange(factoryConfig.getConfig("default-settings.histogram")) diff --git a/kamon-core/src/main/scala/kamon/metric/Metric.scala b/kamon-core/src/main/scala/kamon/metric/Metric.scala new file mode 100644 index 00000000..58386353 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Metric.scala @@ -0,0 +1,161 @@ +/* ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon +package metric + +import java.util.concurrent.atomic.AtomicReference + +import kamon.metric.InstrumentFactory.InstrumentType +import kamon.metric.InstrumentFactory.InstrumentTypes._ +import kamon.util.MeasurementUnit + +import scala.collection.concurrent.TrieMap +import scala.concurrent.duration.Duration + + + +trait Metric[T] { + def name: String + def unit: MeasurementUnit + + def refine(tags: Tags): T + def refine(tags: (String, String)*): T + def refine(tag: String, value: String): T +} + +trait HistogramMetric extends Metric[Histogram] with Histogram +trait MinMaxCounterMetric extends Metric[MinMaxCounter] with MinMaxCounter +trait GaugeMetric extends Metric[Gauge] with Gauge +trait CounterMetric extends Metric[Counter] with Counter + + +abstract sealed class BaseMetric[T, S](val instrumentType: InstrumentType) extends Metric[T] { + private val instruments = TrieMap.empty[Tags, T] + protected lazy val baseInstrument: T = instruments.atomicGetOrElseUpdate(Map.empty, createInstrument(Map.empty)) + + def refine(tag: String, value: String): T = { + val instrumentTags = Map(tag -> value) + instruments.atomicGetOrElseUpdate(instrumentTags, createInstrument(instrumentTags)) + } + + def refine(tags: Map[String, String]): T = + instruments.atomicGetOrElseUpdate(tags, createInstrument(tags)) + + def refine(tags: (String, String)*): T = + refine(tags.toMap) + + + private[kamon] def snapshot(): Seq[S] = + instruments.values.map(createSnapshot).toSeq + + protected def createInstrument(tags: Tags): T + + protected def createSnapshot(instrument: T): S +} + + +private[kamon] final class HistogramMetricImpl(val name: String, val unit: MeasurementUnit, customDynamicRange: Option[DynamicRange], + factory: AtomicReference[InstrumentFactory]) extends BaseMetric[Histogram, MetricDistribution](Histogram) with HistogramMetric { + + def dynamicRange: DynamicRange = + baseInstrument.dynamicRange + + override def record(value: Long): Unit = + baseInstrument.record(value) + + override def record(value: Long, times: Long): Unit = + baseInstrument.record(value, times) + + override protected def createInstrument(tags: Tags): Histogram = + factory.get().buildHistogram(customDynamicRange)(name, tags, unit) + + override protected def createSnapshot(instrument: Histogram): MetricDistribution = + instrument.asInstanceOf[SnapshotableHistogram].snapshot() +} + +private[kamon] final class MinMaxCounterMetricImpl(val name: String, val unit: MeasurementUnit, customDynamicRange: Option[DynamicRange], + customSampleInterval: Option[Duration], factory: AtomicReference[InstrumentFactory]) + extends BaseMetric[MinMaxCounter, MetricDistribution](MinMaxCounter) with MinMaxCounterMetric { + + def dynamicRange: DynamicRange = + baseInstrument.dynamicRange + + override def sampleInterval: Duration = + baseInstrument.sampleInterval + + override def increment(): Unit = + baseInstrument.increment() + + override def increment(times: Long): Unit = + baseInstrument.increment(times) + + override def decrement(): Unit = + baseInstrument.decrement() + + override def decrement(times: Long): Unit = + baseInstrument.decrement(times) + + override def sample(): Unit = + baseInstrument.sample() + + override protected def createInstrument(tags: Tags): MinMaxCounter = + factory.get().buildMinMaxCounter(customDynamicRange, customSampleInterval)(name, tags, unit) + + override protected def createSnapshot(instrument: MinMaxCounter): MetricDistribution = + instrument.asInstanceOf[SnapshotableMinMaxCounter].snapshot() +} + + +private[kamon] final class CounterMetricImpl(val name: String, val unit: MeasurementUnit, factory: AtomicReference[InstrumentFactory]) + extends BaseMetric[Counter, MetricValue](Counter) with CounterMetric { + + override def increment(): Unit = + baseInstrument.increment() + + override def increment(times: Long): Unit = + baseInstrument.increment(times) + + override protected def createInstrument(tags: Tags): Counter = + factory.get().buildCounter(name, tags, unit) + + override protected def createSnapshot(instrument: Counter): MetricValue = + instrument.asInstanceOf[SnapshotableCounter].snapshot() +} + +private[kamon] final class GaugeMetricImpl(val name: String, val unit: MeasurementUnit, factory: AtomicReference[InstrumentFactory]) + extends BaseMetric[Gauge, MetricValue](Gauge) with GaugeMetric { + + override def increment(): Unit = + baseInstrument.increment() + + override def increment(times: Long): Unit = + baseInstrument.increment(times) + + override def decrement(): Unit = + baseInstrument.decrement() + + override def decrement(times: Long): Unit = + baseInstrument.decrement(times) + + override def set(value: Long): Unit = + baseInstrument.set(value) + + override protected def createInstrument(tags: Tags): Gauge = + factory.get().buildGauge(name, tags, unit) + + override protected def createSnapshot(instrument: Gauge): MetricValue = + instrument.asInstanceOf[SnapshotableGauge].snapshot() +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/MetricLookup.scala b/kamon-core/src/main/scala/kamon/metric/MetricLookup.scala index 10b409d9..a09a8f02 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricLookup.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricLookup.scala @@ -22,52 +22,42 @@ import scala.concurrent.duration.Duration trait MetricLookup { - def histogram(name: String): Histogram = - histogram(name, MeasurementUnit.none, Map.empty[String, String], None) + def histogram(name: String): HistogramMetric = + histogram(name, MeasurementUnit.none, None) - def histogram(name: String, unit: MeasurementUnit): Histogram = - histogram(name, unit, Map.empty[String, String], None) + def histogram(name: String, unit: MeasurementUnit): HistogramMetric = + histogram(name, unit, None) - def histogram(name: String, unit: MeasurementUnit, tags: Map[String, String]): Histogram = - histogram(name, unit, tags, None) + def histogram(name: String, unit: MeasurementUnit, dynamicRange: DynamicRange): HistogramMetric = + histogram(name, unit, Some(dynamicRange)) - def histogram(name: String, unit: MeasurementUnit, tags: Map[String, String], dynamicRange: DynamicRange): Histogram = - histogram(name, unit, tags, Some(dynamicRange)) - def counter(name: String): Counter = - counter(name, MeasurementUnit.none, Map.empty[String, String]) + def counter(name: String): CounterMetric = + counter(name, MeasurementUnit.none) - def counter(name: String, unit: MeasurementUnit): Counter = - counter(name, unit, Map.empty[String, String]) - def gauge(name: String): Gauge = - gauge(name, MeasurementUnit.none, Map.empty[String, String]) + def gauge(name: String): GaugeMetric = + gauge(name, MeasurementUnit.none) - def gauge(name: String, unit: MeasurementUnit): Gauge = - gauge(name, unit, Map.empty[String, String]) - def minMaxCounter(name: String): MinMaxCounter = - minMaxCounter(name, MeasurementUnit.none, Map.empty[String, String], None, None) + def minMaxCounter(name: String): MinMaxCounterMetric = + minMaxCounter(name, MeasurementUnit.none, None, None) - def minMaxCounter(name: String, unit: MeasurementUnit): MinMaxCounter = - minMaxCounter(name, unit, Map.empty[String, String], None, None) + def minMaxCounter(name: String, unit: MeasurementUnit): MinMaxCounterMetric = + minMaxCounter(name, unit, None, None) - def minMaxCounter(name: String, unit: MeasurementUnit, tags: Map[String, String]): MinMaxCounter = - minMaxCounter(name, unit, tags, None, None) + def minMaxCounter(name: String, unit: MeasurementUnit, sampleInterval: Duration): MinMaxCounterMetric = + minMaxCounter(name, unit, Option(sampleInterval), None) - def minMaxCounter(name: String, unit: MeasurementUnit, tags: Map[String, String], sampleInterval: Duration): MinMaxCounter = - minMaxCounter(name, unit, tags, Option(sampleInterval), None) + def minMaxCounter(name: String, unit: MeasurementUnit, sampleInterval: Duration, dynamicRange: DynamicRange): MinMaxCounterMetric = + minMaxCounter(name, unit, Option(sampleInterval), Option(dynamicRange)) - def minMaxCounter(name: String, unit: MeasurementUnit, tags: Map[String, String], sampleInterval: Duration, - dynamicRange: DynamicRange): MinMaxCounter = - minMaxCounter(name, unit, tags, Option(sampleInterval), Option(dynamicRange)) - def histogram(name: String, unit: MeasurementUnit, tags: Map[String, String], dynamicRange: Option[DynamicRange]): Histogram + def histogram(name: String, unit: MeasurementUnit, dynamicRange: Option[DynamicRange]): HistogramMetric - def counter(name: String, unit: MeasurementUnit, tags: Map[String, String]): Counter + def counter(name: String, unit: MeasurementUnit): CounterMetric - def gauge(name: String, unit: MeasurementUnit, tags: Map[String, String]): Gauge + def gauge(name: String, unit: MeasurementUnit): GaugeMetric - def minMaxCounter(name: String, unit: MeasurementUnit, tags: Map[String, String], sampleInterval: Option[Duration], - dynamicRange: Option[DynamicRange]): MinMaxCounter + def minMaxCounter(name: String, unit: MeasurementUnit, sampleInterval: Option[Duration], dynamicRange: Option[DynamicRange]): MinMaxCounterMetric } diff --git a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala index 3c2bc131..e47df88e 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicReference import com.typesafe.config.Config import com.typesafe.scalalogging.Logger +import kamon.metric.InstrumentFactory.{InstrumentType, InstrumentTypes} import kamon.util.MeasurementUnit import scala.collection.concurrent.TrieMap @@ -28,25 +29,27 @@ import scala.concurrent.duration.Duration class MetricRegistry(initialConfig: Config) extends MetricsSnapshotGenerator { private val logger = Logger(classOf[MetricRegistry]) - private val metrics = TrieMap.empty[String, MetricEntry] private val instrumentFactory = new AtomicReference[InstrumentFactory]() + private val metrics = TrieMap.empty[String, BaseMetric[_, _]] + reconfigure(initialConfig) def reconfigure(config: Config): Unit = synchronized { instrumentFactory.set(InstrumentFactory.fromConfig(config)) } - def histogram(name: String, unit: MeasurementUnit, tags: Map[String, String], dynamicRange: Option[DynamicRange]): Histogram = - lookupInstrument(name, unit, tags, InstrumentTypes.Histogram, instrumentFactory.get().buildHistogram(dynamicRange)) - def counter(name: String, unit: MeasurementUnit, tags: Map[String, String]): Counter = - lookupInstrument(name, unit, tags, InstrumentTypes.Counter, instrumentFactory.get().buildCounter) + def histogram(name: String, unit: MeasurementUnit, dynamicRange: Option[DynamicRange]): HistogramMetric = + lookupMetric(name, unit, InstrumentTypes.Histogram)(new HistogramMetricImpl(name, unit, dynamicRange, instrumentFactory)) + + def counter(name: String, unit: MeasurementUnit): CounterMetric = + lookupMetric(name, unit, InstrumentTypes.Counter)(new CounterMetricImpl(name, unit, instrumentFactory)) - def gauge(name: String, unit: MeasurementUnit, tags: Map[String, String]): Gauge = - lookupInstrument(name, unit, tags, InstrumentTypes.Gauge, instrumentFactory.get().buildGauge) + def gauge(name: String, unit: MeasurementUnit): GaugeMetric = + lookupMetric(name, unit, InstrumentTypes.Gauge)(new GaugeMetricImpl(name, unit, instrumentFactory)) - def minMaxCounter(name: String, unit: MeasurementUnit, tags: Map[String, String], dynamicRange: Option[DynamicRange], sampleInterval: Option[Duration]): MinMaxCounter = - lookupInstrument(name, unit, tags, InstrumentTypes.MinMaxCounter, instrumentFactory.get().buildMinMaxCounter(dynamicRange, sampleInterval)) + def minMaxCounter(name: String, unit: MeasurementUnit, dynamicRange: Option[DynamicRange], sampleInterval: Option[Duration]): MinMaxCounterMetric = + lookupMetric(name, unit, InstrumentTypes.MinMaxCounter)(new MinMaxCounterMetricImpl(name, unit, dynamicRange, sampleInterval, instrumentFactory)) override def snapshot(): MetricsSnapshot = synchronized { @@ -55,15 +58,12 @@ class MetricRegistry(initialConfig: Config) extends MetricsSnapshotGenerator { var counters = Seq.empty[MetricValue] var gauges = Seq.empty[MetricValue] - for { - metricEntry <- metrics.values - instrument <- metricEntry.instruments.values - } { + for(metricEntry <- metrics.values) { metricEntry.instrumentType match { - case InstrumentTypes.Histogram => histograms = histograms :+ instrument.asInstanceOf[SnapshotableHistogram].snapshot() - case InstrumentTypes.MinMaxCounter => mmCounters = mmCounters :+ instrument.asInstanceOf[SnapshotableMinMaxCounter].snapshot() - case InstrumentTypes.Gauge => gauges = gauges :+ instrument.asInstanceOf[SnapshotableGauge].snapshot() - case InstrumentTypes.Counter => counters = counters :+ instrument.asInstanceOf[SnapshotableCounter].snapshot() + case InstrumentTypes.Histogram => histograms = histograms ++ metricEntry.snapshot().asInstanceOf[Seq[MetricDistribution]] + case InstrumentTypes.MinMaxCounter => mmCounters = mmCounters ++ metricEntry.snapshot().asInstanceOf[Seq[MetricDistribution]] + case InstrumentTypes.Gauge => gauges = gauges ++ metricEntry.snapshot().asInstanceOf[Seq[MetricValue]] + case InstrumentTypes.Counter => counters = counters ++ metricEntry.snapshot().asInstanceOf[Seq[MetricValue]] case other => logger.warn("Unexpected instrument type [{}] found in the registry", other ) } } @@ -71,29 +71,18 @@ class MetricRegistry(initialConfig: Config) extends MetricsSnapshotGenerator { MetricsSnapshot(histograms, mmCounters, gauges, counters) } - private def lookupInstrument[T](name: String, measurementUnit: MeasurementUnit, tags: Map[String, String], - instrumentType: InstrumentType, builder: (String, Map[String, String], MeasurementUnit) => T): T = { + private def lookupMetric[T <: BaseMetric[_, _]](name: String, unit: MeasurementUnit, instrumentType: InstrumentType)(metricBuilder: => T): T = { + val metric = metrics.atomicGetOrElseUpdate(name, metricBuilder) - val entry = metrics.atomicGetOrElseUpdate(name, MetricEntry(instrumentType, measurementUnit, TrieMap.empty)) - if(entry.instrumentType != instrumentType) - sys.error(s"Tried to use metric [$name] as a [${instrumentType.name}] but it is already defined as [${entry.instrumentType.name}] ") + if(metric.instrumentType != instrumentType) + sys.error(s"Cannot define metric [$name] as a [${instrumentType.name}], it is already defined as [${metric.instrumentType.name}] ") - if(entry.unit != measurementUnit) - logger.warn("Ignoring attempt to use measurement unit [{}] on metric [name={}, tags={}], the metric uses [{}]", - measurementUnit.magnitude.name, name, tags.prettyPrint(), entry.unit.magnitude.name) + if(metric.unit != unit) + logger.warn("Ignoring attempt to register measurement unit [{}] on metric [{}], the metric uses already uses [{}]", + unit.magnitude.name, name, metric.unit.magnitude.name) - entry.instruments.getOrElseUpdate(tags, builder(name, tags, measurementUnit)).asInstanceOf[T] + metric.asInstanceOf[T] } - - private case class InstrumentType(name: String) - private object InstrumentTypes { - val Histogram = InstrumentType("Histogram") - val MinMaxCounter = InstrumentType("MinMaxCounter") - val Counter = InstrumentType("Counter") - val Gauge = InstrumentType("Gauge") - } - - private case class MetricEntry(instrumentType: InstrumentType, unit: MeasurementUnit, instruments: TrieMap[Map[String, String], Any]) } trait MetricsSnapshotGenerator { diff --git a/kamon-core/src/main/scala/kamon/metric/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/MinMaxCounter.scala index ae12f635..a09702ae 100644 --- a/kamon-core/src/main/scala/kamon/metric/MinMaxCounter.scala +++ b/kamon-core/src/main/scala/kamon/metric/MinMaxCounter.scala @@ -23,9 +23,9 @@ import kamon.util.{AtomicLongMaxUpdater, MeasurementUnit} import scala.concurrent.duration.Duration trait MinMaxCounter { + def unit: MeasurementUnit def dynamicRange: DynamicRange def sampleInterval: Duration - def measurementUnit: MeasurementUnit def increment(): Unit def increment(times: Long): Unit @@ -34,7 +34,7 @@ trait MinMaxCounter { def sample(): Unit } -class SimpleMinMaxCounter(name: String, tags: Map[String, String], underlyingHistogram: Histogram with DistributionSnapshotInstrument, +class SimpleMinMaxCounter(name: String, tags: Map[String, String], underlyingHistogram: HdrHistogram, val sampleInterval: Duration) extends SnapshotableMinMaxCounter { private val min = AtomicLongMaxUpdater() @@ -44,8 +44,8 @@ class SimpleMinMaxCounter(name: String, tags: Map[String, String], underlyingHis def dynamicRange: DynamicRange = underlyingHistogram.dynamicRange - def measurementUnit: MeasurementUnit = - underlyingHistogram.measurementUnit + def unit: MeasurementUnit = + underlyingHistogram.unit private[kamon] def snapshot(): MetricDistribution = underlyingHistogram.snapshot() diff --git a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala index 2368879b..d5beed6c 100644 --- a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala +++ b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala @@ -44,7 +44,7 @@ case class MetricValue(name: String, tags: Map[String, String], measurementUnit: * Snapshot for instruments that internally the distribution of values in a defined dynamic range. Meant to be used * with histograms and min max counters. */ -case class MetricDistribution(name: String, tags: Map[String, String], measurementUnit: MeasurementUnit, +case class MetricDistribution(name: String, tags: Map[String, String], measurementUnit: MeasurementUnit, dynamicRange: DynamicRange, distribution: Distribution) diff --git a/kamon-core/src/main/scala/kamon/package.scala b/kamon-core/src/main/scala/kamon/package.scala index e98759c6..477876f8 100644 --- a/kamon-core/src/main/scala/kamon/package.scala +++ b/kamon-core/src/main/scala/kamon/package.scala @@ -23,6 +23,7 @@ import scala.collection.concurrent.TrieMap package object kamon { + type Tags = Map[String, String] /** diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index a23c1f49..583c2b35 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -16,13 +16,12 @@ package kamon package trace -import kamon.metric.MetricLookup import scala.collection.JavaConverters._ import kamon.util.{Clock, MeasurementUnit} class Span(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String], startTimestampMicros: Long, - metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span { + reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span { private var isOpen: Boolean = true private val sampled: Boolean = spanContext.sampled @@ -154,18 +153,22 @@ class Span(spanContext: SpanContext, initialOperationName: String, initialTags: val elapsedTime = endTimestampMicros - startTimestampMicros val metricTags = Map("operation" -> operationName) ++ additionalMetricTags - val latencyHistogram = metrics.histogram("span.processing-time", MeasurementUnit.time.microseconds, metricTags) + val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(metricTags) latencyHistogram.record(elapsedTime) tags.get("error").foreach { errorTag => if(errorTag != null && errorTag.equals(Span.BooleanTagTrueValue)) { - metrics.counter("span.errors", MeasurementUnit.none, metricTags).increment() + //TODO: count properly metrics.counter("span.errors", MeasurementUnit.none, metricTags).increment() } } } } object Span { + object Metrics { + val SpanProcessingTimeMetric = Kamon.histogram("span.processing-time", MeasurementUnit.time.microseconds) + } + val MetricTagPrefix = "metric." val BooleanTagTrueValue = "1" val BooleanTagFalseValue = "0" diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index 65d8edb7..f8af39a5 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -137,7 +137,7 @@ class Tracer(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, init } tracerMetrics.createdSpans.increment() - new Span(spanContext, operationName, initialTags, startTimestampMicros, metrics, reporterRegistry) + new Span(spanContext, operationName, initialTags, startTimestampMicros, reporterRegistry) } private def createID(): Long = -- cgit v1.2.3