From 0a2b7f4bf0dde31c82482fbaf5153c22c84ada69 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 4 Feb 2019 19:15:43 +0100 Subject: cleanup the exposition of status data and ensure the module registry provides enough info --- build.sbt | 2 +- .../src/test/scala/kamon/KamonLifecycleSpec.scala | 12 +- .../test/scala/kamon/ReporterRegistrySpec.scala | 129 ------ .../scala/kamon/module/ModuleRegistrySpec.scala | 129 ++++++ kamon-core/src/main/resources/reference.conf | 2 + kamon-core/src/main/scala/kamon/ClassLoading.scala | 6 + kamon-core/src/main/scala/kamon/Kamon.scala | 40 +- .../src/main/scala/kamon/ModuleLoading.scala | 5 +- .../main/scala/kamon/metric/MetricRegistry.scala | 14 +- .../src/main/scala/kamon/module/Module.scala | 465 ++------------------- .../main/scala/kamon/module/ModuleRegistry.scala | 463 ++++++++++++++++++++ .../main/scala/kamon/module/ReportingModule.scala | 27 -- .../main/scala/kamon/status/JsonMarshalling.scala | 29 +- .../src/main/scala/kamon/status/Status.scala | 36 +- .../main/scala/kamon/status/StatusPageServer.scala | 2 +- 15 files changed, 743 insertions(+), 618 deletions(-) delete mode 100644 kamon-core-tests/src/test/scala/kamon/ReporterRegistrySpec.scala create mode 100644 kamon-core-tests/src/test/scala/kamon/module/ModuleRegistrySpec.scala create mode 100644 kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala delete mode 100644 kamon-core/src/main/scala/kamon/module/ReportingModule.scala diff --git a/build.sbt b/build.sbt index 27114730..b5b06ee5 100644 --- a/build.sbt +++ b/build.sbt @@ -77,7 +77,7 @@ lazy val coreTests = (project in file("kamon-core-tests")) .settings( libraryDependencies ++= Seq( "org.scalatest" %% "scalatest" % "3.0.1" % "test", - "ch.qos.logback" % "logback-classic" % "1.2.2" % "test" + "ch.qos.logback" % "logback-classic" % "1.2.3" % "test" ) ).dependsOn(testkit) diff --git a/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala index 2a0af8c0..9ee07694 100644 --- a/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala @@ -39,14 +39,14 @@ class KamonLifecycleSpec extends WordSpec with Matchers with Eventually { } } -class DummyMetricReporter extends MetricReporter { +class DummyMetricReporter extends kamon.module.MetricReporter { override def start(): Unit = {} override def stop(): Unit = {} override def reconfigure(config: Config): Unit = {} override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {} } -class DummySpanReporter extends SpanReporter { +class DummySpanReporter extends kamon.module.SpanReporter { override def start(): Unit = {} override def stop(): Unit = {} override def reconfigure(config: Config): Unit = {} @@ -54,13 +54,13 @@ class DummySpanReporter extends SpanReporter { } object KamonWithRunningReporter extends App { - Kamon.addReporter(new DummyMetricReporter()) - Kamon.addReporter(new DummySpanReporter()) + Kamon.registerModule("dummy metric reporter", new DummyMetricReporter()) + Kamon.registerModule("dummy span reporter", new DummySpanReporter()) } object KamonWithTemporaryReporter extends App { - Kamon.addReporter(new DummyMetricReporter()) - Kamon.addReporter(new DummySpanReporter()) + Kamon.registerModule("dummy metric reporter", new DummyMetricReporter()) + Kamon.registerModule("dummy span repoter", new DummySpanReporter()) Thread.sleep(5000) Kamon.stopAllReporters() diff --git a/kamon-core-tests/src/test/scala/kamon/ReporterRegistrySpec.scala b/kamon-core-tests/src/test/scala/kamon/ReporterRegistrySpec.scala deleted file mode 100644 index 515dfdd1..00000000 --- a/kamon-core-tests/src/test/scala/kamon/ReporterRegistrySpec.scala +++ /dev/null @@ -1,129 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon - -import com.typesafe.config.Config -import kamon.metric.PeriodSnapshot -import kamon.testkit.Reconfigure -import org.scalatest.concurrent.Eventually -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} - -class ReporterRegistrySpec extends WordSpec with Matchers with Reconfigure with Eventually with BeforeAndAfterAll { - "The ReporterRegistry" when { - "working with metrics reporters" should { - "report all metrics if no filters are applied" in { - Kamon.counter("test.hello").increment() - Kamon.counter("test.world").increment() - Kamon.counter("other.hello").increment() - - val reporter = new SeenMetricsReporter() - val subscription = Kamon.addReporter(reporter, "reporter-registry-spec") - - eventually { - reporter.snapshotCount() should be >= 1 - reporter.metrics() should contain allOf( - "test.hello", - "test.world", - "other.hello" - ) - } - - subscription.cancel() - } - - "default to deny all metrics if a provided filter name doesn't exist" in { - Kamon.counter("test.hello").increment() - Kamon.counter("test.world").increment() - Kamon.counter("other.hello").increment() - - val reporter = new SeenMetricsReporter() - val subscription = Kamon.addReporter(reporter, "reporter-registry-spec", "does-not-exist") - - eventually { - reporter.snapshotCount() should be >= 1 - reporter.metrics() shouldBe empty - } - - subscription.cancel() - } - - "apply existent filters" in { - Kamon.counter("test.hello").increment() - Kamon.counter("test.world").increment() - Kamon.counter("other.hello").increment() - - val reporter = new SeenMetricsReporter() - val subscription = Kamon.addReporter(reporter, "reporter-registry-spec", "real-filter") - - eventually { - reporter.snapshotCount() should be >= 1 - reporter.metrics() should contain allOf( - "test.hello", - "test.world" - ) - } - - subscription.cancel() - } - } - } - - - override protected def beforeAll(): Unit = { - applyConfig( - """ - |kamon { - | metric.tick-interval = 10 millis - | - | util.filters { - | real-filter { - | includes = [ "test**" ] - | } - | } - |} - | - """.stripMargin - ) - } - - - override protected def afterAll(): Unit = { - resetConfig() - } - - abstract class DummyReporter extends MetricReporter { - override def start(): Unit = {} - override def stop(): Unit = {} - override def reconfigure(config: Config): Unit = {} - } - - class SeenMetricsReporter extends DummyReporter { - @volatile private var count = 0 - @volatile private var seenMetrics = Seq.empty[String] - - override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = { - import snapshot.metrics._ - count += 1 - seenMetrics = counters.map(_.name) ++ histograms.map(_.name) ++ gauges.map(_.name) ++ rangeSamplers.map(_.name) - } - - def metrics(): Seq[String] = - seenMetrics - - def snapshotCount(): Int = - count - } -} diff --git a/kamon-core-tests/src/test/scala/kamon/module/ModuleRegistrySpec.scala b/kamon-core-tests/src/test/scala/kamon/module/ModuleRegistrySpec.scala new file mode 100644 index 00000000..791015fb --- /dev/null +++ b/kamon-core-tests/src/test/scala/kamon/module/ModuleRegistrySpec.scala @@ -0,0 +1,129 @@ +/* ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon +package module + +import com.typesafe.config.Config +import kamon.metric.PeriodSnapshot +import kamon.testkit.Reconfigure +import org.scalatest.concurrent.Eventually +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} +import kamon.{MetricReporter => LegacyMetricReporter} + +class ModuleRegistrySpec extends WordSpec with Matchers with Reconfigure with Eventually with BeforeAndAfterAll { + "The ModuleRegistry" when { + "working with metrics reporters" should { + "report all metrics if no filters are applied" in { + Kamon.counter("test.hello").increment() + Kamon.counter("test.world").increment() + Kamon.counter("other.hello").increment() + + val reporter = new SeenMetricsReporter() + val subscription = Kamon.addReporter(reporter, "reporter-registry-spec") + + eventually { + reporter.snapshotCount() should be >= 1 + reporter.metrics() should contain allOf( + "test.hello", + "test.world", + "other.hello" + ) + } + + subscription.cancel() + } + + "default to deny all metrics if a provided filter name doesn't exist" in { + Kamon.counter("test.hello").increment() + Kamon.counter("test.world").increment() + Kamon.counter("other.hello").increment() + + val reporter = new SeenMetricsReporter() + val subscription = Kamon.addReporter(reporter, "reporter-registry-spec", "does-not-exist") + + eventually { + reporter.snapshotCount() should be >= 1 + reporter.metrics() shouldBe empty + } + + subscription.cancel() + } + + "apply existent filters" in { + Kamon.counter("test.hello").increment() + Kamon.counter("test.world").increment() + Kamon.counter("other.hello").increment() + + val reporter = new SeenMetricsReporter() + val subscription = Kamon.addReporter(reporter, "reporter-registry-spec", "real-filter") + + eventually { + reporter.snapshotCount() should be >= 1 + reporter.metrics() should contain allOf( + "test.hello", + "test.world" + ) + } + + subscription.cancel() + } + } + } + + + override protected def beforeAll(): Unit = { + applyConfig( + """ + |kamon { + | metric.tick-interval = 10 millis + | + | util.filters { + | real-filter { + | includes = [ "test**" ] + | } + | } + |} + | + """.stripMargin + ) + } + + + override protected def afterAll(): Unit = { + resetConfig() + } + + class SeenMetricsReporter extends LegacyMetricReporter { + @volatile private var count = 0 + @volatile private var seenMetrics = Seq.empty[String] + + override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = { + import snapshot.metrics._ + count += 1 + seenMetrics = counters.map(_.name) ++ histograms.map(_.name) ++ gauges.map(_.name) ++ rangeSamplers.map(_.name) + } + + def metrics(): Seq[String] = + seenMetrics + + def snapshotCount(): Int = + count + + override def start(): Unit = {} + override def stop(): Unit = {} + override def reconfigure(config: Config): Unit = {} + } +} diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 1d6a9f1b..dea7296d 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -44,6 +44,8 @@ kamon { # kamon.modules { # module-name { # enabled = true + # description = "A module description" + # kind = "combined | metric | span | plain" # class = "com.example.ModuleClass" # } # } diff --git a/kamon-core/src/main/scala/kamon/ClassLoading.scala b/kamon-core/src/main/scala/kamon/ClassLoading.scala index 5b097af1..54d1922c 100644 --- a/kamon-core/src/main/scala/kamon/ClassLoading.scala +++ b/kamon-core/src/main/scala/kamon/ClassLoading.scala @@ -23,4 +23,10 @@ trait ClassLoading { def createInstance[T: ClassTag](fqcn: String, args: immutable.Seq[(Class[_], AnyRef)]): Try[T] = _dynamicAccess.createInstanceFor(fqcn, args) + + def createInstance[T: ClassTag](clazz: Class[_], args: immutable.Seq[(Class[_], AnyRef)]): Try[T] = + _dynamicAccess.createInstanceFor(clazz, args) + + def resolveClass[T: ClassTag](fqcn: String): Try[Class[_ <: T]] = + _dynamicAccess.getClassFor(fqcn) } diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 96aba81c..50c3e69d 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -15,9 +15,9 @@ package kamon -import com.typesafe.config.{Config, ConfigRenderOptions} +import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions} import kamon.metric.PeriodSnapshot -import kamon.module.{MetricReporter, Module} +import kamon.trace.Span object Kamon extends ClassLoading with Configuration @@ -42,16 +42,41 @@ object Kamon extends ClassLoading object QuickTest extends App { + val manualConfig = + """ + |kamon.modules { + | kamon-zipkin { + | enabled = false + | description = "Module that sends data to particular places" + | kind = metric + | class = kamon.MyCustomMetricDude + | } + |} + """.stripMargin + + val newConfig = ConfigFactory.parseString(manualConfig).withFallback(Kamon.config()) + Kamon.reconfigure(newConfig) + + + Kamon.loadModules() - Kamon.registerModule("my-module", new MetricReporter { + Kamon.registerModule("my-module", new kamon.module.MetricReporter { override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {} override def start(): Unit = {} override def stop(): Unit = {} override def reconfigure(newConfig: Config): Unit = {} }) + Kamon.registerModule("my-module-for-spans", new kamon.module.SpanReporter { + override def reportSpans(spans: Seq[Span.FinishedSpan]): Unit = {} + override def start(): Unit = {} + override def stop(): Unit = {} + override def reconfigure(newConfig: Config): Unit = {} + }) + - Kamon.histogram("test").refine("tagcito" -> "value").record(10) + Kamon.histogram("test").refine("actor_class" -> "com.kamon.something.MyActor", "system" -> "HRMS").record(10) + Kamon.rangeSampler("test-rs").refine("actor_class" -> "com.kamon.something.MyActor", "system" -> "HRMS").increment(34) Kamon.counter("test-counter").refine("tagcito" -> "value").increment(42) //println("JSON CONFIG: " + Kamon.config().root().render(ConfigRenderOptions.concise().setFormatted(true).setJson(true))) @@ -61,3 +86,10 @@ object QuickTest extends App { } + +class MyCustomMetricDude extends kamon.module.MetricReporter { + override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {} + override def start(): Unit = {} + override def stop(): Unit = {} + override def reconfigure(newConfig: Config): Unit = {} +} diff --git a/kamon-core/src/main/scala/kamon/ModuleLoading.scala b/kamon-core/src/main/scala/kamon/ModuleLoading.scala index bc90c656..9501f277 100644 --- a/kamon-core/src/main/scala/kamon/ModuleLoading.scala +++ b/kamon-core/src/main/scala/kamon/ModuleLoading.scala @@ -3,6 +3,7 @@ package kamon import com.typesafe.config.Config import kamon.metric.{MetricsSnapshot, PeriodSnapshot} import kamon.module.Module +import kamon.module.ModuleRegistry import kamon.util.Registration import kamon.module.{MetricReporter => NewMetricReporter} import kamon.module.{SpanReporter => NewSpanReporter} @@ -30,13 +31,15 @@ trait SpanReporter extends kamon.module.SpanReporter { } * kamon.modules { * module-name { * enabled = true + * description = "A module description" + * kind = "combined | metric | span | plain" * class = "com.example.MyModule" * } * } * */ trait ModuleLoading { self: ClassLoading with Configuration with Utilities with Metrics with Tracing => - protected val _moduleRegistry = new Module.Registry(self, self, clock(), self.metricRegistry(), self.tracer()) + protected val _moduleRegistry = new ModuleRegistry(self, self, clock(), self.metricRegistry(), self.tracer()) self.onReconfigure(newConfig => self._moduleRegistry.reconfigure(newConfig)) diff --git a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala index 11c5653f..fd0cd8b8 100644 --- a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala +++ b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala @@ -26,6 +26,7 @@ import scala.collection.concurrent.TrieMap import java.time.Duration import java.util.concurrent.ScheduledExecutorService +import kamon.status.Status import org.slf4j.LoggerFactory @@ -89,24 +90,19 @@ class MetricRegistry(initialConfig: Config, scheduler: ScheduledExecutorService) metric.asInstanceOf[T] } - private[kamon] def status(): MetricRegistry.Status = { - var metricInfos = Seq.empty[MetricRegistry.MetricInfo] + private[kamon] def status(): Status.MetricRegistry = { + var registeredMetrics = Seq.empty[Status.Metric] metrics.foreach { case (metricName, metric) => metric.incarnations().foreach(incarnation => { - metricInfos = metricInfos :+ MetricRegistry.MetricInfo(metricName, incarnation, metric.instrumentType) + registeredMetrics = registeredMetrics :+ Status.Metric(metricName, incarnation, metric.instrumentType) }) } - MetricRegistry.Status(metricInfos) + Status.MetricRegistry(registeredMetrics) } } -object MetricRegistry { - case class Status(metrics: Seq[MetricInfo]) - case class MetricInfo(name: String, tags: Map[String, String], instrumentType: InstrumentType) -} - trait MetricsSnapshotGenerator { def snapshot(): MetricsSnapshot } diff --git a/kamon-core/src/main/scala/kamon/module/Module.scala b/kamon-core/src/main/scala/kamon/module/Module.scala index fd5af16c..592e02aa 100644 --- a/kamon-core/src/main/scala/kamon/module/Module.scala +++ b/kamon-core/src/main/scala/kamon/module/Module.scala @@ -1,20 +1,9 @@ package kamon package module -import java.time.{Duration, Instant} -import java.util.concurrent.{CountDownLatch, Executors, ScheduledFuture, TimeUnit} -import java.util.concurrent.atomic.AtomicReference - import com.typesafe.config.Config -import kamon.metric.{MetricsSnapshotGenerator, PeriodSnapshot} -import kamon.trace.Tracer.SpanBuffer -import kamon.util.Clock -import org.slf4j.LoggerFactory - -import scala.collection.JavaConverters.collectionAsScalaIterableConverter -import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, Promise} -import scala.util.Try -import scala.util.control.NonFatal +import kamon.metric.PeriodSnapshot +import kamon.trace.Span /** * Modules provide additional capabilities to Kamon, like collecting JVM metrics or exporting the metrics and trace @@ -46,8 +35,39 @@ trait Module { } +/** + * Modules implementing this trait will get registered for periodically receiving metric period snapshots. The + * frequency of the period snapshots is controlled by the kamon.metric.tick-interval setting. + */ +trait MetricReporter extends Module { + def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit +} + +/** + * Modules implementing this trait will get registered for periodically receiving span batches. The frequency of the + * span batches is controlled by the kamon.trace.tick-interval setting. + */ +trait SpanReporter extends Module { + def reportSpans(spans: Seq[Span.FinishedSpan]): Unit +} + +/** + * Modules implementing this trait will get registered for periodically receiving metric period snapshots and span + * batches. + */ +trait CombinedReporter extends MetricReporter with SpanReporter + + object Module { + sealed trait Kind + object Kind { + case object Combined extends Kind + case object Metric extends Kind + case object Span extends Kind + case object Plain extends Kind + } + /** * Represents a module's registration on the module registry. A module can be stopped at any time by cancelling its * registration. @@ -61,415 +81,20 @@ object Module { } /** - * Controls the lifecycle of all available modules. + * Configuration of a given module present in the classpath. + * + * @param name Module's name + * @param description Module's description. + * @param clazz The class implementing the configured module. + * @param kind Module kind. + * @param enabled Whether the module is enabled or not. Enabled modules in the classpath will be automatically + * started in any call to Kamon.loadModules(). */ - class Registry(classLoading: ClassLoading, configuration: Configuration, clock: Clock, snapshotGenerator: MetricsSnapshotGenerator, spanBuffer: SpanBuffer) { - private val _logger = LoggerFactory.getLogger(classOf[Registry]) - private val _metricsTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-metrics-ticker", daemon = true)) - private val _spansTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-spans-ticker", daemon = true)) - - private val _metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]() - private val _spansTickerSchedule = new AtomicReference[ScheduledFuture[_]]() - - private var _registrySettings = readRegistryConfiguration(configuration.config()) - private var _registeredModules: Map[String, Entry[Module]] = Map.empty - private var _metricReporterModules: Map[String, Entry[MetricReporter]] = Map.empty - private var _spanReporterModules: Map[String, Entry[SpanReporter]] = Map.empty - - // Start ticking as soon as the registry is created. - scheduleMetricsTicker() - scheduleSpansTicker() - - - /** - * Registers a module that has already been instantiated by the user. The start callback will be executed as part - * of the registration process. If a module with the specified name already exists the registration will fail. If - * the registered module is a MetricReporter and/or SpanReporter it will also be configured to receive the metrics - * and spans data upon every tick. - * - * @param name Desired module name. - * @param module Module instance. - * @return A registration that can be used to stop the module at any time. - */ - def register(name: String, module: Module): Registration = synchronized { - if(_registeredModules.get(name).isEmpty) { - val moduleEntry = createEntry(name, true, module) - startModule(moduleEntry) - registration(moduleEntry) - - } else { - _logger.warn(s"Cannot register module [$name], a module with that name already exists.") - noopRegistration(name) - } - } - - /** - * Reads all available modules from the config and either starts, stops or reconfigures them in order to match the - * configured modules state. - */ - def load(config: Config): Unit = synchronized { - val configuredModules = readModuleSettings(config) - val automaticallyRegisteredModules = _registeredModules.filterNot { case (_, module) => module.programmaticallyAdded } - - // Start, reconfigure and stop modules that are still present but disabled. - configuredModules.foreach { moduleSettings => - automaticallyRegisteredModules.get(moduleSettings.name).fold { - // The module does not exist in the registry, the only possible action is starting it, if enabled. - if(moduleSettings.enabled) { - createModule(moduleSettings).foreach(entry => startModule(entry)) - } - - } { existentModuleSettings => - // When a module already exists it can either need to be stopped, or to be reconfigured. - if(moduleSettings.enabled) { - reconfigureModule(existentModuleSettings, config) - } else { - stopModule(existentModuleSettings) - } - } - } - - // Remove all modules that no longer exist in the configuration. - val missingModules = automaticallyRegisteredModules.filterKeys(moduleName => configuredModules.find(_.name == moduleName).isEmpty) - missingModules.foreach { - case (_, entry) => stopModule(entry) - } - } - - /** - * Schedules the reconfigure hook on all registered modules and applies the latest configuration settings to the - * registry. - */ - def reconfigure(newConfig: Config): Unit = synchronized { - _registrySettings = readRegistryConfiguration(configuration.config()) - _registeredModules.values.foreach(entry => reconfigureModule(entry, newConfig)) - scheduleMetricsTicker() - scheduleSpansTicker() - } - - /** - * Stops all registered modules. As part of the stop process, all modules get a last chance to report metrics and - * spans available until the call to stop. - */ - def stop(): Future[Unit] = synchronized { - implicit val cleanupExecutor = ExecutionContext.Implicits.global - scheduleMetricsTicker(once = true) - scheduleSpansTicker(once = true) - - val stopSignals = _registeredModules.values.map(stopModule) - val latch = new CountDownLatch(stopSignals.size) - stopSignals.foreach(f => f.onComplete(_ => latch.countDown())) - - // There is a global 30 seconds limit to shutdown after which all executors will shut down. - val stopCompletionFuture = Future(latch.await(30, TimeUnit.SECONDS)) - stopCompletionFuture.onComplete(_ => { - _metricsTickerExecutor.shutdown() - _spansTickerExecutor.shutdown() - }) - - stopCompletionFuture.map(_ => ()) - } - - - /** - * (Re)Schedules the metrics ticker that periodically takes snapshots from the metric registry and sends them to - * all available metric reporting modules. If a ticker was already scheduled then that scheduled job will be - * cancelled and scheduled again. - */ - private def scheduleMetricsTicker(once: Boolean = false): Unit = { - val currentMetricsTicker = _metricsTickerSchedule.get() - if(currentMetricsTicker != null) - currentMetricsTicker.cancel(false) - - _metricsTickerSchedule.set { - val interval = _registrySettings.metricTickInterval.toMillis - val initialDelay = if(_registrySettings.optimisticMetricTickAlignment) { - val now = clock.instant() - val nextTick = Clock.nextTick(now, _registrySettings.metricTickInterval) - Duration.between(now, nextTick).toMillis - } else _registrySettings.metricTickInterval.toMillis - - val ticker = new Runnable { - var lastInstant = Instant.now(clock) - - override def run(): Unit = try { - val currentInstant = Instant.now(clock) - val periodSnapshot = PeriodSnapshot( - from = lastInstant, - to = currentInstant, - metrics = snapshotGenerator.snapshot()) - - metricReporterModules().foreach { entry => - Future { - Try(entry.module.reportPeriodSnapshot(periodSnapshot)).failed.foreach { error => - _logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error) - } - }(entry.executionContext) - } - - lastInstant = currentInstant - } catch { - case NonFatal(t) => _logger.error("Failed to run a metrics tick", t) - } - } - - if(once) - _metricsTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS) - else - _metricsTickerExecutor.scheduleAtFixedRate(ticker, initialDelay, interval, TimeUnit.MILLISECONDS) - } - } - - /** - * (Re)Schedules the spans ticker that periodically takes the spans accumulated by the tracer and flushes them to - * all available span reporting modules. If a ticker was already scheduled then that scheduled job will be - * cancelled and scheduled again. - */ - private def scheduleSpansTicker(once: Boolean = false): Unit = { - val currentSpansTicker = _spansTickerSchedule.get() - if(currentSpansTicker != null) - currentSpansTicker.cancel(false) - - _spansTickerSchedule.set { - val interval = _registrySettings.traceTickInterval.toMillis - - val ticker = new Runnable { - override def run(): Unit = try { - val spanBatch = spanBuffer.flush() - - spanReporterModules().foreach { entry => - Future { - Try(entry.module.reportSpans(spanBatch)).failed.foreach { error => - _logger.error(s"Reporter [${entry.name}] failed to process a spans tick.", error) - } - }(entry.executionContext) - } - - } catch { - case NonFatal(t) => _logger.error("Failed to run a spans tick", t) - } - } - - if(once) - _spansTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS) - else - _spansTickerExecutor.scheduleAtFixedRate(ticker, interval, interval, TimeUnit.MILLISECONDS) - } - } - - private def metricReporterModules(): Iterable[Entry[MetricReporter]] = synchronized { - _metricReporterModules.values - } - - private def spanReporterModules(): Iterable[Entry[SpanReporter]] = synchronized { - _spanReporterModules.values - } - - private def readModuleSettings(config: Config): Seq[Settings] = { - val moduleConfigs = config.getConfig("kamon.modules").configurations - val moduleSettings = moduleConfigs.map { - case (moduleName, moduleConfig) => - val moduleSettings = Try { - Settings( - moduleName, - moduleConfig.getString("class"), - moduleConfig.getBoolean("enabled") - ) - } - - moduleSettings.failed.foreach { t => - _logger.warn(s"Failed to read configuration for module [$moduleName]", t) - - if(moduleConfig.hasPath("requires-aspectj") || moduleConfig.hasPath("auto-start") || moduleConfig.hasPath("extension-class")) { - _logger.warn(s"Module [$moduleName] contains legacy configuration settings, please ensure that no legacy configuration") - } - } - - moduleSettings - - } filter(_.isSuccess) map(_.get) toSeq - - - // Load all modules that might have been configured using the legacy "kamon.reporters" setting from <1.2.0 - // versions. This little hack should be removed by the time we release 2.0. - // - if(config.hasPath("kamon.reporters")) { - val legacyModules = config.getStringList("kamon.reporters").asScala map { moduleClass => - Settings(moduleClass, moduleClass, true) - } toSeq - - val (repeatedLegacyModules, uniqueLegacyModules) = legacyModules.partition(lm => moduleSettings.find(_.fqcn == lm.fqcn).nonEmpty) - repeatedLegacyModules.foreach(m => - _logger.warn(s"Module [${m.name}] is configured twice, please remove it from the deprecated kamon.reporters setting.")) - - uniqueLegacyModules.foreach(m => - _logger.warn(s"Module [${m.name}] is configured in the deprecated kamon.reporters setting, please consider moving it to kamon.modules.")) - - moduleSettings ++ uniqueLegacyModules - - } else moduleSettings - } - - /** - * Creates a module from the provided settings. - */ - private def createModule(settings: Settings): Option[Entry[Module]] = { - val moduleInstance = classLoading.createInstance[Module](settings.fqcn, Nil) - val moduleEntry = moduleInstance.map(instance => createEntry(settings.name, false, instance)) - - moduleEntry.failed.foreach(t => _logger.warn(s"Failed to create instance of module [${settings.name}]", t)) - moduleEntry.toOption - } - - private def createEntry(name: String, programmaticallyAdded: Boolean, instance: Module): Entry[Module] = { - val executor = Executors.newSingleThreadExecutor(threadFactory(name)) - Entry(name, ExecutionContext.fromExecutorService(executor), programmaticallyAdded, instance) - } - - - /** - * Returns the current status of this module registry. - */ - private[kamon] def status(): Registry.Status = { - def moduleKind(instance: Any): String = instance match { - case _: CombinedReporter => "combined" - case _: MetricReporter => "metric" - case _: SpanReporter => "span" - case _: Module => "plain" - } - - val automaticallyAddedModules = readModuleSettings(configuration.config()).map(moduleSettings => { - val entry = _registeredModules.get(moduleSettings.name) - val entryModuleKind = entry.map(e => moduleKind(e.module)).getOrElse("unknown") - Registry.ModuleInfo(moduleSettings.name, moduleSettings.fqcn, moduleSettings.enabled, entry.nonEmpty, entryModuleKind) - }) - - val programmaticallyAddedModules = _registeredModules - .filter { case (_, entry) => entry.programmaticallyAdded } - .map { case (name, entry) => { - Registry.ModuleInfo(name, entry.module.getClass.getName, true, true, moduleKind(entry.module)) - }} - - val allModules = automaticallyAddedModules ++ programmaticallyAddedModules - Registry.Status(allModules) - } - - - /** - * Registers a module and schedules execution of its start procedure. - */ - private def startModule(entry: Entry[Module]): Unit = { - registerModule(entry) - - // Schedule the start hook on the module - entry.executionContext.execute(new Runnable { - override def run(): Unit = - Try(entry.module.start()) - .failed.foreach(t => _logger.warn(s"Failure occurred while starting module [${entry.name}]", t)) - }) - } - - private def registerModule(entry: Entry[Module]): Unit = { - _registeredModules = _registeredModules + (entry.name -> entry) - if(entry.module.isInstanceOf[MetricReporter]) - _metricReporterModules = _metricReporterModules + (entry.name -> entry.asInstanceOf[Entry[MetricReporter]]) - if(entry.module.isInstanceOf[SpanReporter]) - _spanReporterModules = _spanReporterModules + (entry.name -> entry.asInstanceOf[Entry[SpanReporter]]) - - } - - /** - * Removes the module from the registry and schedules a call to the stop lifecycle hook on the module's execution - * context. The returned future completes when the module finishes its stop procedure. - */ - private def stopModule(entry: Entry[Module]): Future[Unit] = synchronized { - val cleanupExecutor = ExecutionContext.Implicits.global - - // Remove the module from all registries - _registeredModules = _registeredModules - entry.name - if(entry.module.isInstanceOf[MetricReporter]) - _metricReporterModules = _metricReporterModules - entry.name - if(entry.module.isInstanceOf[SpanReporter]) - _spanReporterModules = _spanReporterModules - entry.name - - - // Schedule a call to stop on the module - val stopPromise = Promise[Unit]() - entry.executionContext.execute(new Runnable { - override def run(): Unit = - stopPromise.complete { - val stopResult = Try(entry.module.stop()) - stopResult.failed.foreach(t => _logger.warn(s"Failure occurred while stopping module [${entry.name}]", t)) - stopResult - } - - }) - - stopPromise.future.onComplete(_ => entry.executionContext.shutdown())(cleanupExecutor) - stopPromise.future - } - - /** - * Schedules a call to reconfigure on the module's execution context. - */ - private def reconfigureModule(entry: Entry[Module], config: Config): Unit = { - entry.executionContext.execute(new Runnable { - override def run(): Unit = - Try(entry.module.reconfigure(config)) - .failed.foreach(t => _logger.warn(s"Failure occurred while reconfiguring module [${entry.name}]", t)) - }) - } - - private def noopRegistration(moduleName: String): Registration = new Registration { - override def cancel(): Unit = - _logger.warn(s"Cannot cancel registration on module [$moduleName] because the module was not added properly") - } - - private def registration(entry: Entry[Module]): Registration = new Registration { - override def cancel(): Unit = stopModule(entry) - } - } - - object Registry { - - case class Status( - modules: Seq[ModuleInfo] - ) - - case class ModuleInfo( - name: String, - description: String, - enabled: Boolean, - started: Boolean, - kind: String - ) - } - - private def readRegistryConfiguration(config: Config): RegistrySettings = - RegistrySettings( - metricTickInterval = config.getDuration("kamon.metric.tick-interval"), - optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment"), - traceTickInterval = config.getDuration("kamon.trace.tick-interval"), - traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size") - ) - - private case class RegistrySettings( - metricTickInterval: Duration, - optimisticMetricTickAlignment: Boolean, - traceTickInterval: Duration, - traceReporterQueueSize: Int - ) - - private case class Settings( + case class Settings( name: String, - fqcn: String, + description: String, + clazz: Class[_ <: Module], + kind: Module.Kind, enabled: Boolean ) - - private case class Entry[T <: Module]( - name: String, - executionContext: ExecutionContextExecutorService, - programmaticallyAdded: Boolean, - module: T - ) } \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala b/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala new file mode 100644 index 00000000..5520c602 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala @@ -0,0 +1,463 @@ +package kamon +package module + +import java.time.{Duration, Instant} +import java.util.concurrent.{CountDownLatch, Executors, ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicReference + +import com.typesafe.config.Config +import kamon.metric.{MetricsSnapshotGenerator, PeriodSnapshot} +import kamon.module.Module.Registration +import kamon.status.Status +import kamon.trace.Tracer.SpanBuffer +import kamon.util.Clock +import org.slf4j.LoggerFactory + +import scala.collection.JavaConverters.asScalaBufferConverter +import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, Promise} +import scala.util.Try +import scala.util.control.NonFatal + + + +/** + * Controls the lifecycle of all available modules. + */ +class ModuleRegistry(classLoading: ClassLoading, configuration: Configuration, clock: Clock, snapshotGenerator: MetricsSnapshotGenerator, spanBuffer: SpanBuffer) { + private val _logger = LoggerFactory.getLogger(classOf[ModuleRegistry]) + private val _metricsTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-metrics-ticker", daemon = true)) + private val _spansTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-spans-ticker", daemon = true)) + + private val _metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]() + private val _spansTickerSchedule = new AtomicReference[ScheduledFuture[_]]() + + private var _registrySettings = readRegistrySettings(configuration.config()) + private var _registeredModules: Map[String, Entry[Module]] = Map.empty + private var _metricReporterModules: Map[String, Entry[MetricReporter]] = Map.empty + private var _spanReporterModules: Map[String, Entry[SpanReporter]] = Map.empty + + // Start ticking as soon as the registry is created. + scheduleMetricsTicker() + scheduleSpansTicker() + + + /** + * Registers a module that has already been instantiated by the user. The start callback will be executed as part + * of the registration process. If a module with the specified name already exists the registration will fail. If + * the registered module is a MetricReporter and/or SpanReporter it will also be configured to receive the metrics + * and spans data upon every tick. + * + * @param name Desired module name. + * @param module Module instance. + * @return A registration that can be used to stop the module at any time. + */ + def register(name: String, module: Module): Registration = synchronized { + if(_registeredModules.get(name).isEmpty) { + val inferredSettings = Module.Settings( + name, + module.getClass.getName, + module.getClass, + inferModuleKind(module.getClass), + true + ) + + val moduleEntry = createEntry(inferredSettings, true, module) + startModule(moduleEntry) + registration(moduleEntry) + + } else { + _logger.warn(s"Cannot register module [$name], a module with that name already exists.") + noopRegistration(name) + } + } + + /** + * Reads all available modules from the config and either starts, stops or reconfigures them in order to match the + * configured modules state. + */ + def load(config: Config): Unit = synchronized { + val configuredModules = readModuleSettings(config) + val automaticallyRegisteredModules = _registeredModules.filterNot { case (_, module) => module.programmaticallyAdded } + + // Start, reconfigure and stop modules that are still present but disabled. + configuredModules.foreach { moduleSettings => + automaticallyRegisteredModules.get(moduleSettings.name).fold { + // The module does not exist in the registry, the only possible action is starting it, if enabled. + if(moduleSettings.enabled) { + createModule(moduleSettings).foreach(entry => startModule(entry)) + } + + } { existentModuleSettings => + // When a module already exists it can either need to be stopped, or to be reconfigured. + if(moduleSettings.enabled) { + reconfigureModule(existentModuleSettings, config) + } else { + stopModule(existentModuleSettings) + } + } + } + + // Remove all modules that no longer exist in the configuration. + val missingModules = automaticallyRegisteredModules.filterKeys(moduleName => configuredModules.find(_.name == moduleName).isEmpty) + missingModules.foreach { + case (_, entry) => stopModule(entry) + } + } + + /** + * Schedules the reconfigure hook on all registered modules and applies the latest configuration settings to the + * registry. + */ + def reconfigure(newConfig: Config): Unit = synchronized { + _registrySettings = readRegistrySettings(configuration.config()) + _registeredModules.values.foreach(entry => reconfigureModule(entry, newConfig)) + scheduleMetricsTicker() + scheduleSpansTicker() + } + + /** + * Stops all registered modules. As part of the stop process, all modules get a last chance to report metrics and + * spans available until the call to stop. + */ + def stop(): Future[Unit] = synchronized { + implicit val cleanupExecutor = ExecutionContext.Implicits.global + scheduleMetricsTicker(once = true) + scheduleSpansTicker(once = true) + + val stopSignals = _registeredModules.values.map(stopModule) + val latch = new CountDownLatch(stopSignals.size) + stopSignals.foreach(f => f.onComplete(_ => latch.countDown())) + + // There is a global 30 seconds limit to shutdown after which all executors will shut down. + val stopCompletionFuture = Future(latch.await(30, TimeUnit.SECONDS)) + stopCompletionFuture.onComplete(_ => { + _metricsTickerExecutor.shutdown() + _spansTickerExecutor.shutdown() + }) + + stopCompletionFuture.map(_ => ()) + } + + + /** + * (Re)Schedules the metrics ticker that periodically takes snapshots from the metric registry and sends them to + * all available metric reporting modules. If a ticker was already scheduled then that scheduled job will be + * cancelled and scheduled again. + */ + private def scheduleMetricsTicker(once: Boolean = false): Unit = { + val currentMetricsTicker = _metricsTickerSchedule.get() + if(currentMetricsTicker != null) + currentMetricsTicker.cancel(false) + + _metricsTickerSchedule.set { + val interval = _registrySettings.metricTickInterval.toMillis + val initialDelay = if(_registrySettings.optimisticMetricTickAlignment) { + val now = clock.instant() + val nextTick = Clock.nextTick(now, _registrySettings.metricTickInterval) + Duration.between(now, nextTick).toMillis + } else _registrySettings.metricTickInterval.toMillis + + val ticker = new Runnable { + var lastInstant = Instant.now(clock) + + override def run(): Unit = try { + val currentInstant = Instant.now(clock) + val periodSnapshot = PeriodSnapshot( + from = lastInstant, + to = currentInstant, + metrics = snapshotGenerator.snapshot()) + + metricReporterModules().foreach { entry => + Future { + Try(entry.module.reportPeriodSnapshot(periodSnapshot)).failed.foreach { error => + _logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error) + } + }(entry.executionContext) + } + + lastInstant = currentInstant + } catch { + case NonFatal(t) => _logger.error("Failed to run a metrics tick", t) + } + } + + if(once) + _metricsTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS) + else + _metricsTickerExecutor.scheduleAtFixedRate(ticker, initialDelay, interval, TimeUnit.MILLISECONDS) + } + } + + /** + * (Re)Schedules the spans ticker that periodically takes the spans accumulated by the tracer and flushes them to + * all available span reporting modules. If a ticker was already scheduled then that scheduled job will be + * cancelled and scheduled again. + */ + private def scheduleSpansTicker(once: Boolean = false): Unit = { + val currentSpansTicker = _spansTickerSchedule.get() + if(currentSpansTicker != null) + currentSpansTicker.cancel(false) + + _spansTickerSchedule.set { + val interval = _registrySettings.traceTickInterval.toMillis + + val ticker = new Runnable { + override def run(): Unit = try { + val spanBatch = spanBuffer.flush() + + spanReporterModules().foreach { entry => + Future { + Try(entry.module.reportSpans(spanBatch)).failed.foreach { error => + _logger.error(s"Reporter [${entry.name}] failed to process a spans tick.", error) + } + }(entry.executionContext) + } + + } catch { + case NonFatal(t) => _logger.error("Failed to run a spans tick", t) + } + } + + if(once) + _spansTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS) + else + _spansTickerExecutor.scheduleAtFixedRate(ticker, interval, interval, TimeUnit.MILLISECONDS) + } + } + + private def metricReporterModules(): Iterable[Entry[MetricReporter]] = synchronized { + _metricReporterModules.values + } + + private def spanReporterModules(): Iterable[Entry[SpanReporter]] = synchronized { + _spanReporterModules.values + } + + private def readModuleSettings(config: Config): Seq[Module.Settings] = { + val moduleConfigs = config.getConfig("kamon.modules").configurations + val moduleSettings = moduleConfigs.map { + case (moduleName, moduleConfig) => + val moduleSettings = Try { + Module.Settings( + moduleName, + moduleConfig.getString("description"), + classLoading.resolveClass[Module](moduleConfig.getString("class")).get, + parseModuleKind(moduleConfig.getString("kind")), + moduleConfig.getBoolean("enabled") + ) + }.map(ms => { + val inferredModuleKind = inferModuleKind(ms.clazz) + assert(inferredModuleKind == ms.kind, + s"Module [${ms.name}] is configured as [${ms.kind}] but the actual type does not comply to the expected interface.") + ms + }) + + + moduleSettings.failed.foreach { t => + _logger.warn(s"Failed to read configuration for module [$moduleName]", t) + + if(moduleConfig.hasPath("requires-aspectj") || moduleConfig.hasPath("auto-start") || moduleConfig.hasPath("extension-class")) { + _logger.warn(s"Module [$moduleName] contains legacy configuration settings, please ensure that no legacy configuration") + } + } + + moduleSettings + + } filter(_.isSuccess) map(_.get) toSeq + + + // Load all modules that might have been configured using the legacy "kamon.reporters" setting from <1.2.0 + // versions. This little hack should be removed by the time we release 2.0. + // + if(config.hasPath("kamon.reporters")) { + val legacyModuleSettings = config.getStringList("kamon.reporters").asScala + .map(moduleClass => { + val moduleSettings = Try { + val moduleClazz = classLoading.resolveClass[Module](moduleClass).get + val inferredModuleKind = inferModuleKind(moduleClazz) + val name = moduleClazz.getName() + val description = "Module detected from the legacy kamon.reporters configuration." + + Module.Settings(name, description, moduleClazz, inferredModuleKind, true) + } + + moduleSettings.failed.foreach(t => _logger.error(s"Failed to load legacy reporter module [${moduleClass}]", t)) + moduleSettings + }) + .filter(_.isSuccess) + .map(_.get) + + + val (repeatedLegacyModules, uniqueLegacyModules) = legacyModuleSettings + .partition(lm => moduleSettings.find(_.clazz.getName == lm.clazz.getName).nonEmpty) + + repeatedLegacyModules.foreach(m => + _logger.warn(s"Module [${m.name}] is configured twice, please remove it from the deprecated kamon.reporters setting.")) + + uniqueLegacyModules.foreach(m => + _logger.warn(s"Module [${m.name}] is configured in the deprecated kamon.reporters setting, please consider moving it to kamon.modules.")) + + moduleSettings ++ uniqueLegacyModules + + } else moduleSettings + } + + /** + * Creates a module from the provided settings. + */ + private def createModule(settings: Module.Settings): Option[Entry[Module]] = { + val moduleInstance = classLoading.createInstance[Module](settings.clazz, Nil) + val moduleEntry = moduleInstance.map(instance => createEntry(settings, false, instance)) + + moduleEntry.failed.foreach(t => _logger.warn(s"Failed to create instance of module [${settings.name}]", t)) + moduleEntry.toOption + } + + private def createEntry(settings: Module.Settings, programmaticallyAdded: Boolean, instance: Module): Entry[Module] = { + val executor = Executors.newSingleThreadExecutor(threadFactory(settings.name)) + Entry(settings.name, ExecutionContext.fromExecutorService(executor), programmaticallyAdded, settings, instance) + } + + private def inferModuleKind(clazz: Class[_ <: Module]): Module.Kind = { + if(classOf[CombinedReporter].isAssignableFrom(clazz)) + Module.Kind.Combined + else if(classOf[MetricReporter].isAssignableFrom(clazz)) + Module.Kind.Metric + else if(classOf[SpanReporter].isAssignableFrom(clazz)) + Module.Kind.Span + else + Module.Kind.Plain + } + + + /** + * Returns the current status of this module registry. + */ + private[kamon] def status(): Status.ModuleRegistry = { + val automaticallyAddedModules = readModuleSettings(configuration.config()).map(moduleSettings => { + _registeredModules.get(moduleSettings.name) + .map(moduleEntry => + // The module is on the classpath and started. + Status.Module(moduleEntry.name, moduleEntry.settings.description, moduleEntry.settings.kind, false, true) + + ).getOrElse( + // The module is on the classpath but has not been started. + Status.Module(moduleSettings.name, moduleSettings.description, moduleSettings.kind, false, false) + ) + }) + + val programmaticallyAddedModules = _registeredModules + .filter { case (_, entry) => entry.programmaticallyAdded } + .map { case (name, entry) => Status.Module(name, entry.settings.description, entry.settings.kind, true, true) } + + val allModules = automaticallyAddedModules ++ programmaticallyAddedModules + Status.ModuleRegistry(allModules) + } + + + /** + * Registers a module and schedules execution of its start procedure. + */ + private def startModule(entry: Entry[Module]): Unit = { + registerModule(entry) + + // Schedule the start hook on the module + entry.executionContext.execute(new Runnable { + override def run(): Unit = + Try(entry.module.start()) + .failed.foreach(t => _logger.warn(s"Failure occurred while starting module [${entry.name}]", t)) + }) + } + + private def registerModule(entry: Entry[Module]): Unit = { + _registeredModules = _registeredModules + (entry.name -> entry) + if(entry.module.isInstanceOf[MetricReporter]) + _metricReporterModules = _metricReporterModules + (entry.name -> entry.asInstanceOf[Entry[MetricReporter]]) + if(entry.module.isInstanceOf[SpanReporter]) + _spanReporterModules = _spanReporterModules + (entry.name -> entry.asInstanceOf[Entry[SpanReporter]]) + + } + + /** + * Removes the module from the registry and schedules a call to the stop lifecycle hook on the module's execution + * context. The returned future completes when the module finishes its stop procedure. + */ + private def stopModule(entry: Entry[Module]): Future[Unit] = synchronized { + val cleanupExecutor = ExecutionContext.Implicits.global + + // Remove the module from all registries + _registeredModules = _registeredModules - entry.name + if(entry.module.isInstanceOf[MetricReporter]) + _metricReporterModules = _metricReporterModules - entry.name + if(entry.module.isInstanceOf[SpanReporter]) + _spanReporterModules = _spanReporterModules - entry.name + + + // Schedule a call to stop on the module + val stopPromise = Promise[Unit]() + entry.executionContext.execute(new Runnable { + override def run(): Unit = + stopPromise.complete { + val stopResult = Try(entry.module.stop()) + stopResult.failed.foreach(t => _logger.warn(s"Failure occurred while stopping module [${entry.name}]", t)) + stopResult + } + + }) + + stopPromise.future.onComplete(_ => entry.executionContext.shutdown())(cleanupExecutor) + stopPromise.future + } + + /** + * Schedules a call to reconfigure on the module's execution context. + */ + private def reconfigureModule(entry: Entry[Module], config: Config): Unit = { + entry.executionContext.execute(new Runnable { + override def run(): Unit = + Try(entry.module.reconfigure(config)) + .failed.foreach(t => _logger.warn(s"Failure occurred while reconfiguring module [${entry.name}]", t)) + }) + } + + private def noopRegistration(moduleName: String): Registration = new Registration { + override def cancel(): Unit = + _logger.warn(s"Cannot cancel registration on module [$moduleName] because the module was not added properly") + } + + private def registration(entry: Entry[Module]): Registration = new Registration { + override def cancel(): Unit = stopModule(entry) + } + + private def parseModuleKind(kind: String): Module.Kind = kind.toLowerCase match { + case "combined" => Module.Kind.Combined + case "metric" => Module.Kind.Metric + case "span" => Module.Kind.Span + case "plain" => Module.Kind.Plain + } + + private def readRegistrySettings(config: Config): Settings = + Settings( + metricTickInterval = config.getDuration("kamon.metric.tick-interval"), + optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment"), + traceTickInterval = config.getDuration("kamon.trace.tick-interval"), + traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size") + ) + + private case class Settings( + metricTickInterval: Duration, + optimisticMetricTickAlignment: Boolean, + traceTickInterval: Duration, + traceReporterQueueSize: Int + ) + + + private case class Entry[T <: Module]( + name: String, + executionContext: ExecutionContextExecutorService, + programmaticallyAdded: Boolean, + settings: Module.Settings, + module: T + ) +} + diff --git a/kamon-core/src/main/scala/kamon/module/ReportingModule.scala b/kamon-core/src/main/scala/kamon/module/ReportingModule.scala deleted file mode 100644 index 02a7344e..00000000 --- a/kamon-core/src/main/scala/kamon/module/ReportingModule.scala +++ /dev/null @@ -1,27 +0,0 @@ -package kamon -package module - -import kamon.trace.Span -import kamon.metric.PeriodSnapshot - -/** - * Modules implementing this trait will get registered for periodically receiving metric period snapshots. The - * frequency of the period snapshots is controlled by the kamon.metric.tick-interval setting. - */ -trait MetricReporter extends Module { - def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit -} - -/** - * Modules implementing this trait will get registered for periodically receiving span batches. The frequency of the - * span batches is controlled by the kamon.trace.tick-interval setting. - */ -trait SpanReporter extends Module { - def reportSpans(spans: Seq[Span.FinishedSpan]): Unit -} - -/** - * Modules implementing this trait will get registered for periodically receiving metric period snapshots and span - * batches. - */ -trait CombinedReporter extends MetricReporter with SpanReporter diff --git a/kamon-core/src/main/scala/kamon/status/JsonMarshalling.scala b/kamon-core/src/main/scala/kamon/status/JsonMarshalling.scala index 345e5cad..5a3f22dc 100644 --- a/kamon-core/src/main/scala/kamon/status/JsonMarshalling.scala +++ b/kamon-core/src/main/scala/kamon/status/JsonMarshalling.scala @@ -1,12 +1,10 @@ package kamon.status import com.grack.nanojson.JsonWriter -import kamon.module.Module -import kamon.module.Module.Registry import java.lang.{StringBuilder => JavaStringBuilder} import com.typesafe.config.ConfigRenderOptions -import kamon.metric.MetricRegistry +import kamon.module.Module trait JsonMarshalling[T] { @@ -20,8 +18,15 @@ trait JsonMarshalling[T] { object JsonMarshalling { - implicit object ModuleRegistryStatusJsonMarshalling extends JsonMarshalling[Module.Registry.Status] { - override def toJson(instance: Registry.Status, builder: JavaStringBuilder): Unit = { + implicit object ModuleRegistryStatusJsonMarshalling extends JsonMarshalling[Status.ModuleRegistry] { + override def toJson(instance: Status.ModuleRegistry, builder: JavaStringBuilder): Unit = { + def moduleKindString(moduleKind: Module.Kind): String = moduleKind match { + case Module.Kind.Combined => "combined" + case Module.Kind.Metric => "metric" + case Module.Kind.Span => "span" + case Module.Kind.Plain => "plain" + } + val array = JsonWriter.on(builder) .`object`() .array("modules") @@ -30,9 +35,9 @@ object JsonMarshalling { array.`object`() .value("name", m.name) .value("description", m.description) - .value("kind", m.kind) - .value("enabled", m.enabled) - .value("started", m.started) + .value("kind", moduleKindString(m.kind)) + .value("isProgrammaticallyRegistered", m.isProgrammaticallyRegistered) + .value("isStarted", m.isStarted) .end() }) @@ -40,8 +45,8 @@ object JsonMarshalling { } } - implicit object BaseInfoJsonMarshalling extends JsonMarshalling[Status.BaseInfo] { - override def toJson(instance: Status.BaseInfo, builder: JavaStringBuilder): Unit = { + implicit object BaseInfoJsonMarshalling extends JsonMarshalling[Status.Settings] { + override def toJson(instance: Status.Settings, builder: JavaStringBuilder): Unit = { val baseConfigJson = JsonWriter.on(builder) .`object`() .value("version", instance.version) @@ -65,8 +70,8 @@ object JsonMarshalling { } } - implicit object MetricRegistryStatusJsonMarshalling extends JsonMarshalling[MetricRegistry.Status] { - override def toJson(instance: MetricRegistry.Status, builder: JavaStringBuilder): Unit = { + implicit object MetricRegistryStatusJsonMarshalling extends JsonMarshalling[Status.MetricRegistry] { + override def toJson(instance: Status.MetricRegistry, builder: JavaStringBuilder): Unit = { val metricsObject = JsonWriter.on(builder) .`object` .array("metrics") diff --git a/kamon-core/src/main/scala/kamon/status/Status.scala b/kamon-core/src/main/scala/kamon/status/Status.scala index 0d141ed0..dc059277 100644 --- a/kamon-core/src/main/scala/kamon/status/Status.scala +++ b/kamon-core/src/main/scala/kamon/status/Status.scala @@ -1,26 +1,27 @@ package kamon.status import com.typesafe.config.Config +import kamon.metric.InstrumentFactory.InstrumentType import kamon.metric.MetricRegistry import kamon.{Configuration, Environment, Kamon} -import kamon.module.Module - +import kamon.module.ModuleRegistry +import kamon.module.Module.{Kind => ModuleKind} /** * Allows accessing of component's status APIs without exposing any other internal API from those components. */ -class Status(_moduleRegistry: Module.Registry, _metricRegistry: MetricRegistry, configuration: Configuration) { +class Status(_moduleRegistry: ModuleRegistry, _metricRegistry: MetricRegistry, configuration: Configuration) { - def baseInfo(): Status.BaseInfo = - Status.BaseInfo(BuildInfo.version, Kamon.environment, configuration.config()) + def settings(): Status.Settings = + Status.Settings(BuildInfo.version, Kamon.environment, configuration.config()) /** * Information about what modules have been detected in the classpath and their current status. */ - def moduleRegistry(): Module.Registry.Status = + def moduleRegistry(): Status.ModuleRegistry = _moduleRegistry.status() - def metricRegistry(): MetricRegistry.Status = + def metricRegistry(): Status.MetricRegistry = _metricRegistry.status() } @@ -28,13 +29,32 @@ class Status(_moduleRegistry: Module.Registry, _metricRegistry: MetricRegistry, object Status { - case class BaseInfo( + case class Settings( version: String, environment: Environment, config: Config ) + case class ModuleRegistry( + modules: Seq[Module] + ) + + case class Module( + name: String, + description: String, + kind: ModuleKind, + isProgrammaticallyRegistered: Boolean, + isStarted: Boolean + ) + case class MetricRegistry( + metrics: Seq[Metric] + ) + case class Metric( + name: String, + tags: Map[String, String], + instrumentType: InstrumentType + ) } diff --git a/kamon-core/src/main/scala/kamon/status/StatusPageServer.scala b/kamon-core/src/main/scala/kamon/status/StatusPageServer.scala index 7a124c1b..35273f39 100644 --- a/kamon-core/src/main/scala/kamon/status/StatusPageServer.scala +++ b/kamon-core/src/main/scala/kamon/status/StatusPageServer.scala @@ -17,7 +17,7 @@ class StatusPageServer(hostname: String, port: Int, resourceLoader: ClassLoader, // Serve the current status data on Json. session.getUri() match { - case "/status/config" => json(status.baseInfo()) + case "/status/settings" => json(status.settings()) case "/status/modules" => json(status.moduleRegistry()) case "/status/metrics" => json(status.metricRegistry()) case _ => NotFound -- cgit v1.2.3