From b94ca0cbb931799ccd2ad2d245660c4d48032c83 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Thu, 8 Jun 2017 15:35:42 +0200 Subject: get a basic threading model for the span reporters --- kamon-core/src/main/resources/reference.conf | 6 + .../src/main/scala/kamon/ReporterRegistry.scala | 155 +++++++++++++++------ .../src/main/scala/kamon/metric/Accumulator.scala | 17 +++ .../src/main/scala/kamon/metric/Scaler.scala | 58 ++++++++ .../src/main/scala/kamon/metric/TickSnapshot.scala | 13 +- .../src/main/scala/kamon/util/MetricScaler.scala | 58 -------- 6 files changed, 202 insertions(+), 105 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/metric/Accumulator.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/Scaler.scala delete mode 100644 kamon-core/src/main/scala/kamon/util/MetricScaler.scala diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index fd1c88c3..aca1a9f3 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -76,6 +76,12 @@ kamon { } trace { + + tick-interval = 10 seconds + + reporter-queue-size = 1024 + + # Configures a sample that decides which traces should be reported to the trace backends. The possible values are: # - always: report all traces. # - never: don't report any trace. diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index a22162eb..45c93ec4 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -12,13 +12,15 @@ import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Futu import scala.util.Try import scala.util.control.NonFatal import scala.collection.JavaConverters._ +import scala.collection.concurrent.TrieMap trait ReporterRegistry { def loadFromConfig(): Unit - def add(reporter: MetricsReporter): Registration - def add(reporter: MetricsReporter, name: String): Registration - def add(reporter: SpansReporter): Registration + def add(reporter: MetricReporter): Registration + def add(reporter: MetricReporter, name: String): Registration + def add(reporter: SpanReporter): Registration + def add(reporter: SpanReporter, name: String): Registration def stopAll(): Future[Unit] } @@ -28,7 +30,7 @@ trait Registration { def cancel(): Boolean } -trait MetricsReporter { +trait MetricReporter { def start(): Unit def stop(): Unit @@ -36,64 +38,86 @@ trait MetricsReporter { def reportTickSnapshot(snapshot: TickSnapshot): Unit } -trait SpansReporter { +trait SpanReporter { def start(): Unit def stop(): Unit def reconfigure(config: Config): Unit - def reportSpan(span: Span.CompletedSpan): Unit + def reportSpans(spans: Seq[Span.CompletedSpan]): Unit } class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry { + private val reporterCounter = new AtomicLong(0L) private val registryExecutionContext = Executors.newSingleThreadScheduledExecutor(threadFactory("kamon-reporter-registry")) private val metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]() - private val metricReporters = new ConcurrentLinkedQueue[ReporterEntry]() - private val spanReporters = new ConcurrentLinkedQueue[SpansReporter]() - private val reporterCounter = new AtomicLong(0L) + private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() + + private val metricReporters = TrieMap[Long, MetricReporterEntry]() + private val spanReporters = TrieMap[Long, SpanReporterEntry]() + reconfigure(initialConfig) override def loadFromConfig(): Unit = ??? - override def add(reporter: MetricsReporter): Registration = - add(reporter, reporter.getClass.getName()) + override def add(reporter: MetricReporter): Registration = + addMetricReporter(reporter, reporter.getClass.getName()) + + override def add(reporter: MetricReporter, name: String): Registration = + addMetricReporter(reporter, name) + + override def add(reporter: SpanReporter): Registration = + addSpanReporter(reporter, reporter.getClass.getName()) + + override def add(reporter: SpanReporter, name: String): Registration = + addSpanReporter(reporter, name) - override def add(reporter: MetricsReporter, name: String): Registration = { + + private def addMetricReporter(reporter: MetricReporter, name: String): Registration = { val executor = Executors.newSingleThreadExecutor(threadFactory(name)) - val reporterEntry = new ReporterEntry( + val reporterEntry = new MetricReporterEntry( id = reporterCounter.getAndIncrement(), reporter = reporter, executionContext = ExecutionContext.fromExecutorService(executor) ) - metricReporters.add(reporterEntry) - - new Registration { - val reporterID = reporterEntry.id - override def cancel(): Boolean = { - metricReporters.iterator().asScala - .find(e => e.id == reporterID) - .map(e => stopReporter(e)) - .isDefined - } - } + metricReporters.put(reporterEntry.id, reporterEntry) + createRegistration(reporterEntry.id, metricReporters) } - override def add(reporter: SpansReporter): Registration = { - spanReporters.add(reporter) + private def addSpanReporter(reporter: SpanReporter, name: String): Registration = { + val executor = Executors.newSingleThreadExecutor(threadFactory(name)) + val reporterEntry = new SpanReporterEntry( + id = reporterCounter.incrementAndGet(), + reporter = reporter, + bufferCapacity = 1024, + executionContext = ExecutionContext.fromExecutorService(executor) + ) + + spanReporters.put(reporterEntry.id, reporterEntry) + createRegistration(reporterEntry.id, spanReporters) + } - new Registration { - override def cancel(): Boolean = true - } + private def createRegistration(id: Long, target: TrieMap[Long, _]): Registration = new Registration { + override def cancel(): Boolean = + metricReporters.remove(id).nonEmpty } override def stopAll(): Future[Unit] = { implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext) val reporterStopFutures = Vector.newBuilder[Future[Unit]] - while(!metricReporters.isEmpty) { - val entry = metricReporters.poll() - if(entry != null) { - reporterStopFutures += stopReporter(entry) + + while(metricReporters.nonEmpty) { + val (idToRemove, _) = metricReporters.head + metricReporters.remove(idToRemove).foreach { entry => + reporterStopFutures += stopMetricReporter(entry) + } + } + + while(spanReporters.nonEmpty) { + val (idToRemove, _) = spanReporters.head + spanReporters.remove(idToRemove).foreach { entry => + reporterStopFutures += stopSpanReporter(entry) } } @@ -102,29 +126,51 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con private[kamon] def reconfigure(config: Config): Unit = synchronized { val tickIntervalMillis = config.getDuration("kamon.metric.tick-interval", TimeUnit.MILLISECONDS) + val traceTickIntervalMillis = config.getDuration("kamon.trace.tick-interval", TimeUnit.MILLISECONDS) + val currentTicker = metricsTickerSchedule.get() if(currentTicker != null) { currentTicker.cancel(true) } // Reconfigure all registered reporters - metricReporters.iterator().asScala.foreach(entry => + metricReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) - ) + } + + spanReporters.foreach { case (_, entry) => + Future(entry.reporter.reconfigure(config))(entry.executionContext) + } metricsTickerSchedule.set { registryExecutionContext.scheduleAtFixedRate( new MetricTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS ) } - } + spanReporterTickerSchedule.set { + registryExecutionContext.scheduleAtFixedRate( + new SpanTicker(spanReporters), traceTickIntervalMillis, traceTickIntervalMillis, TimeUnit.MILLISECONDS + ) + } + } private[kamon] def reportSpan(span: Span.CompletedSpan): Unit = { - spanReporters.iterator().asScala.foreach(_.reportSpan(span)) + spanReporters.foreach { case (_, reporterEntry) => + if(reporterEntry.isActive) + reporterEntry.buffer.offer(span) + } + } + + private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = { + entry.isActive = false + + Future(entry.reporter.stop())(entry.executionContext).andThen { + case _ => entry.executionContext.shutdown() + }(ExecutionContext.fromExecutor(registryExecutionContext)) } - private def stopReporter(entry: ReporterEntry): Future[Unit] = { + private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = { entry.isActive = false Future(entry.reporter.stop())(entry.executionContext).andThen { @@ -132,14 +178,24 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con }(ExecutionContext.fromExecutor(registryExecutionContext)) } - private class ReporterEntry( + private class MetricReporterEntry( @volatile var isActive: Boolean = true, val id: Long, - val reporter: MetricsReporter, + val reporter: MetricReporter, val executionContext: ExecutionContextExecutorService ) - private class MetricTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: java.util.Queue[ReporterEntry]) extends Runnable { + private class SpanReporterEntry( + @volatile var isActive: Boolean = true, + val id: Long, + val reporter: SpanReporter, + val bufferCapacity: Int, + val executionContext: ExecutionContextExecutorService + ) { + val buffer = new ArrayBlockingQueue[Span.CompletedSpan](bufferCapacity) + } + + private class MetricTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable { val logger = Logger(classOf[MetricTicker]) var lastTick = System.currentTimeMillis() @@ -150,7 +206,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con metrics = snapshotGenerator.snapshot() ) - reporterEntries.iterator().asScala.foreach { entry => + reporterEntries.foreach { case (_, entry) => Future { if(entry.isActive) entry.reporter.reportTickSnapshot(tickSnapshot) @@ -164,4 +220,19 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con case NonFatal(t) => logger.error("Error while running a tick", t) } } + + private class SpanTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable { + override def run(): Unit = { + spanReporters.foreach { + case (_, entry) => + + val spanBatch = new java.util.ArrayList[Span.CompletedSpan](entry.bufferCapacity) + entry.buffer.drainTo(spanBatch, entry.bufferCapacity) + + Future { + entry.reporter.reportSpans(spanBatch.asScala) + }(entry.executionContext) + } + } + } } \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/Accumulator.scala b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala new file mode 100644 index 00000000..b87f5530 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Accumulator.scala @@ -0,0 +1,17 @@ +package kamon.metric + +import kamon.util.MeasurementUnit + + +class DistributionAccumulator(dynamicRange: DynamicRange) { + private val accumulatorHistogram = new HdrHistogram("metric-distribution-accumulator", + tags = Map.empty, measurementUnit = MeasurementUnit.none, dynamicRange) + + + def add(distribution: Distribution): Unit = { + distribution.bucketsIterator.foreach(b => accumulatorHistogram.record(b.value, b.frequency)) + } + + def result(): Distribution = + accumulatorHistogram.snapshot().distribution +} diff --git a/kamon-core/src/main/scala/kamon/metric/Scaler.scala b/kamon-core/src/main/scala/kamon/metric/Scaler.scala new file mode 100644 index 00000000..f8f51c00 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/Scaler.scala @@ -0,0 +1,58 @@ +package kamon.metric + +import kamon.util.MeasurementUnit +import kamon.util.MeasurementUnit.Dimension + +class Scaler(targetTimeUnit: MeasurementUnit, targetInformationUnit: MeasurementUnit, dynamicRange: DynamicRange) { + require(targetTimeUnit.dimension == Dimension.Time, "timeUnit must be in the time dimension.") + require(targetInformationUnit.dimension == Dimension.Information, "informationUnit must be in the information dimension.") + + val scaleHistogram = new HdrHistogram("scaler", Map.empty, MeasurementUnit.none, dynamicRange) + + def scaleDistribution(metric: MetricDistribution): MetricDistribution = { + metric.measurementUnit match { + case MeasurementUnit(Dimension.Time, magnitude) if(magnitude != targetTimeUnit.magnitude) => + scaleMetricDistributionToTarget(metric, targetTimeUnit) + + case MeasurementUnit(Dimension.Information, magnitude) if(magnitude != targetTimeUnit.magnitude) => + scaleMetricDistributionToTarget(metric, targetInformationUnit) + + case _ => metric + } + } + + def scaleMetricValue(metric: MetricValue): MetricValue = { + metric.measurementUnit match { + case MeasurementUnit(Dimension.Time, magnitude) if(magnitude != targetTimeUnit.magnitude) => + scaleMetricValueToTarget(metric, targetTimeUnit) + + case MeasurementUnit(Dimension.Information, magnitude) if(magnitude != targetTimeUnit.magnitude) => + scaleMetricValueToTarget(metric, targetInformationUnit) + + case _ => metric + } + } + + private def scaleMetricDistributionToTarget(metric: MetricDistribution, targetUnit: MeasurementUnit): MetricDistribution = { + metric.distribution.bucketsIterator.foreach(b => { + val scaledValue = MeasurementUnit.scale(b.value, metric.measurementUnit, targetUnit) + scaleHistogram.record(Math.ceil(scaledValue).toLong, b.frequency) + }) + + scaleHistogram.snapshot().copy( + name = metric.name, + tags = metric.tags, + measurementUnit = targetUnit, + dynamicRange = dynamicRange + ) + } + + private def scaleMetricValueToTarget(metric: MetricValue, targetUnit: MeasurementUnit): MetricValue = { + val scaledValue = MeasurementUnit.scale(metric.value, metric.measurementUnit, targetUnit) + + metric.copy( + value = Math.ceil(scaledValue).toLong, + measurementUnit = targetUnit + ) + } +} diff --git a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala index e8587ffe..6aed1ab3 100644 --- a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala +++ b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala @@ -1,8 +1,15 @@ package kamon.metric - import kamon.util.MeasurementUnit + +/** + * + * @param interval + * @param metrics + */ +case class TickSnapshot(interval: Interval, metrics: MetricsSnapshot) + case class Interval(from: Long, to: Long) case class MetricsSnapshot( @@ -12,10 +19,6 @@ case class MetricsSnapshot( counters: Seq[MetricValue] ) -case class TickSnapshot(interval: Interval, metrics: MetricsSnapshot) - - - /** * Snapshot for instruments that internally track a single value. Meant to be used for counters and gauges. * diff --git a/kamon-core/src/main/scala/kamon/util/MetricScaler.scala b/kamon-core/src/main/scala/kamon/util/MetricScaler.scala deleted file mode 100644 index ca77a9a1..00000000 --- a/kamon-core/src/main/scala/kamon/util/MetricScaler.scala +++ /dev/null @@ -1,58 +0,0 @@ -package kamon.util - -import kamon.metric.{DynamicRange, HdrHistogram, MetricDistribution, MetricValue} -import kamon.util.MeasurementUnit.Dimension - -class MetricScaler(targetTimeUnit: MeasurementUnit, targetInformationUnit: MeasurementUnit, dynamicRange: DynamicRange) { - require(targetTimeUnit.dimension == Dimension.Time, "timeUnit must be in the time dimension.") - require(targetInformationUnit.dimension == Dimension.Information, "informationUnit must be in the information dimension.") - - val scaleHistogram = new HdrHistogram("scaler", Map.empty, MeasurementUnit.none, dynamicRange) - - def scaleDistribution(metric: MetricDistribution): MetricDistribution = { - metric.measurementUnit match { - case MeasurementUnit(Dimension.Time, magnitude) if(magnitude != targetTimeUnit.magnitude) => - scaleMetricDistributionToTarget(metric, targetTimeUnit) - - case MeasurementUnit(Dimension.Information, magnitude) if(magnitude != targetTimeUnit.magnitude) => - scaleMetricDistributionToTarget(metric, targetInformationUnit) - - case _ => metric - } - } - - def scaleMetricValue(metric: MetricValue): MetricValue = { - metric.measurementUnit match { - case MeasurementUnit(Dimension.Time, magnitude) if(magnitude != targetTimeUnit.magnitude) => - scaleMetricValueToTarget(metric, targetTimeUnit) - - case MeasurementUnit(Dimension.Information, magnitude) if(magnitude != targetTimeUnit.magnitude) => - scaleMetricValueToTarget(metric, targetInformationUnit) - - case _ => metric - } - } - - private def scaleMetricDistributionToTarget(metric: MetricDistribution, targetUnit: MeasurementUnit): MetricDistribution = { - metric.distribution.bucketsIterator.foreach(b => { - val scaledValue = MeasurementUnit.scale(b.value, metric.measurementUnit, targetUnit) - scaleHistogram.record(Math.ceil(scaledValue).toLong, b.frequency) - }) - - scaleHistogram.snapshot().copy( - name = metric.name, - tags = metric.tags, - measurementUnit = targetUnit, - dynamicRange = dynamicRange - ) - } - - private def scaleMetricValueToTarget(metric: MetricValue, targetUnit: MeasurementUnit): MetricValue = { - val scaledValue = MeasurementUnit.scale(metric.value, metric.measurementUnit, targetUnit) - - metric.copy( - value = Math.ceil(scaledValue).toLong, - measurementUnit = targetUnit - ) - } -} -- cgit v1.2.3