From f1c6ceffa22c59a463d6d8cd2ca77e2b440eb450 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 29 Oct 2018 17:45:57 +0100 Subject: Implement a module registry that supports loading from configuration (#559) --- build.sbt | 5 +- .../src/test/scala/kamon/KamonLifecycleSpec.scala | 6 +- .../src/test/scala/kamon/trace/TracerSpec.scala | 26 +- kamon-core/src/main/resources/reference.conf | 19 +- kamon-core/src/main/scala/kamon/Kamon.scala | 109 +----- kamon-core/src/main/scala/kamon/Metrics.scala | 31 ++ .../src/main/scala/kamon/ModuleLoading.scala | 139 +++++++ .../src/main/scala/kamon/ReporterRegistry.scala | 405 -------------------- kamon-core/src/main/scala/kamon/Tracing.scala | 18 + kamon-core/src/main/scala/kamon/Utilities.scala | 58 +++ .../src/main/scala/kamon/module/Module.scala | 426 +++++++++++++++++++++ .../main/scala/kamon/module/ReportingModule.scala | 21 + kamon-core/src/main/scala/kamon/trace/Span.scala | 9 +- .../main/scala/kamon/trace/SpanPropagation.scala | 6 +- kamon-core/src/main/scala/kamon/trace/Tracer.scala | 64 +++- 15 files changed, 790 insertions(+), 552 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/Metrics.scala create mode 100644 kamon-core/src/main/scala/kamon/ModuleLoading.scala delete mode 100644 kamon-core/src/main/scala/kamon/ReporterRegistry.scala create mode 100644 kamon-core/src/main/scala/kamon/Tracing.scala create mode 100644 kamon-core/src/main/scala/kamon/Utilities.scala create mode 100644 kamon-core/src/main/scala/kamon/module/Module.scala create mode 100644 kamon-core/src/main/scala/kamon/module/ReportingModule.scala diff --git a/build.sbt b/build.sbt index 131626d9..04e20f49 100644 --- a/build.sbt +++ b/build.sbt @@ -46,8 +46,9 @@ lazy val core = (project in file("kamon-core")) .settings( libraryDependencies ++= Seq( "com.typesafe" % "config" % "1.3.1", - "org.slf4j" % "slf4j-api" % "1.7.25", - "org.hdrhistogram" % "HdrHistogram" % "2.1.9" + "org.hdrhistogram" % "HdrHistogram" % "2.1.9", + "org.jctools" % "jctools-core" % "2.1.1", + "org.slf4j" % "slf4j-api" % "1.7.25" ) ) diff --git a/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala index b5d0425b..2a0af8c0 100644 --- a/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala @@ -10,17 +10,17 @@ import org.scalatest.{Matchers, WordSpec} import org.scalatest.concurrent.Eventually import org.scalatest.time.SpanSugar._ -class KamonLifecycleSpec extends WordSpec with Matchers with Eventually{ +class KamonLifecycleSpec extends WordSpec with Matchers with Eventually { "the Kamon lifecycle" should { - "keep the JVM running if reporters are running" in { + "keep the JVM running if modules are running" in { val process = Runtime.getRuntime.exec(createProcessCommand("kamon.KamonWithRunningReporter")) Thread.sleep(5000) process.isAlive shouldBe true process.destroyForcibly().waitFor(5, TimeUnit.SECONDS) } - "let the JVM stop after all reporters are stopped" in { + "let the JVM stop after all modules are stopped" in { val process = Runtime.getRuntime.exec(createProcessCommand("kamon.KamonWithTemporaryReporter")) Thread.sleep(2000) process.isAlive shouldBe true diff --git a/kamon-core-tests/src/test/scala/kamon/trace/TracerSpec.scala b/kamon-core-tests/src/test/scala/kamon/trace/TracerSpec.scala index f325d1d1..b52303f6 100644 --- a/kamon-core-tests/src/test/scala/kamon/trace/TracerSpec.scala +++ 
b/kamon-core-tests/src/test/scala/kamon/trace/TracerSpec.scala @@ -29,7 +29,7 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe "the Kamon tracer" should { "construct a minimal Span that only has a operation name" in { - val span = tracer.buildSpan("myOperation").start() + val span = Kamon.buildSpan("myOperation").start() val spanData = inspect(span) spanData.operationName() shouldBe "myOperation" @@ -38,7 +38,7 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe } "pass the operation name and tags to started Span" in { - val span = tracer.buildSpan("myOperation") + val span = Kamon.buildSpan("myOperation") .withMetricTag("metric-tag", "value") .withMetricTag("metric-tag", "value") .withTag("hello", "world") @@ -60,16 +60,16 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe } "not have any parent Span if there is no Span in the current context and no parent was explicitly given" in { - val span = tracer.buildSpan("myOperation").start() + val span = Kamon.buildSpan("myOperation").start() val spanData = inspect(span) spanData.context().parentID shouldBe IdentityProvider.NoIdentifier } "automatically take the Span from the current Context as parent" in { - val parent = tracer.buildSpan("myOperation").start() + val parent = Kamon.buildSpan("myOperation").start() val child = Kamon.withSpan(parent) { - tracer.buildSpan("childOperation").asChildOf(parent).start() + Kamon.buildSpan("childOperation").asChildOf(parent).start() } val parentData = inspect(parent) @@ -78,9 +78,9 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe } "ignore the span from the current context as parent if explicitly requested" in { - val parent = tracer.buildSpan("myOperation").start() + val parent = Kamon.buildSpan("myOperation").start() val child = Kamon.withSpan(parent) { - tracer.buildSpan("childOperation").ignoreParentFromContext().start() + Kamon.buildSpan("childOperation").ignoreParentFromContext().start() } val childData = inspect(child) @@ -88,7 +88,7 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe } "allow overriding the start timestamp for a Span" in { - val span = tracer.buildSpan("myOperation").withFrom(Instant.EPOCH.plusMillis(321)).start() + val span = Kamon.buildSpan("myOperation").withFrom(Instant.EPOCH.plusMillis(321)).start() val spanData = inspect(span) spanData.from() shouldBe Instant.EPOCH.plusMillis(321) } @@ -101,7 +101,7 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe } val remoteParent = Span.Remote(createSpanContext()) - val childData = inspect(tracer.buildSpan("local").asChildOf(remoteParent).start()) + val childData = inspect(Kamon.buildSpan("local").asChildOf(remoteParent).start()) childData.context().traceID shouldBe remoteParent.context.traceID childData.context().parentID shouldBe remoteParent.context.parentID @@ -114,10 +114,10 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe val sampledRemoteParent = Span.Remote(createSpanContext().copy(samplingDecision = SamplingDecision.Sample)) val notSampledRemoteParent = Span.Remote(createSpanContext().copy(samplingDecision = SamplingDecision.DoNotSample)) - tracer.buildSpan("childOfSampled").asChildOf(sampledRemoteParent).start().context() + Kamon.buildSpan("childOfSampled").asChildOf(sampledRemoteParent).start().context() .samplingDecision shouldBe(SamplingDecision.Sample) - 
tracer.buildSpan("childOfNotSampled").asChildOf(notSampledRemoteParent).start().context() + Kamon.buildSpan("childOfNotSampled").asChildOf(notSampledRemoteParent).start().context() .samplingDecision shouldBe(SamplingDecision.DoNotSample) } @@ -129,7 +129,7 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe } val unknownSamplingRemoteParent = Span.Remote(createSpanContext().copy(samplingDecision = SamplingDecision.Unknown)) - tracer.buildSpan("childOfSampled").asChildOf(unknownSamplingRemoteParent).start().context() + Kamon.buildSpan("childOfSampled").asChildOf(unknownSamplingRemoteParent).start().context() .samplingDecision shouldBe(SamplingDecision.Sample) Kamon.reconfigure(previousConfig) @@ -137,6 +137,4 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe } - val tracer: Tracer = Kamon - } diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 2fa3d33b..0fb3ce0a 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -25,11 +25,22 @@ kamon { } } - # FQCN of the reporter instances that should be loaded when calling `Kamon.reporters.loadReportersFromConfig()`. All - # reporter classes must have a default constructor. No metric filtering is applied to metric reporters started this way. + # Modules that can be automatically discovered and started by Kamon. The configuration for each module has the + # following schema: + # + # kamon.modules { + # module-name { + # enabled = true + # class = "com.example.ModuleClass" + # } + # } + # + # All available modules in the classpath are started when calling Kamon.loadModules() and stopped when calling + # Kamon.stopModules(). + # + modules { - # Example: `reporters = ["kamon.prometheus.PrometheusReporter", "kamon.zipkin.ZipkinReporter"]`. - reporters = [ ] + } # Pool size for the executor service that will run sampling on RangeSampler instruments. 
This scheduler is accesible # through Kamon.scheduler() diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index a6cc7bf4..ab95d773 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -15,113 +15,22 @@ package kamon -import java.time.Duration -import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor} +object Kamon extends ClassLoading + with Configuration + with Utilities + with Metrics + with Tracing + with ModuleLoading + with ContextPropagation + with ContextStorage { -import com.typesafe.config.{Config, ConfigFactory} -import kamon.metric._ -import kamon.trace._ -import kamon.util.{Clock, Filters, Matcher, Registration} -import org.slf4j.LoggerFactory -import scala.concurrent.Future -import scala.util.Try - - -object Kamon extends MetricLookup with ClassLoading with Configuration with ReporterRegistry with Tracer with ContextPropagation with ContextStorage { - private val logger = LoggerFactory.getLogger("kamon.Kamon") - - @volatile private var _config = ConfigFactory.load() - @volatile private var _environment = Environment.fromConfig(_config) - @volatile private var _filters = Filters.fromConfig(_config) - - private val _clock = new Clock.Default() - private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler", daemon = true)) - private val _metrics = new MetricRegistry(_config, _scheduler) - private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config, _clock) - private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config, _clock) - //private var _onReconfigureHooks = Seq.empty[OnReconfigureHook] - - sys.addShutdownHook(() => _scheduler.shutdown()) + @volatile private var _environment = Environment.fromConfig(config()) def environment: Environment = _environment onReconfigure(newConfig => { - _config = config _environment = Environment.fromConfig(config) - _filters = Filters.fromConfig(config) - _metrics.reconfigure(config) - _reporterRegistry.reconfigure(config) - _tracer.reconfigure(config) - - _scheduler match { - case stpe: ScheduledThreadPoolExecutor => stpe.setCorePoolSize(schedulerPoolSize(config)) - case other => logger.error("Unexpected scheduler [{}] found when reconfiguring Kamon.", other) - } }) - - override def histogram(name: String, unit: MeasurementUnit, dynamicRange: Option[DynamicRange]): HistogramMetric = - _metrics.histogram(name, unit, dynamicRange) - - override def counter(name: String, unit: MeasurementUnit): CounterMetric = - _metrics.counter(name, unit) - - override def gauge(name: String, unit: MeasurementUnit): GaugeMetric = - _metrics.gauge(name, unit) - - override def rangeSampler(name: String, unit: MeasurementUnit, sampleInterval: Option[Duration], - dynamicRange: Option[DynamicRange]): RangeSamplerMetric = - _metrics.rangeSampler(name, unit, dynamicRange, sampleInterval) - - override def timer(name: String, dynamicRange: Option[DynamicRange]): TimerMetric = - _metrics.timer(name, dynamicRange) - - - def tracer: Tracer = - _tracer - - override def buildSpan(operationName: String): Tracer.SpanBuilder = - _tracer.buildSpan(operationName) - - - override def identityProvider: IdentityProvider = - _tracer.identityProvider - - override def loadReportersFromConfig(): Unit = - _reporterRegistry.loadReportersFromConfig() - - override def addReporter(reporter: MetricReporter): Registration = - 
_reporterRegistry.addReporter(reporter) - - override def addReporter(reporter: MetricReporter, name: String): Registration = - _reporterRegistry.addReporter(reporter, name) - - override def addReporter(reporter: MetricReporter, name: String, filter: String): Registration = - _reporterRegistry.addReporter(reporter, name, filter) - - override def addReporter(reporter: SpanReporter): Registration = - _reporterRegistry.addReporter(reporter) - - override def addReporter(reporter: SpanReporter, name: String): Registration = - _reporterRegistry.addReporter(reporter, name) - - override def stopAllReporters(): Future[Unit] = - _reporterRegistry.stopAllReporters() - - def filter(filterName: String, pattern: String): Boolean = - _filters.accept(filterName, pattern) - - def filter(filterName: String): Matcher = - _filters.get(filterName) - - def clock(): Clock = - _clock - - def scheduler(): ScheduledExecutorService = - _scheduler - - private def schedulerPoolSize(config: Config): Int = - config.getInt("kamon.scheduler-pool-size") - } diff --git a/kamon-core/src/main/scala/kamon/Metrics.scala b/kamon-core/src/main/scala/kamon/Metrics.scala new file mode 100644 index 00000000..a2b74c20 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/Metrics.scala @@ -0,0 +1,31 @@ +package kamon + +import java.time.Duration + +import kamon.metric._ + + +trait Metrics extends MetricLookup { self: Configuration with Utilities => + private val _metricsRegistry = new MetricRegistry(self.config(), self.scheduler()) + + override def histogram(name: String, unit: MeasurementUnit, dynamicRange: Option[DynamicRange]): HistogramMetric = + _metricsRegistry.histogram(name, unit, dynamicRange) + + override def counter(name: String, unit: MeasurementUnit): CounterMetric = + _metricsRegistry.counter(name, unit) + + override def gauge(name: String, unit: MeasurementUnit): GaugeMetric = + _metricsRegistry.gauge(name, unit) + + override def rangeSampler(name: String, unit: MeasurementUnit, sampleInterval: Option[Duration], + dynamicRange: Option[DynamicRange]): RangeSamplerMetric = + _metricsRegistry.rangeSampler(name, unit, dynamicRange, sampleInterval) + + override def timer(name: String, dynamicRange: Option[DynamicRange]): TimerMetric = + _metricsRegistry.timer(name, dynamicRange) + + + protected def metricRegistry(): MetricRegistry = + _metricsRegistry + +} diff --git a/kamon-core/src/main/scala/kamon/ModuleLoading.scala b/kamon-core/src/main/scala/kamon/ModuleLoading.scala new file mode 100644 index 00000000..8fe035d6 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/ModuleLoading.scala @@ -0,0 +1,139 @@ +package kamon + +import com.typesafe.config.Config +import kamon.metric.{MetricsSnapshot, PeriodSnapshot} +import kamon.module.Module +import kamon.util.Registration +import kamon.module.{MetricReporter => NewMetricReporter} +import kamon.module.{SpanReporter => NewSpanReporter} +import kamon.module.Module.{Registration => NewRegistration} +import kamon.trace.Span + +import scala.concurrent.Future + +@deprecated("Use kamon.module.Module instead", "1.2.0") +sealed trait Reporter extends Module { } + +@deprecated("Use kamon.module.MetricReporter instead", "1.2.0") +trait MetricReporter extends kamon.module.MetricReporter { } + +@deprecated("Use kamon.module.SpanReporter instead", "1.2.0") +trait SpanReporter extends kamon.module.SpanReporter { } + +/** + * Handles the lifecycle of all modules known by Kamon. 
The most common implementations of modules are metrics and + * span reporters, but modules can be used to encapsulate any process that should be started automatically by Kamon and + * stopped when all modules are stopped (usually during shutdown). + * + * Modules can be automatically discovered from the kamon.modules configuration key, using the following schema: + * + * kamon.modules { + * module-name { + * enabled = true + * class = "com.example.MyModule" + * } + * } + * + */ +trait ModuleLoading { self: ClassLoading with Configuration with Utilities with Metrics with Tracing => + private val _moduleRegistry = new Module.Registry(self, self, clock(), self.metricRegistry(), self.tracer()) + self.onReconfigure(newConfig => self._moduleRegistry.reconfigure(newConfig)) + + + /** + * Register a module instantiated by the user. + * + * @param name Module name. Registration will fail if a module with the given name already exists. + * @param module The module instance. + * @return A Registration that can be used to de-register the module at any time. + */ + def registerModule(name: String, module: Module): NewRegistration = + _moduleRegistry.register(name, module) + + /** + * Loads modules from Kamon's configuration. + */ + def loadModules(): Unit = + _moduleRegistry.load(self.config()) + + /** + * Stops all registered modules. This includes automatically and programmatically registered modules. + * + * @return A future that completes when the stop callback on all available modules have been completed. + */ + def stopModules(): Future[Unit] = + _moduleRegistry.stop() + + + + // Compatibility with Kamon <1.2.0 + + @deprecated("Use registerModule instead", "1.2.0") + def addReporter(reporter: MetricReporter): Registration = + wrapRegistration(_moduleRegistry.register(reporter.getClass.getName(), wrapLegacyMetricReporter(reporter))) + + @deprecated("Use registerModule instead", "1.2.0") + def addReporter(reporter: MetricReporter, name: String): Registration = + wrapRegistration(_moduleRegistry.register(name, wrapLegacyMetricReporter(reporter))) + + @deprecated("Use registerModule instead", "1.2.0") + def addReporter(reporter: MetricReporter, name: String, filter: String): Registration = + wrapRegistration(_moduleRegistry.register(name, wrapLegacyMetricReporter(reporter, Some(filter)))) + + @deprecated("Use registerModule instead", "1.2.0") + def addReporter(reporter: SpanReporter): Registration = + wrapRegistration(_moduleRegistry.register(reporter.getClass.getName(), wrapLegacySpanReporter(reporter))) + + @deprecated("Use registerModule instead", "1.2.0") + def addReporter(reporter: SpanReporter, name: String): Registration = + wrapRegistration(_moduleRegistry.register(name, wrapLegacySpanReporter(reporter))) + + @deprecated("Use stopModules instead", "1.2.0") + def stopAllReporters(): Future[Unit] = + _moduleRegistry.stop() + + @deprecated("Use loadModules instead", "1.2.0") + def loadReportersFromConfig(): Unit = + loadModules() + + + private def wrapRegistration(registration: NewRegistration): Registration = new Registration { + override def cancel(): Boolean = { + registration.cancel() + true + } + } + + private def wrapLegacyMetricReporter(reporter: MetricReporter, filter: Option[String] = None): NewMetricReporter = new NewMetricReporter { + override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = { + val filteredSnapshot = filter + .map(f => filterMetrics(f, snapshot)) + .getOrElse(snapshot) + reporter.reportPeriodSnapshot(filteredSnapshot) + } + + private def 
filterMetrics(filterName: String, periodSnapshot: PeriodSnapshot): PeriodSnapshot = { + val metricFilter = Kamon.filter(filterName) + val counters = periodSnapshot.metrics.counters.filter(c => metricFilter.accept(c.name)) + val gauges = periodSnapshot.metrics.gauges.filter(g => metricFilter.accept(g.name)) + val histograms = periodSnapshot.metrics.histograms.filter(h => metricFilter.accept(h.name)) + val rangeSamplers = periodSnapshot.metrics.rangeSamplers.filter(rs => metricFilter.accept(rs.name)) + + periodSnapshot.copy(metrics = MetricsSnapshot( + histograms, rangeSamplers, gauges, counters + )) + } + + override def start(): Unit = reporter.start() + override def stop(): Unit = reporter.stop() + override def reconfigure(config: Config): Unit = reporter.reconfigure(config) + } + + private def wrapLegacySpanReporter(reporter: SpanReporter): NewSpanReporter = new NewSpanReporter { + override def reportSpans(spans: Seq[Span.FinishedSpan]): Unit = reporter.reportSpans(spans) + override def start(): Unit = reporter.start() + override def stop(): Unit = reporter.stop() + override def reconfigure(newConfig: Config): Unit = reporter.reconfigure(newConfig) + } + +} diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala deleted file mode 100644 index 06b9761a..00000000 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ /dev/null @@ -1,405 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. 
- * ========================================================================================= - */ - -package kamon - -import java.time.{Duration, Instant} -import java.util.concurrent.atomic.{AtomicLong, AtomicReference} -import java.util.concurrent._ - -import com.typesafe.config.Config -import kamon.metric._ -import kamon.trace.Span -import kamon.trace.Span.FinishedSpan -import kamon.util.{Clock, DynamicAccess, Registration} -import org.slf4j.LoggerFactory - -import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} -import scala.util.Try -import scala.util.control.NonFatal -import scala.collection.JavaConverters._ -import scala.collection.concurrent.TrieMap -import scala.concurrent.duration._ - -sealed trait Reporter { - def start(): Unit - def stop(): Unit - def reconfigure(config: Config): Unit -} - -trait MetricReporter extends Reporter { - def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit -} - -trait SpanReporter extends Reporter { - def reportSpans(spans: Seq[Span.FinishedSpan]): Unit -} - -trait ReporterRegistry { - def loadReportersFromConfig(): Unit - - def addReporter(reporter: MetricReporter): Registration - def addReporter(reporter: MetricReporter, name: String): Registration - def addReporter(reporter: MetricReporter, name: String, filter: String): Registration - def addReporter(reporter: SpanReporter): Registration - def addReporter(reporter: SpanReporter, name: String): Registration - - def stopAllReporters(): Future[Unit] -} - -object ReporterRegistry { - - private[kamon] trait SpanSink { - def reportSpan(finishedSpan: FinishedSpan): Unit - } - - private[kamon] class Default(metrics: MetricsSnapshotGenerator, initialConfig: Config, clock: Clock) extends ReporterRegistry with SpanSink { - private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry]) - private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry", daemon = true)) - private val reporterCounter = new AtomicLong(0L) - private var registryConfiguration = readRegistryConfiguration(initialConfig) - - private val metricReporters = TrieMap[Long, MetricReporterEntry]() - private val metricReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() - private val spanReporters = TrieMap[Long, SpanReporterEntry]() - private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() - - - reconfigure(initialConfig) - - override def loadReportersFromConfig(): Unit = { - if(registryConfiguration.configuredReporters.isEmpty) - logger.info("The kamon.reporters setting is empty, no reporters have been started.") - else { - registryConfiguration.configuredReporters.foreach { reporterFQCN => - val dynamicAccess = new DynamicAccess(getClass.getClassLoader) - dynamicAccess.createInstanceFor[Reporter](reporterFQCN, Nil).map({ - case mr: MetricReporter => - addMetricReporter(mr, "loaded-from-config: " + reporterFQCN) - logger.info("Loaded metric reporter [{}]", reporterFQCN) - - case sr: SpanReporter => - addSpanReporter(sr, "loaded-from-config: " + reporterFQCN) - logger.info("Loaded span reporter [{}]", reporterFQCN) - - }).failed.foreach { - t => logger.error(s"Failed to load configured reporter [$reporterFQCN]", t) - } - } - } - } - - override def addReporter(reporter: MetricReporter): Registration = - addMetricReporter(reporter, reporter.getClass.getName()) - - override def addReporter(reporter: MetricReporter, name: String): Registration = - addMetricReporter(reporter, name) - - override def 
addReporter(reporter: MetricReporter, name: String, filter: String): Registration = - addMetricReporter(reporter, name, Some(filter)) - - override def addReporter(reporter: SpanReporter): Registration = - addSpanReporter(reporter, reporter.getClass.getName()) - - override def addReporter(reporter: SpanReporter, name: String): Registration = - addSpanReporter(reporter, name) - - - private def addMetricReporter(reporter: MetricReporter, name: String, filter: Option[String] = None): Registration = synchronized { - val executor = Executors.newSingleThreadExecutor(threadFactory(name)) - val reporterEntry = new MetricReporterEntry( - id = reporterCounter.getAndIncrement(), - name = name, - reporter = reporter, - filter = filter, - executionContext = ExecutionContext.fromExecutorService(executor) - ) - - Future { - Try { - reporterEntry.reporter.start() - }.failed.foreach { error => - logger.error(s"Metric reporter [$name] failed to start.", error) - } - }(reporterEntry.executionContext) - - if(metricReporters.isEmpty) - reStartMetricTicker() - - metricReporters.put(reporterEntry.id, reporterEntry) - createRegistration(reporterEntry.id, metricReporters) - - } - - private def addSpanReporter(reporter: SpanReporter, name: String): Registration = synchronized { - val executor = Executors.newSingleThreadExecutor(threadFactory(name)) - val reporterEntry = new SpanReporterEntry( - id = reporterCounter.incrementAndGet(), - name = name, - reporter = reporter, - bufferCapacity = registryConfiguration.traceReporterQueueSize, - executionContext = ExecutionContext.fromExecutorService(executor) - ) - - Future { - Try { - reporterEntry.reporter.start() - }.failed.foreach { error => - logger.error(s"Span reporter [$name] failed to start.", error) - } - }(reporterEntry.executionContext) - - if(spanReporters.isEmpty) - reStartTraceTicker() - - spanReporters.put(reporterEntry.id, reporterEntry) - createRegistration(reporterEntry.id, spanReporters) - } - - private def createRegistration(id: Long, target: TrieMap[Long, _]): Registration = new Registration { - override def cancel(): Boolean = - target.remove(id).nonEmpty - } - - override def stopAllReporters(): Future[Unit] = { - implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext) - val reporterStopFutures = Vector.newBuilder[Future[Unit]] - - while(metricReporters.nonEmpty) { - val (idToRemove, _) = metricReporters.head - metricReporters.remove(idToRemove).foreach { entry => - reporterStopFutures += stopMetricReporter(entry) - } - } - - while(spanReporters.nonEmpty) { - val (idToRemove, _) = spanReporters.head - spanReporters.remove(idToRemove).foreach { entry => - reporterStopFutures += stopSpanReporter(entry) - } - } - - Future.sequence(reporterStopFutures.result()).map(_ => ()) - } - - private[kamon] def reconfigure(config: Config): Unit = synchronized { - val oldConfig = registryConfiguration - registryConfiguration = readRegistryConfiguration(config) - - if(oldConfig.metricTickInterval != registryConfiguration.metricTickInterval && metricReporters.nonEmpty) - reStartMetricTicker() - - if(oldConfig.traceTickInterval != registryConfiguration.traceTickInterval && spanReporters.nonEmpty) - reStartTraceTicker() - - // Reconfigure all registered reporters - metricReporters.foreach { - case (_, entry) => - Future { - Try { - entry.reporter.reconfigure(config) - }.failed.foreach { error => - logger.error(s"Metric reporter [${entry.name}] failed to reconfigure.", error) - } - }(entry.executionContext) - } - spanReporters.foreach 
{ - case (_, entry) => - Future { - Try { - entry.reporter.reconfigure(config) - }.failed.foreach { error => - logger.error(s"Span reporter [${entry.name}] failed to reconfigure.", error) - } - }(entry.executionContext) - } - } - - - private def reStartMetricTicker(): Unit = { - val tickIntervalMillis = registryConfiguration.metricTickInterval.toMillis - val currentMetricTicker = metricReporterTickerSchedule.get() - - if(currentMetricTicker != null) - currentMetricTicker.cancel(false) - - metricReporterTickerSchedule.set { - val initialDelay = - if(registryConfiguration.optimisticMetricTickAlignment) { - val now = clock.instant() - val nextTick = Clock.nextTick(now, registryConfiguration.metricTickInterval) - Duration.between(now, nextTick).toMillis - - } else tickIntervalMillis - - registryExecutionContext.scheduleAtFixedRate( - new MetricReporterTicker(metrics, metricReporters, clock), initialDelay, tickIntervalMillis, TimeUnit.MILLISECONDS - ) - } - } - - private def reStartTraceTicker(): Unit = { - val tickIntervalMillis = registryConfiguration.traceTickInterval.toMillis - val currentSpanTicker = spanReporterTickerSchedule.get() - if(currentSpanTicker != null) - currentSpanTicker.cancel(false) - - spanReporterTickerSchedule.set { - registryExecutionContext.scheduleAtFixedRate( - new SpanReporterTicker(spanReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS - ) - } - } - - def reportSpan(span: Span.FinishedSpan): Unit = { - spanReporters.foreach { case (_, reporterEntry) => - if(reporterEntry.isActive) - reporterEntry.buffer.offer(span) - } - } - - private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = { - entry.isActive = false - - Future { - Try { - entry.reporter.stop() - }.failed.foreach { error => - logger.error(s"Metric reporter [${entry.name}] failed to stop.", error) - } - }(entry.executionContext).andThen { - case _ => entry.executionContext.shutdown() - }(ExecutionContext.fromExecutor(registryExecutionContext)) - } - - private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = { - entry.isActive = false - - Future { - Try { - entry.reporter.stop() - }.failed.foreach { error => - logger.error(s"Span reporter [${entry.name}] failed to stop.", error) - } - }(entry.executionContext).andThen { - case _ => entry.executionContext.shutdown() - }(ExecutionContext.fromExecutor(registryExecutionContext)) - } - - private class MetricReporterEntry( - @volatile var isActive: Boolean = true, - val id: Long, - val name: String, - val reporter: MetricReporter, - val filter: Option[String], - val executionContext: ExecutionContextExecutorService - ) - - private class SpanReporterEntry( - @volatile var isActive: Boolean = true, - val id: Long, - val name: String, - val reporter: SpanReporter, - val bufferCapacity: Int, - val executionContext: ExecutionContextExecutorService - ) { - val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity) - } - - private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry], - clock: Clock) extends Runnable { - - val logger = LoggerFactory.getLogger(classOf[MetricReporterTicker]) - var lastInstant = Instant.now(clock) - - def run(): Unit = try { - val currentInstant = Instant.now(clock) - val periodSnapshot = PeriodSnapshot( - from = lastInstant, - to = currentInstant, - metrics = snapshotGenerator.snapshot() - ) - - reporterEntries.foreach { case (_, entry) => - Future { - Try { - if (entry.isActive) { - val filteredSnapshot = 
entry.filter - .map(f => filterMetrics(f, periodSnapshot)) - .getOrElse(periodSnapshot) - - entry.reporter.reportPeriodSnapshot(filteredSnapshot) - } - - }.failed.foreach { error => - logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error) - } - - }(entry.executionContext) - } - - lastInstant = currentInstant - - } catch { - case NonFatal(t) => logger.error("Error while running a tick", t) - } - - private def filterMetrics(filterName: String, periodSnapshot: PeriodSnapshot): PeriodSnapshot = { - val metricFilter = Kamon.filter(filterName) - val counters = periodSnapshot.metrics.counters.filter(c => metricFilter.accept(c.name)) - val gauges = periodSnapshot.metrics.gauges.filter(g => metricFilter.accept(g.name)) - val histograms = periodSnapshot.metrics.histograms.filter(h => metricFilter.accept(h.name)) - val rangeSamplers = periodSnapshot.metrics.rangeSamplers.filter(rs => metricFilter.accept(rs.name)) - - periodSnapshot.copy(metrics = MetricsSnapshot( - histograms, rangeSamplers, gauges, counters - )) - } - } - - - private class SpanReporterTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable { - override def run(): Unit = { - spanReporters.foreach { - case (_, entry) => - - val spanBatch = new java.util.ArrayList[Span.FinishedSpan](entry.bufferCapacity) - entry.buffer.drainTo(spanBatch, entry.bufferCapacity) - - Future { - Try { - entry.reporter.reportSpans(spanBatch.asScala) - }.failed.foreach { error => - logger.error(s"Reporter [${entry.name}] failed to report spans.", error) - } - }(entry.executionContext) - } - } - } - - private def readRegistryConfiguration(config: Config): Configuration = - Configuration( - metricTickInterval = config.getDuration("kamon.metric.tick-interval"), - optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment"), - traceTickInterval = config.getDuration("kamon.trace.tick-interval"), - traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size"), - configuredReporters = config.getStringList("kamon.reporters").asScala - ) - - private case class Configuration(metricTickInterval: Duration, optimisticMetricTickAlignment: Boolean, - traceTickInterval: Duration, traceReporterQueueSize: Int, configuredReporters: Seq[String]) - } -} - diff --git a/kamon-core/src/main/scala/kamon/Tracing.scala b/kamon-core/src/main/scala/kamon/Tracing.scala new file mode 100644 index 00000000..30506209 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/Tracing.scala @@ -0,0 +1,18 @@ +package kamon + +import kamon.trace.{IdentityProvider, Tracer} + +trait Tracing { self: Configuration with Utilities => + private val _tracer = Tracer.Default(Kamon, config(), clock()) + onReconfigure(newConfig => _tracer.reconfigure(newConfig)) + + def buildSpan(operationName: String): Tracer.SpanBuilder = + _tracer.buildSpan(operationName) + + def identityProvider: IdentityProvider = + _tracer.identityProvider + + protected def tracer(): Tracer.Default = + _tracer + +} diff --git a/kamon-core/src/main/scala/kamon/Utilities.scala b/kamon-core/src/main/scala/kamon/Utilities.scala new file mode 100644 index 00000000..b5c7d2a6 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/Utilities.scala @@ -0,0 +1,58 @@ +package kamon + +import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor} + +import com.typesafe.config.Config +import kamon.util.{Clock, Filters, Matcher} + +/** + * Base utilities used by other Kamon components. 
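+ * These utilities include Kamon's clock, the scheduled executor service used for internal tasks and the pattern filters configured under kamon.util.filters.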
+ */ +trait Utilities { self: Configuration => + private val _clock = new Clock.Default() + private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(self.config()), numberedThreadFactory("kamon-scheduler", daemon = false)) + @volatile private var _filters = Filters.fromConfig(self.config()) + + self.onReconfigure(newConfig => { + self._filters = Filters.fromConfig(newConfig) + self._scheduler match { + case stpe: ScheduledThreadPoolExecutor => stpe.setCorePoolSize(schedulerPoolSize(config)) + case _ => // cannot change the pool size on other unknown types. + } + }) + + sys.addShutdownHook(() => _scheduler.shutdown()) + + + /** + * Applies a filter to the given pattern. All filters are configured on the kamon.util.filters configuration section. + * + * @return true if the pattern matches at least one includes pattern and none of the excludes patterns in the filter. + */ + def filter(filterName: String, pattern: String): Boolean = + _filters.accept(filterName, pattern) + + /** + * Retrieves a matcher for the given filter name. All filters are configured on the kamon.util.filters configuration + * section. + */ + def filter(filterName: String): Matcher = + _filters.get(filterName) + + /** + * Kamon's clock implementation. + */ + def clock(): Clock = + _clock + + /** + * Scheduler to be used for Kamon-related tasks like updating gauges. + */ + def scheduler(): ScheduledExecutorService = + _scheduler + + + + private def schedulerPoolSize(config: Config): Int = + config.getInt("kamon.scheduler-pool-size") +} diff --git a/kamon-core/src/main/scala/kamon/module/Module.scala b/kamon-core/src/main/scala/kamon/module/Module.scala new file mode 100644 index 00000000..41649629 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/module/Module.scala @@ -0,0 +1,426 @@ +package kamon +package module + +import java.time.{Duration, Instant} +import java.util.concurrent.{CountDownLatch, Executors, ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicReference + +import com.typesafe.config.Config +import kamon.metric.{MetricsSnapshotGenerator, PeriodSnapshot} +import kamon.trace.Tracer.SpanBuffer +import kamon.util.Clock +import org.slf4j.LoggerFactory + +import scala.collection.JavaConverters.collectionAsScalaIterableConverter +import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, Promise} +import scala.util.Try +import scala.util.control.NonFatal + +/** + * Modules provide additional capabilities to Kamon, like collecting JVM metrics or exporting the metrics and trace + * data to external services. Additionally, modules can be automatically registered in Kamon by simply being present + * in the classpath and having the appropriate entry in the configuration file. All modules get a dedicated execution + * context which will be used to call the start, stop and reconfigure hooks. + * + * Besides the basic lifecycle hooks, when registering a [[MetricReporter]] and/or [[SpanReporter]] module, Kamon will + * also schedule calls to [[MetricReporter.reportPeriodSnapshot()]] and [[SpanReporter.reportSpans()]] in the module's + * execution context. + */ +trait Module { + + /** + * Signals that the module has been registered in Kamon's module registry. + */ + def start(): Unit + + /** + * Signals that the module should be stopped and all acquired resources, if any, should be released. + */ + def stop(): Unit + + /** + * Signals that a new configuration object has been provided to Kamon. 
Modules should ensure that their internal + * settings are in sync with the provided configuration. + */ + def reconfigure(newConfig: Config): Unit +} + + +object Module { + + /** + * Represents a module's registration on the module registry. A module can be stopped at any time by cancelling its + * registration. + */ + trait Registration { + + /** + * Removes and stops the related module. + */ + def cancel(): Unit + } + + /** + * Controls the lifecycle of all available modules. + */ + class Registry(classLoading: ClassLoading, configuration: Configuration, clock: Clock, snapshotGenerator: MetricsSnapshotGenerator, spanBuffer: SpanBuffer) { + private val _logger = LoggerFactory.getLogger(classOf[Registry]) + private val _metricsTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-metrics-ticker", daemon = true)) + private val _spansTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-spans-ticker", daemon = true)) + + private val _metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]() + private val _spansTickerSchedule = new AtomicReference[ScheduledFuture[_]]() + + private var _registrySettings = readRegistryConfiguration(configuration.config()) + private var _registeredModules: Map[String, Entry[Module]] = Map.empty + private var _metricReporterModules: Map[String, Entry[MetricReporter]] = Map.empty + private var _spanReporterModules: Map[String, Entry[SpanReporter]] = Map.empty + + // Start ticking as soon as the registry is created. + scheduleMetricsTicker() + scheduleSpansTicker() + + + /** + * Registers a module that has already been instantiated by the user. The start callback will be executed as part + * of the registration process. If a module with the specified name already exists the registration will fail. If + * the registered module is a MetricReporter and/or SpanReporter it will also be configured to receive the metrics + * and spans data upon every tick. + * + * @param name Desired module name. + * @param module Module instance. + * @return A registration that can be used to stop the module at any time. + */ + def register(name: String, module: Module): Registration = synchronized { + if(_registeredModules.get(name).isEmpty) { + val moduleEntry = createEntry(name, true, module) + startModule(moduleEntry) + registration(moduleEntry) + + } else { + _logger.warn(s"Cannot register module [$name], a module with that name already exists.") + noopRegistration(name) + } + } + + /** + * Reads all available modules from the config and either starts, stops or reconfigures them in order to match the + * configured modules state. + */ + def load(config: Config): Unit = synchronized { + val configuredModules = readModuleSettings(config) + val automaticallyRegisteredModules = _registeredModules.filterNot { case (_, module) => module.programmaticallyAdded } + + // Start, reconfigure and stop modules that are still present but disabled. + configuredModules.foreach { moduleSettings => + automaticallyRegisteredModules.get(moduleSettings.name).fold { + // The module does not exist in the registry, the only possible action is starting it, if enabled. + if(moduleSettings.enabled) { + createModule(moduleSettings).foreach(entry => startModule(entry)) + } + + } { existentModuleSettings => + // When a module already exists it can either need to be stopped, or to be reconfigured. 
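+ // Modules that remain enabled are reconfigured in place; modules that were disabled in the updated configuration are stopped and removed from the registry.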
+ if(moduleSettings.enabled) { + reconfigureModule(existentModuleSettings, config) + } else { + stopModule(existentModuleSettings) + } + } + } + + // Remove all modules that no longer exist in the configuration. + val missingModules = automaticallyRegisteredModules.filterKeys(moduleName => configuredModules.find(_.name == moduleName).isEmpty) + missingModules.foreach { + case (_, entry) => stopModule(entry) + } + } + + /** + * Schedules the reconfigure hook on all registered modules and applies the latest configuration settings to the + * registry. + */ + def reconfigure(newConfig: Config): Unit = synchronized { + _registrySettings = readRegistryConfiguration(configuration.config()) + _registeredModules.values.foreach(entry => reconfigureModule(entry, newConfig)) + scheduleMetricsTicker() + scheduleSpansTicker() + } + + /** + * Stops all registered modules. As part of the stop process, all modules get a last chance to report metrics and + * spans available until the call to stop. + */ + def stop(): Future[Unit] = synchronized { + implicit val cleanupExecutor = ExecutionContext.Implicits.global + scheduleMetricsTicker(once = true) + scheduleSpansTicker(once = true) + + val stopSignals = _registeredModules.values.map(stopModule) + val latch = new CountDownLatch(stopSignals.size) + stopSignals.foreach(f => f.onComplete(_ => latch.countDown())) + + // There is a global 30 seconds limit to shutdown after which all executors will shut down. + val stopCompletionFuture = Future(latch.await(30, TimeUnit.SECONDS)) + stopCompletionFuture.onComplete(_ => { + _metricsTickerExecutor.shutdown() + _spansTickerExecutor.shutdown() + }) + + stopCompletionFuture.map(_ => ()) + } + + + /** + * (Re)Schedules the metrics ticker that periodically takes snapshots from the metric registry and sends them to + * all available metric reporting modules. If a ticker was already scheduled then that scheduled job will be + * cancelled and scheduled again. 
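+ * When called with once = true, a single immediate tick is scheduled instead; this is used to flush the last metrics snapshot while stopping all modules.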
+ */ + private def scheduleMetricsTicker(once: Boolean = false): Unit = { + val currentMetricsTicker = _metricsTickerSchedule.get() + if(currentMetricsTicker != null) + currentMetricsTicker.cancel(false) + + _metricsTickerSchedule.set { + val interval = _registrySettings.metricTickInterval.toMillis + val initialDelay = if(_registrySettings.optimisticMetricTickAlignment) { + val now = clock.instant() + val nextTick = Clock.nextTick(now, _registrySettings.metricTickInterval) + Duration.between(now, nextTick).toMillis + } else _registrySettings.metricTickInterval.toMillis + + val ticker = new Runnable { + var lastInstant = Instant.now(clock) + + override def run(): Unit = try { + val currentInstant = Instant.now(clock) + val periodSnapshot = PeriodSnapshot( + from = lastInstant, + to = currentInstant, + metrics = snapshotGenerator.snapshot()) + + metricReporterModules().foreach { entry => + Future { + Try(entry.module.reportPeriodSnapshot(periodSnapshot)).failed.foreach { error => + _logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error) + } + }(entry.executionContext) + } + + lastInstant = currentInstant + } catch { + case NonFatal(t) => _logger.error("Failed to run a metrics tick", t) + } + } + + if(once) + _metricsTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS) + else + _metricsTickerExecutor.scheduleAtFixedRate(ticker, initialDelay, interval, TimeUnit.MILLISECONDS) + } + } + + /** + * (Re)Schedules the spans ticker that periodically takes the spans accumulated by the tracer and flushes them to + * all available span reporting modules. If a ticker was already scheduled then that scheduled job will be + * cancelled and scheduled again. + */ + private def scheduleSpansTicker(once: Boolean = false): Unit = { + val currentSpansTicker = _spansTickerSchedule.get() + if(currentSpansTicker != null) + currentSpansTicker.cancel(false) + + _spansTickerSchedule.set { + val interval = _registrySettings.traceTickInterval.toMillis + + val ticker = new Runnable { + override def run(): Unit = try { + val spanBatch = spanBuffer.flush() + + spanReporterModules().foreach { entry => + Future { + Try(entry.module.reportSpans(spanBatch)).failed.foreach { error => + _logger.error(s"Reporter [${entry.name}] failed to process a spans tick.", error) + } + }(entry.executionContext) + } + + } catch { + case NonFatal(t) => _logger.error("Failed to run a spans tick", t) + } + } + + if(once) + _spansTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS) + else + _spansTickerExecutor.scheduleAtFixedRate(ticker, interval, interval, TimeUnit.MILLISECONDS) + } + } + + private def metricReporterModules(): Iterable[Entry[MetricReporter]] = synchronized { + _metricReporterModules.values + } + + private def spanReporterModules(): Iterable[Entry[SpanReporter]] = synchronized { + _spanReporterModules.values + } + + private def readModuleSettings(config: Config): Seq[Settings] = { + val moduleConfigs = config.getConfig("kamon.modules").configurations + val moduleSettings = moduleConfigs.map { + case (moduleName, moduleConfig) => + val moduleSettings = Try { + Settings( + moduleName, + moduleConfig.getString("class"), + moduleConfig.getBoolean("enabled") + ) + } + + moduleSettings.failed.foreach { t => + _logger.warn(s"Failed to read configuration for module [$moduleName]", t) + + if(moduleConfig.hasPath("requires-aspectj") || moduleConfig.hasPath("auto-start") || moduleConfig.hasPath("extension-class")) { + _logger.warn(s"Module [$moduleName] contains legacy configuration settings, please 
ensure that no legacy configuration") + } + } + + moduleSettings + + } filter(_.isSuccess) map(_.get) toSeq + + // Legacy modules from <1.2.0 + val legacyModules = config.getStringList("kamon.reporters").asScala map { moduleClass => + Settings(moduleClass, moduleClass, true) + } toSeq + + val (repeatedLegacyModules, uniqueLegacyModules) = legacyModules.partition(lm => moduleSettings.find(_.fqcn == lm.fqcn).nonEmpty) + repeatedLegacyModules.foreach(m => + _logger.warn(s"Module [${m.name}] is configured twice, please remove it from the deprecated kamon.reporters setting.")) + + uniqueLegacyModules.foreach(m => + _logger.warn(s"Module [${m.name}] is configured in the deprecated kamon.reporters setting, please consider moving it to kamon.modules.")) + + moduleSettings ++ uniqueLegacyModules + } + + /** + * Creates a module from the provided settings. + */ + private def createModule(settings: Settings): Option[Entry[Module]] = { + val moduleInstance = classLoading.createInstance[Module](settings.fqcn, Nil) + val moduleEntry = moduleInstance.map(instance => createEntry(settings.name, false, instance)) + + moduleEntry.failed.foreach(t => _logger.warn(s"Failed to create instance of module [${settings.name}]", t)) + moduleEntry.toOption + } + + private def createEntry(name: String, programmaticallyAdded: Boolean, instance: Module): Entry[Module] = { + val executor = Executors.newSingleThreadExecutor(threadFactory(name)) + Entry(name, ExecutionContext.fromExecutorService(executor), programmaticallyAdded, instance) + } + + + /** + * Registers a module and schedules execution of its start procedure. + */ + private def startModule(entry: Entry[Module]): Unit = { + registerModule(entry) + + // Schedule the start hook on the module + entry.executionContext.execute(new Runnable { + override def run(): Unit = + Try(entry.module.start()) + .failed.foreach(t => _logger.warn(s"Failure occurred while starting module [${entry.name}]", t)) + }) + } + + private def registerModule(entry: Entry[Module]): Unit = { + _registeredModules = _registeredModules + (entry.name -> entry) + if(entry.module.isInstanceOf[MetricReporter]) + _metricReporterModules = _metricReporterModules + (entry.name -> entry.asInstanceOf[Entry[MetricReporter]]) + if(entry.module.isInstanceOf[SpanReporter]) + _spanReporterModules = _spanReporterModules + (entry.name -> entry.asInstanceOf[Entry[SpanReporter]]) + + } + + /** + * Removes the module from the registry and schedules a call to the stop lifecycle hook on the module's execution + * context. The returned future completes when the module finishes its stop procedure. 
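+ * The stop hook runs on the module's own execution context, and that execution context is shut down once the returned future completes.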
+ */ + private def stopModule(entry: Entry[Module]): Future[Unit] = synchronized { + val cleanupExecutor = ExecutionContext.Implicits.global + + // Remove the module from all registries + _registeredModules = _registeredModules - entry.name + if(entry.module.isInstanceOf[MetricReporter]) + _metricReporterModules = _metricReporterModules - entry.name + if(entry.module.isInstanceOf[SpanReporter]) + _spanReporterModules = _spanReporterModules - entry.name + + + // Schedule a call to stop on the module + val stopPromise = Promise[Unit]() + entry.executionContext.execute(new Runnable { + override def run(): Unit = + stopPromise.complete { + val stopResult = Try(entry.module.stop()) + stopResult.failed.foreach(t => _logger.warn(s"Failure occurred while stopping module [${entry.name}]", t)) + stopResult + } + + }) + + stopPromise.future.onComplete(_ => entry.executionContext.shutdown())(cleanupExecutor) + stopPromise.future + } + + /** + * Schedules a call to reconfigure on the module's execution context. + */ + private def reconfigureModule(entry: Entry[Module], config: Config): Unit = { + entry.executionContext.execute(new Runnable { + override def run(): Unit = + Try(entry.module.reconfigure(config)) + .failed.foreach(t => _logger.warn(s"Failure occurred while reconfiguring module [${entry.name}]", t)) + }) + } + + private def noopRegistration(moduleName: String): Registration = new Registration { + override def cancel(): Unit = + _logger.warn(s"Cannot cancel registration on module [$moduleName] because the module was not added properly") + } + + private def registration(entry: Entry[Module]): Registration = new Registration { + override def cancel(): Unit = stopModule(entry) + } + } + + private def readRegistryConfiguration(config: Config): RegistrySettings = + RegistrySettings( + metricTickInterval = config.getDuration("kamon.metric.tick-interval"), + optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment"), + traceTickInterval = config.getDuration("kamon.trace.tick-interval"), + traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size") + ) + + private case class RegistrySettings( + metricTickInterval: Duration, + optimisticMetricTickAlignment: Boolean, + traceTickInterval: Duration, + traceReporterQueueSize: Int + ) + + private case class Settings( + name: String, + fqcn: String, + enabled: Boolean + ) + + private case class Entry[T <: Module]( + name: String, + executionContext: ExecutionContextExecutorService, + programmaticallyAdded: Boolean, + module: T + ) +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/module/ReportingModule.scala b/kamon-core/src/main/scala/kamon/module/ReportingModule.scala new file mode 100644 index 00000000..0e88fc23 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/module/ReportingModule.scala @@ -0,0 +1,21 @@ +package kamon +package module + +import kamon.trace.Span +import kamon.metric.PeriodSnapshot + +/** + * Modules implementing this trait will get registered for periodically receiving metric period snapshots. The + * frequency of the period snapshots is controlled by the kamon.metric.tick-interval setting. + */ +trait MetricReporter extends Module { + def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit +} + +/** + * Modules implementing this trait will get registered for periodically receiving span batches. The frequency of the + * span batches is controlled by the kamon.trace.tick-interval setting. 
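+ * Between ticks, finished Spans are held in the tracer's internal buffer, whose size is controlled by the kamon.trace.reporter-queue-size setting.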
+ */ +trait SpanReporter extends Module { + def reportSpans(spans: Seq[Span.FinishedSpan]): Unit +} diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index 6015e350..ee301e8a 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -18,7 +18,6 @@ package trace import java.time.Instant -import kamon.ReporterRegistry.SpanSink import kamon.context.Context import kamon.metric.MeasurementUnit import kamon.trace.SpanContext.SamplingDecision @@ -90,7 +89,7 @@ object Span { final class Local(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], - initialMetricTags: Map[String, String], from: Instant, spanSink: SpanSink, trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock) extends Span { + initialMetricTags: Map[String, String], from: Instant, spanBuffer: Tracer.SpanBuffer, trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock) extends Span { private var collectMetrics: Boolean = trackMetrics private var open: Boolean = true @@ -203,7 +202,7 @@ object Span { recordSpanMetrics(to) if(sampled) - spanSink.reportSpan(toFinishedSpan(to)) + spanBuffer.append(toFinishedSpan(to)) } } @@ -229,9 +228,9 @@ object Span { object Local { def apply(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], - initialMetricTags: Map[String, String], from: Instant, spanSink: SpanSink, + initialMetricTags: Map[String, String], from: Instant, spanBuffer: Tracer.SpanBuffer, trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock): Local = - new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, from, spanSink, trackMetrics, scopeSpanMetrics, clock) + new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, from, spanBuffer, trackMetrics, scopeSpanMetrics, clock) } diff --git a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala index 4cad1eb7..542638cf 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala @@ -38,7 +38,7 @@ object SpanPropagation { import B3.Headers override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { - val identityProvider = Kamon.tracer.identityProvider + val identityProvider = Kamon.identityProvider val traceID = reader.read(Headers.TraceIdentifier) .map(id => identityProvider.traceIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) @@ -122,7 +122,7 @@ object SpanPropagation { override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { reader.read(Header.B3).map { header => - val identityProvider = Kamon.tracer.identityProvider + val identityProvider = Kamon.identityProvider val (traceID, spanID, samplingDecision, parentSpanID) = header.splitToTuple("-") @@ -220,7 +220,7 @@ object SpanPropagation { if(medium.available() == 0) context else { - val identityProvider = Kamon.tracer.identityProvider + val identityProvider = Kamon.identityProvider val colferSpan = new ColferSpan() colferSpan.unmarshal(medium.readAll(), 0) diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index ad7ffbed..7a314205 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala 
+++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -18,13 +18,13 @@ package kamon.trace import java.time.Instant import com.typesafe.config.Config -import kamon.ReporterRegistry.SpanSink import kamon.Kamon import kamon.metric.MetricLookup -import kamon.trace.Span.TagValue +import kamon.trace.Span.{FinishedSpan, TagValue} import kamon.trace.SpanContext.SamplingDecision import kamon.trace.Tracer.SpanBuilder import kamon.util.{Clock, DynamicAccess} +import org.jctools.queues.{MessagePassingQueue, MpscArrayQueue} import org.slf4j.LoggerFactory import scala.collection.immutable @@ -37,19 +37,26 @@ trait Tracer { object Tracer { - final class Default(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config, clock: Clock) extends Tracer { - private val logger = LoggerFactory.getLogger(classOf[Tracer]) + private[kamon] trait SpanBuffer { + def append(span: FinishedSpan): Unit + def flush(): Seq[FinishedSpan] + } + + final class Default(metrics: MetricLookup, initialConfig: Config, clock: Clock) extends Tracer with SpanBuffer { + private val _logger = LoggerFactory.getLogger(classOf[Tracer]) private[Tracer] val tracerMetrics = new TracerMetrics(metrics) - @volatile private[Tracer] var joinRemoteParentsWithSameSpanID: Boolean = true - @volatile private[Tracer] var scopeSpanMetrics: Boolean = true + @volatile private[Tracer] var _traceReporterQueueSize = 1024 + @volatile private[Tracer] var _spanBuffer = new MpscArrayQueue[Span.FinishedSpan](_traceReporterQueueSize) + @volatile private[Tracer] var _joinRemoteParentsWithSameSpanID: Boolean = true + @volatile private[Tracer] var _scopeSpanMetrics: Boolean = true @volatile private[Tracer] var _sampler: Sampler = Sampler.Never @volatile private[Tracer] var _identityProvider: IdentityProvider = IdentityProvider.Default() reconfigure(initialConfig) override def buildSpan(operationName: String): SpanBuilder = - new SpanBuilder(operationName, this, spanSink, clock) + new SpanBuilder(operationName, this, this, clock) override def identityProvider: IdentityProvider = this._identityProvider @@ -69,29 +76,54 @@ object Tracer { case other => sys.error(s"Unexpected sampler name $other.") } + val newTraceReporterQueueSize = traceConfig.getInt("reporter-queue-size") val newJoinRemoteParentsWithSameSpanID = traceConfig.getBoolean("join-remote-parents-with-same-span-id") val newScopeSpanMetrics = traceConfig.getBoolean("span-metrics.scope-spans-to-parent") val newIdentityProvider = dynamic.createInstanceFor[IdentityProvider]( traceConfig.getString("identity-provider"), immutable.Seq.empty[(Class[_], AnyRef)] ).get + if(_traceReporterQueueSize != newTraceReporterQueueSize) { + // By simply changing the buffer we might be dropping Spans that have not been collected yet by the reporters. + // Since reconfigures are very unlikely to happen beyond application startup, this might not be a problem at all. + // If we eventually decide to keep those possible Spans around then we will need to change the queue type to + // multiple consumer as the reconfiguring thread will need to drain the contents before replacing. 
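+ // MpscArrayQueue is a multi-producer/single-consumer queue: any instrumented thread may append finished Spans, while only the spans ticker drains them.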
+ _spanBuffer = new MpscArrayQueue[Span.FinishedSpan](newTraceReporterQueueSize) + } + _sampler = newSampler - joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID - scopeSpanMetrics = newScopeSpanMetrics + _joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID + _scopeSpanMetrics = newScopeSpanMetrics _identityProvider = newIdentityProvider + _traceReporterQueueSize = newTraceReporterQueueSize }.failed.foreach { - ex => logger.error("Unable to reconfigure Kamon Tracer", ex) + ex => _logger.error("Unable to reconfigure Kamon Tracer", ex) } } + + + override def append(span: FinishedSpan): Unit = + _spanBuffer.offer(span) + + override def flush(): Seq[FinishedSpan] = { + var spans = Seq.empty[FinishedSpan] + _spanBuffer.drain(new MessagePassingQueue.Consumer[Span.FinishedSpan] { + override def accept(span: FinishedSpan): Unit = + spans = span +: spans + }) + + spans + } + } object Default { - def apply(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config, clock: Clock): Default = - new Default(metrics, spanSink, initialConfig, clock) + def apply(metrics: MetricLookup, initialConfig: Config, clock: Clock): Default = + new Default(metrics, initialConfig, clock) } - final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanSink: SpanSink, clock: Clock) { + final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanBuffer: Tracer.SpanBuffer, clock: Clock) { private var parentSpan: Span = _ private var initialOperationName: String = operationName private var from: Instant = Instant.EPOCH @@ -192,15 +224,15 @@ object Tracer { initialSpanTags, initialMetricTags, spanFrom, - spanSink, + spanBuffer, trackMetrics, - tracer.scopeSpanMetrics, + tracer._scopeSpanMetrics, clock ) } private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext = - if(parent.isRemote() && tracer.joinRemoteParentsWithSameSpanID) + if(parent.isRemote() && tracer._joinRemoteParentsWithSameSpanID) parent.context().copy(samplingDecision = samplingDecision) else parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision) -- cgit v1.2.3
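For illustration, the sketch below shows how the module APIs introduced by this patch can be used. The package and class names (com.example.LoggingMetricReporter, ModuleRegistrationExample) are hypothetical; the kamon.modules configuration schema and the registerModule/loadModules/stopModules entry points are the ones added in this change.

package com.example

import com.typesafe.config.Config
import kamon.Kamon
import kamon.metric.PeriodSnapshot
import kamon.module.MetricReporter

// Hypothetical reporter used only to illustrate the Module lifecycle hooks.
class LoggingMetricReporter extends MetricReporter {
  override def start(): Unit = println("LoggingMetricReporter started")
  override def stop(): Unit = println("LoggingMetricReporter stopped")
  override def reconfigure(newConfig: Config): Unit = ()

  // Called on the module's dedicated execution context on every metrics tick.
  override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit =
    println(s"Tick with ${snapshot.metrics.counters.size} counters between ${snapshot.from} and ${snapshot.to}")
}

object ModuleRegistrationExample extends App {
  // Programmatic registration; the returned Registration can be cancelled to stop this single module.
  val registration = Kamon.registerModule("logging-reporter", new LoggingMetricReporter())

  // Alternatively, with an entry such as
  //
  //   kamon.modules {
  //     logging-reporter {
  //       enabled = true
  //       class = "com.example.LoggingMetricReporter"
  //     }
  //   }
  //
  // the same module is discovered and started by:
  Kamon.loadModules()

  // Stops every registered module, giving reporters one last tick before shutdown.
  Kamon.stopModules()
}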