diff options
Diffstat (limited to 'kamon-core/src/main/scala')
14 files changed, 121 insertions, 127 deletions
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 468308f2..3a85be0a 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -1,15 +1,15 @@ package kamon -import java.time.Duration -import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.atomic.AtomicReference import com.typesafe.config.{Config, ConfigFactory} -import kamon.metric.instrument._ -import kamon.metric.{MetricLookup, MetricRegistry} +import kamon.metric._ import kamon.trace.Tracer import kamon.util.{HexCodec, MeasurementUnit} +import scala.concurrent.duration.Duration +import scala.concurrent.forkjoin.ThreadLocalRandom + object Kamon extends MetricLookup { private val initialConfig = ConfigFactory.load() diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index 3b59d8b7..a22162eb 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -1,6 +1,5 @@ package kamon -import java.time.Instant import java.util.concurrent.atomic.{AtomicLong, AtomicReference} import java.util.concurrent._ @@ -12,6 +11,7 @@ import kamon.trace.Span import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} import scala.util.Try import scala.util.control.NonFatal +import scala.collection.JavaConverters._ trait ReporterRegistry { def loadFromConfig(): Unit @@ -60,7 +60,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con override def add(reporter: MetricsReporter, name: String): Registration = { val executor = Executors.newSingleThreadExecutor(threadFactory(name)) - val reporterEntry = ReporterEntry( + val reporterEntry = new ReporterEntry( id = reporterCounter.getAndIncrement(), reporter = reporter, executionContext = ExecutionContext.fromExecutorService(executor) @@ -71,12 +71,10 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con new Registration { val reporterID = reporterEntry.id override def cancel(): Boolean = { - metricReporters.removeIf(entry => { - if(entry.id == reporterID) { - stopReporter(entry) - true - } else false - }) + metricReporters.iterator().asScala + .find(e => e.id == reporterID) + .map(e => stopReporter(e)) + .isDefined } } } @@ -99,31 +97,31 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con } } - Future.sequence(reporterStopFutures.result()).transform(_ => Try((): Unit)) + Future.sequence(reporterStopFutures.result()).map(_ => Try((): Unit)) } private[kamon] def reconfigure(config: Config): Unit = synchronized { - val tickInterval = config.getDuration("kamon.metric.tick-interval") + val tickIntervalMillis = config.getDuration("kamon.metric.tick-interval", TimeUnit.MILLISECONDS) val currentTicker = metricsTickerSchedule.get() if(currentTicker != null) { currentTicker.cancel(true) } // Reconfigure all registered reporters - metricReporters.forEach(entry => + metricReporters.iterator().asScala.foreach(entry => Future(entry.reporter.reconfigure(config))(entry.executionContext) ) metricsTickerSchedule.set { registryExecutionContext.scheduleAtFixedRate( - new MetricTicker(metrics, metricReporters), tickInterval.toMillis, tickInterval.toMillis, TimeUnit.MILLISECONDS + new MetricTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS ) } } private[kamon] def reportSpan(span: Span.CompletedSpan): Unit = { - spanReporters.forEach(_.reportSpan(span)) + spanReporters.iterator().asScala.foreach(_.reportSpan(span)) } private def stopReporter(entry: ReporterEntry): Future[Unit] = { @@ -134,25 +132,25 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con }(ExecutionContext.fromExecutor(registryExecutionContext)) } - private case class ReporterEntry( + private class ReporterEntry( @volatile var isActive: Boolean = true, - id: Long, - reporter: MetricsReporter, - executionContext: ExecutionContextExecutorService + val id: Long, + val reporter: MetricsReporter, + val executionContext: ExecutionContextExecutorService ) private class MetricTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: java.util.Queue[ReporterEntry]) extends Runnable { val logger = Logger(classOf[MetricTicker]) - var lastTick = Instant.now() + var lastTick = System.currentTimeMillis() def run(): Unit = try { - val currentTick = Instant.now() + val currentTick = System.currentTimeMillis() val tickSnapshot = TickSnapshot( interval = Interval(lastTick, currentTick), metrics = snapshotGenerator.snapshot() ) - reporterEntries.forEach { entry => + reporterEntries.iterator().asScala.foreach { entry => Future { if(entry.isActive) entry.reporter.reportTickSnapshot(tickSnapshot) diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/Counter.scala index f18e771c..38d312f0 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala +++ b/kamon-core/src/main/scala/kamon/metric/Counter.scala @@ -1,6 +1,4 @@ -package kamon -package metric -package instrument +package kamon.metric import java.util.concurrent.atomic.LongAdder @@ -29,6 +27,6 @@ class LongAdderCounter(name: String, tags: Map[String, String], val measurementU logger.warn(s"Ignored attempt to decrement counter [$name]") } - def snapshot(): SingleValueSnapshot = - SingleValueSnapshot(name, tags, measurementUnit, adder.sumThenReset()) + def snapshot(): MetricValue = + MetricValue(name, tags, measurementUnit, adder.sumThenReset()) } diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/DynamicRange.scala b/kamon-core/src/main/scala/kamon/metric/DynamicRange.scala index 226f5450..f26b1052 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/DynamicRange.scala +++ b/kamon-core/src/main/scala/kamon/metric/DynamicRange.scala @@ -1,4 +1,4 @@ -package kamon.metric.instrument +package kamon.metric import java.util.concurrent.TimeUnit diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/Gauge.scala index acbff912..11876e99 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala +++ b/kamon-core/src/main/scala/kamon/metric/Gauge.scala @@ -1,6 +1,7 @@ -package kamon.metric.instrument +package kamon.metric import java.util.concurrent.atomic.AtomicLong + import kamon.util.MeasurementUnit trait Gauge { @@ -34,6 +35,6 @@ class AtomicLongGauge(name: String, tags: Map[String, String], val measurementUn def set(value: Long): Unit = currentValue.set(value) - def snapshot(): SingleValueSnapshot = - SingleValueSnapshot(name, tags, measurementUnit, currentValue.get()) + def snapshot(): MetricValue = + MetricValue(name, tags, measurementUnit, currentValue.get()) } diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala b/kamon-core/src/main/scala/kamon/metric/Histogram.scala index 29fe8c69..47b2a1a0 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Histogram.scala +++ b/kamon-core/src/main/scala/kamon/metric/Histogram.scala @@ -1,6 +1,4 @@ -package kamon -package metric -package instrument +package kamon.metric import java.nio.ByteBuffer @@ -36,7 +34,7 @@ class HdrHistogram(name: String, tags: Map[String, String], val measurementUnit: } } - override def snapshot(): DistributionSnapshot = { + override def snapshot(): MetricDistribution = { val buffer = HdrHistogram.tempSnapshotBuffer.get() val counts = countsArray() val countsLimit = counts.length() @@ -82,7 +80,7 @@ class HdrHistogram(name: String, tags: Map[String, String], val measurementUnit: val distribution = new ZigZagCountsDistribution(totalCount, minIndex, maxIndex, ByteBuffer.wrap(zigZagCounts), protectedUnitMagnitude(), protectedSubBucketHalfCount(), protectedSubBucketHalfCountMagnitude()) - DistributionSnapshot(name, tags, measurementUnit, dynamicRange, distribution) + MetricDistribution(name, tags, measurementUnit, dynamicRange, distribution) } private class ZigZagCountsDistribution(val count: Long, minIndex: Int, maxIndex: Int, zigZagCounts: ByteBuffer, diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala b/kamon-core/src/main/scala/kamon/metric/HistogramExtension.scala index dc3cad08..bf425ee1 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/HistogramExtension.scala +++ b/kamon-core/src/main/scala/kamon/metric/HistogramExtension.scala @@ -3,7 +3,7 @@ package org.HdrHistogram import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicLongArray -import kamon.metric.instrument.DynamicRange +import kamon.metric.DynamicRange /** * Exposes package-private members of [[org.HdrHistogram.AtomicHistogram]]. diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala b/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala index 0e0536c6..68034bb8 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala +++ b/kamon-core/src/main/scala/kamon/metric/InstrumentFactory.scala @@ -1,19 +1,19 @@ package kamon package metric -package instrument -import java.time.Duration + +import java.util.concurrent.TimeUnit import com.typesafe.config.Config -import kamon.metric.instrument.InstrumentFactory.CustomInstrumentSettings +import kamon.metric.InstrumentFactory.CustomInstrumentSettings import kamon.util.MeasurementUnit +import scala.concurrent.duration._ + private[kamon] class InstrumentFactory private (defaultHistogramDynamicRange: DynamicRange, defaultMMCounterDynamicRange: DynamicRange, defaultMMCounterSampleInterval: Duration, customSettings: Map[String, CustomInstrumentSettings]) { - println("DEFAULT: " + defaultHistogramDynamicRange) - def buildHistogram(dynamicRange: Option[DynamicRange])(name: String, tags: Map[String, String], unit: MeasurementUnit): SnapshotableHistogram = new HdrHistogram(name, tags, unit, instrumentDynamicRange(name, dynamicRange.getOrElse(defaultHistogramDynamicRange))) @@ -23,7 +23,7 @@ private[kamon] class InstrumentFactory private (defaultHistogramDynamicRange: Dy name, tags, buildHistogram(dynamicRange.orElse(Some(defaultMMCounterDynamicRange)))(name, tags, unit), - instrumentSampleInterval(name, sampleInterval.getOrElse(defaultMMCounterSampleInterval)) ) + instrumentSampleInterval(name, sampleInterval.getOrElse(defaultMMCounterSampleInterval))) def buildGauge(name: String, tags: Map[String, String], unit: MeasurementUnit): SnapshotableGauge = new AtomicLongGauge(name, tags, unit) @@ -56,14 +56,14 @@ object InstrumentFactory { val factoryConfig = config.getConfig("kamon.metric.instrument-factory") val histogramDynamicRange = readDynamicRange(factoryConfig.getConfig("default-settings.histogram")) val mmCounterDynamicRange = readDynamicRange(factoryConfig.getConfig("default-settings.min-max-counter")) - val mmCounterSampleInterval = factoryConfig.getDuration("default-settings.min-max-counter.sample-interval") + val mmCounterSampleInterval = factoryConfig.getDuration("default-settings.min-max-counter.sample-interval", TimeUnit.MILLISECONDS) val customSettings = factoryConfig.getConfig("custom-settings") .configurations .filter(nonEmptySection) .map(readCustomInstrumentSettings) - new InstrumentFactory(histogramDynamicRange, mmCounterDynamicRange, mmCounterSampleInterval, customSettings) + new InstrumentFactory(histogramDynamicRange, mmCounterDynamicRange, mmCounterSampleInterval.millis, customSettings) } private def nonEmptySection(entry: (String, Config)): Boolean = entry match { @@ -76,7 +76,7 @@ object InstrumentFactory { if (metricConfig.hasPath("lowest-discernible-value")) Some(metricConfig.getLong("lowest-discernible-value")) else None, if (metricConfig.hasPath("highest-trackable-value")) Some(metricConfig.getLong("highest-trackable-value")) else None, if (metricConfig.hasPath("significant-value-digits")) Some(metricConfig.getInt("significant-value-digits")) else None, - if (metricConfig.hasPath("sample-interval")) Some(metricConfig.getDuration("sample-interval")) else None + if (metricConfig.hasPath("sample-interval")) Some(metricConfig.getDuration("sample-interval", TimeUnit.MILLISECONDS).millis) else None ) (metricName -> customSettings) diff --git a/kamon-core/src/main/scala/kamon/metric/MetricLookup.scala b/kamon-core/src/main/scala/kamon/metric/MetricLookup.scala index db33b83c..e28b1435 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricLookup.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricLookup.scala @@ -1,10 +1,9 @@ package kamon package metric -import java.time.Duration -import kamon.metric.instrument._ import kamon.util.MeasurementUnit +import scala.concurrent.duration.Duration trait MetricLookup { diff --git a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala index c6513f1a..de64bc17 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala @@ -1,15 +1,14 @@ package kamon package metric -import java.time.Duration import java.util.concurrent.atomic.AtomicReference import com.typesafe.config.Config import com.typesafe.scalalogging.Logger -import kamon.metric.instrument._ import kamon.util.MeasurementUnit import scala.collection.concurrent.TrieMap +import scala.concurrent.duration.Duration class MetricRegistry(initialConfig: Config) extends MetricsSnapshotGenerator { @@ -36,10 +35,10 @@ class MetricRegistry(initialConfig: Config) extends MetricsSnapshotGenerator { override def snapshot(): MetricsSnapshot = synchronized { - var histograms = Seq.empty[DistributionSnapshot] - var mmCounters = Seq.empty[DistributionSnapshot] - var counters = Seq.empty[SingleValueSnapshot] - var gauges = Seq.empty[SingleValueSnapshot] + var histograms = Seq.empty[MetricDistribution] + var mmCounters = Seq.empty[MetricDistribution] + var counters = Seq.empty[MetricValue] + var gauges = Seq.empty[MetricValue] for { metricEntry <- metrics.values @@ -50,6 +49,7 @@ class MetricRegistry(initialConfig: Config) extends MetricsSnapshotGenerator { case InstrumentType.MinMaxCounter => mmCounters = mmCounters :+ instrument.asInstanceOf[SnapshotableMinMaxCounter].snapshot() case InstrumentType.Gauge => gauges = gauges :+ instrument.asInstanceOf[SnapshotableGauge].snapshot() case InstrumentType.Counter => counters = counters :+ instrument.asInstanceOf[SnapshotableCounter].snapshot() + case other => logger.warn("Unexpected instrument type [{}] found in the registry", other ) } } diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/MinMaxCounter.scala index 70094b7b..4ac1ce74 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala +++ b/kamon-core/src/main/scala/kamon/metric/MinMaxCounter.scala @@ -1,12 +1,13 @@ -package kamon.metric.instrument +package kamon.metric import java.lang.Math.abs -import java.time.Duration -import java.util.concurrent.atomic.{AtomicLong, AtomicReference} +import java.util.concurrent.atomic.AtomicLong import kamon.jsr166.LongMaxUpdater import kamon.util.MeasurementUnit +import scala.concurrent.duration.Duration + trait MinMaxCounter { def dynamicRange: DynamicRange def sampleInterval: Duration @@ -33,7 +34,7 @@ class PaddedMinMaxCounter(name: String, tags: Map[String, String], underlyingHis def measurementUnit: MeasurementUnit = underlyingHistogram.measurementUnit - private[kamon] def snapshot(): DistributionSnapshot = + private[kamon] def snapshot(): MetricDistribution = underlyingHistogram.snapshot() def increment(): Unit = diff --git a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala index b7cc349e..e8587ffe 100644 --- a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala +++ b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala @@ -1,18 +1,72 @@ package kamon.metric -import java.time.Instant -import kamon.metric.instrument.{DistributionSnapshot, SingleValueSnapshot} +import kamon.util.MeasurementUnit -case class Interval(from: Instant, to: Instant) +case class Interval(from: Long, to: Long) case class MetricsSnapshot( - histograms: Seq[DistributionSnapshot], - minMaxCounters: Seq[DistributionSnapshot], - gauges: Seq[SingleValueSnapshot], - counters: Seq[SingleValueSnapshot] + histograms: Seq[MetricDistribution], + minMaxCounters: Seq[MetricDistribution], + gauges: Seq[MetricValue], + counters: Seq[MetricValue] ) case class TickSnapshot(interval: Interval, metrics: MetricsSnapshot) + +/** + * Snapshot for instruments that internally track a single value. Meant to be used for counters and gauges. + * + */ +case class MetricValue(name: String, tags: Map[String, String], measurementUnit: MeasurementUnit, value: Long) + +/** + * Snapshot for instruments that internally the distribution of values in a defined dynamic range. Meant to be used + * with histograms and min max counters. + */ +case class MetricDistribution(name: String, tags: Map[String, String], measurementUnit: MeasurementUnit, + dynamicRange: DynamicRange, distribution: Distribution) + + +trait Distribution { + def buckets: Seq[Bucket] + def bucketsIterator: Iterator[Bucket] + + def min: Long + def max: Long + def sum: Long + def count: Long + def percentile(p: Double): Percentile + + def percentiles: Seq[Percentile] + def percentilesIterator: Iterator[Percentile] +} + +trait Bucket { + def value: Long + def frequency: Long +} + +trait Percentile { + def quantile: Double + def value: Long + def countUnderQuantile: Long +} + + +trait DistributionSnapshotInstrument { + private[kamon] def snapshot(): MetricDistribution +} + +trait SingleValueSnapshotInstrument { + private[kamon] def snapshot(): MetricValue +} + +trait SnapshotableHistogram extends Histogram with DistributionSnapshotInstrument +trait SnapshotableMinMaxCounter extends MinMaxCounter with DistributionSnapshotInstrument +trait SnapshotableCounter extends Counter with SingleValueSnapshotInstrument +trait SnapshotableGauge extends Gauge with SingleValueSnapshotInstrument + + diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSnapshot.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSnapshot.scala deleted file mode 100644 index 1364c2d8..00000000 --- a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSnapshot.scala +++ /dev/null @@ -1,57 +0,0 @@ -package kamon.metric.instrument - -import kamon.util.MeasurementUnit - -/** - * Snapshot for instruments that internally track a single value. Meant to be used for counters and gauges. - * - */ -case class SingleValueSnapshot(name: String, tags: Map[String, String], measurementUnit: MeasurementUnit, value: Long) - -/** - * Snapshot for instruments that internally the distribution of values in a defined dynamic range. Meant to be used - * with histograms and min max counters. - */ -case class DistributionSnapshot(name: String, tags: Map[String, String], measurementUnit: MeasurementUnit, - dynamicRange: DynamicRange, distribution: Distribution) - - -trait Distribution { - def buckets: Seq[Bucket] - def bucketsIterator: Iterator[Bucket] - - def min: Long - def max: Long - def sum: Long - def count: Long - def percentile(p: Double): Percentile - - def percentiles: Seq[Percentile] - def percentilesIterator: Iterator[Percentile] -} - -trait Bucket { - def value: Long - def frequency: Long -} - -trait Percentile { - def quantile: Double - def value: Long - def countUnderQuantile: Long -} - - -trait DistributionSnapshotInstrument { - private[kamon] def snapshot(): DistributionSnapshot -} - -trait SingleValueSnapshotInstrument { - private[kamon] def snapshot(): SingleValueSnapshot -} - -trait SnapshotableHistogram extends Histogram with DistributionSnapshotInstrument -trait SnapshotableMinMaxCounter extends MinMaxCounter with DistributionSnapshotInstrument -trait SnapshotableCounter extends Counter with SingleValueSnapshotInstrument -trait SnapshotableGauge extends Gauge with SingleValueSnapshotInstrument - diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala index 6e687d66..1c3a1205 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala @@ -1,11 +1,13 @@ package kamon.trace import java.net.{URLDecoder, URLEncoder} -import java.util.concurrent.ThreadLocalRandom +import scala.collection.JavaConverters._ import io.opentracing.propagation.TextMap import kamon.util.HexCodec +import scala.concurrent.forkjoin.ThreadLocalRandom + trait SpanContextCodec[T] { def inject(spanContext: SpanContext, carrier: T): Unit def extract(carrier: T, sampler: Sampler): SpanContext @@ -44,7 +46,7 @@ object SpanContextCodec { carrier.put(parentIDKey, encodeLong(spanContext.parentID)) carrier.put(spanIDKey, encodeLong(spanContext.spanID)) - spanContext.baggageItems().forEach { entry => + spanContext.baggageItems().iterator().asScala.foreach { entry => carrier.put(baggagePrefix + entry.getKey, baggageValueEncoder(entry.getValue)) } } @@ -56,7 +58,7 @@ object SpanContextCodec { var sampled: String = null var baggage: Map[String, String] = Map.empty - carrier.forEach { entry => + carrier.iterator().asScala.foreach { entry => if(entry.getKey.equals(traceIDKey)) traceID = baggageValueDecoder(entry.getValue) else if(entry.getKey.equals(parentIDKey)) |