diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2019-03-18 13:44:50 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-18 13:44:50 +0100 |
commit | 8efb3b408a876a3dfdac79580773279125cb4135 (patch) | |
tree | 96fd746fc13f4ffb914d8e59c4ea074513877556 /kamon-core/src/main/scala/kamon/module/Module.scala | |
parent | e311df4b3a272f4f160857f718a96ec316a2fc06 (diff) | |
parent | ec83a72879378bc9eedea24f828e4d30fed95e92 (diff) | |
download | Kamon-8efb3b408a876a3dfdac79580773279125cb4135.tar.gz Kamon-8efb3b408a876a3dfdac79580773279125cb4135.tar.bz2 Kamon-8efb3b408a876a3dfdac79580773279125cb4135.zip |
Merge pull request #569 from ivantopo/status-page
Status page
Diffstat (limited to 'kamon-core/src/main/scala/kamon/module/Module.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/module/Module.scala | 416 |
1 files changed, 45 insertions, 371 deletions
diff --git a/kamon-core/src/main/scala/kamon/module/Module.scala b/kamon-core/src/main/scala/kamon/module/Module.scala index 41649629..592e02aa 100644 --- a/kamon-core/src/main/scala/kamon/module/Module.scala +++ b/kamon-core/src/main/scala/kamon/module/Module.scala @@ -1,20 +1,9 @@ package kamon package module -import java.time.{Duration, Instant} -import java.util.concurrent.{CountDownLatch, Executors, ScheduledFuture, TimeUnit} -import java.util.concurrent.atomic.AtomicReference - import com.typesafe.config.Config -import kamon.metric.{MetricsSnapshotGenerator, PeriodSnapshot} -import kamon.trace.Tracer.SpanBuffer -import kamon.util.Clock -import org.slf4j.LoggerFactory - -import scala.collection.JavaConverters.collectionAsScalaIterableConverter -import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, Promise} -import scala.util.Try -import scala.util.control.NonFatal +import kamon.metric.PeriodSnapshot +import kamon.trace.Span /** * Modules provide additional capabilities to Kamon, like collecting JVM metrics or exporting the metrics and trace @@ -46,8 +35,39 @@ trait Module { } +/** + * Modules implementing this trait will get registered for periodically receiving metric period snapshots. The + * frequency of the period snapshots is controlled by the kamon.metric.tick-interval setting. + */ +trait MetricReporter extends Module { + def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit +} + +/** + * Modules implementing this trait will get registered for periodically receiving span batches. The frequency of the + * span batches is controlled by the kamon.trace.tick-interval setting. + */ +trait SpanReporter extends Module { + def reportSpans(spans: Seq[Span.FinishedSpan]): Unit +} + +/** + * Modules implementing this trait will get registered for periodically receiving metric period snapshots and span + * batches. + */ +trait CombinedReporter extends MetricReporter with SpanReporter + + object Module { + sealed trait Kind + object Kind { + case object Combined extends Kind + case object Metric extends Kind + case object Span extends Kind + case object Plain extends Kind + } + /** * Represents a module's registration on the module registry. A module can be stopped at any time by cancelling its * registration. @@ -61,366 +81,20 @@ object Module { } /** - * Controls the lifecycle of all available modules. + * Configuration of a given module present in the classpath. + * + * @param name Module's name + * @param description Module's description. + * @param clazz The class implementing the configured module. + * @param kind Module kind. + * @param enabled Whether the module is enabled or not. Enabled modules in the classpath will be automatically + * started in any call to Kamon.loadModules(). */ - class Registry(classLoading: ClassLoading, configuration: Configuration, clock: Clock, snapshotGenerator: MetricsSnapshotGenerator, spanBuffer: SpanBuffer) { - private val _logger = LoggerFactory.getLogger(classOf[Registry]) - private val _metricsTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-metrics-ticker", daemon = true)) - private val _spansTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-spans-ticker", daemon = true)) - - private val _metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]() - private val _spansTickerSchedule = new AtomicReference[ScheduledFuture[_]]() - - private var _registrySettings = readRegistryConfiguration(configuration.config()) - private var _registeredModules: Map[String, Entry[Module]] = Map.empty - private var _metricReporterModules: Map[String, Entry[MetricReporter]] = Map.empty - private var _spanReporterModules: Map[String, Entry[SpanReporter]] = Map.empty - - // Start ticking as soon as the registry is created. - scheduleMetricsTicker() - scheduleSpansTicker() - - - /** - * Registers a module that has already been instantiated by the user. The start callback will be executed as part - * of the registration process. If a module with the specified name already exists the registration will fail. If - * the registered module is a MetricReporter and/or SpanReporter it will also be configured to receive the metrics - * and spans data upon every tick. - * - * @param name Desired module name. - * @param module Module instance. - * @return A registration that can be used to stop the module at any time. - */ - def register(name: String, module: Module): Registration = synchronized { - if(_registeredModules.get(name).isEmpty) { - val moduleEntry = createEntry(name, true, module) - startModule(moduleEntry) - registration(moduleEntry) - - } else { - _logger.warn(s"Cannot register module [$name], a module with that name already exists.") - noopRegistration(name) - } - } - - /** - * Reads all available modules from the config and either starts, stops or reconfigures them in order to match the - * configured modules state. - */ - def load(config: Config): Unit = synchronized { - val configuredModules = readModuleSettings(config) - val automaticallyRegisteredModules = _registeredModules.filterNot { case (_, module) => module.programmaticallyAdded } - - // Start, reconfigure and stop modules that are still present but disabled. - configuredModules.foreach { moduleSettings => - automaticallyRegisteredModules.get(moduleSettings.name).fold { - // The module does not exist in the registry, the only possible action is starting it, if enabled. - if(moduleSettings.enabled) { - createModule(moduleSettings).foreach(entry => startModule(entry)) - } - - } { existentModuleSettings => - // When a module already exists it can either need to be stopped, or to be reconfigured. - if(moduleSettings.enabled) { - reconfigureModule(existentModuleSettings, config) - } else { - stopModule(existentModuleSettings) - } - } - } - - // Remove all modules that no longer exist in the configuration. - val missingModules = automaticallyRegisteredModules.filterKeys(moduleName => configuredModules.find(_.name == moduleName).isEmpty) - missingModules.foreach { - case (_, entry) => stopModule(entry) - } - } - - /** - * Schedules the reconfigure hook on all registered modules and applies the latest configuration settings to the - * registry. - */ - def reconfigure(newConfig: Config): Unit = synchronized { - _registrySettings = readRegistryConfiguration(configuration.config()) - _registeredModules.values.foreach(entry => reconfigureModule(entry, newConfig)) - scheduleMetricsTicker() - scheduleSpansTicker() - } - - /** - * Stops all registered modules. As part of the stop process, all modules get a last chance to report metrics and - * spans available until the call to stop. - */ - def stop(): Future[Unit] = synchronized { - implicit val cleanupExecutor = ExecutionContext.Implicits.global - scheduleMetricsTicker(once = true) - scheduleSpansTicker(once = true) - - val stopSignals = _registeredModules.values.map(stopModule) - val latch = new CountDownLatch(stopSignals.size) - stopSignals.foreach(f => f.onComplete(_ => latch.countDown())) - - // There is a global 30 seconds limit to shutdown after which all executors will shut down. - val stopCompletionFuture = Future(latch.await(30, TimeUnit.SECONDS)) - stopCompletionFuture.onComplete(_ => { - _metricsTickerExecutor.shutdown() - _spansTickerExecutor.shutdown() - }) - - stopCompletionFuture.map(_ => ()) - } - - - /** - * (Re)Schedules the metrics ticker that periodically takes snapshots from the metric registry and sends them to - * all available metric reporting modules. If a ticker was already scheduled then that scheduled job will be - * cancelled and scheduled again. - */ - private def scheduleMetricsTicker(once: Boolean = false): Unit = { - val currentMetricsTicker = _metricsTickerSchedule.get() - if(currentMetricsTicker != null) - currentMetricsTicker.cancel(false) - - _metricsTickerSchedule.set { - val interval = _registrySettings.metricTickInterval.toMillis - val initialDelay = if(_registrySettings.optimisticMetricTickAlignment) { - val now = clock.instant() - val nextTick = Clock.nextTick(now, _registrySettings.metricTickInterval) - Duration.between(now, nextTick).toMillis - } else _registrySettings.metricTickInterval.toMillis - - val ticker = new Runnable { - var lastInstant = Instant.now(clock) - - override def run(): Unit = try { - val currentInstant = Instant.now(clock) - val periodSnapshot = PeriodSnapshot( - from = lastInstant, - to = currentInstant, - metrics = snapshotGenerator.snapshot()) - - metricReporterModules().foreach { entry => - Future { - Try(entry.module.reportPeriodSnapshot(periodSnapshot)).failed.foreach { error => - _logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error) - } - }(entry.executionContext) - } - - lastInstant = currentInstant - } catch { - case NonFatal(t) => _logger.error("Failed to run a metrics tick", t) - } - } - - if(once) - _metricsTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS) - else - _metricsTickerExecutor.scheduleAtFixedRate(ticker, initialDelay, interval, TimeUnit.MILLISECONDS) - } - } - - /** - * (Re)Schedules the spans ticker that periodically takes the spans accumulated by the tracer and flushes them to - * all available span reporting modules. If a ticker was already scheduled then that scheduled job will be - * cancelled and scheduled again. - */ - private def scheduleSpansTicker(once: Boolean = false): Unit = { - val currentSpansTicker = _spansTickerSchedule.get() - if(currentSpansTicker != null) - currentSpansTicker.cancel(false) - - _spansTickerSchedule.set { - val interval = _registrySettings.traceTickInterval.toMillis - - val ticker = new Runnable { - override def run(): Unit = try { - val spanBatch = spanBuffer.flush() - - spanReporterModules().foreach { entry => - Future { - Try(entry.module.reportSpans(spanBatch)).failed.foreach { error => - _logger.error(s"Reporter [${entry.name}] failed to process a spans tick.", error) - } - }(entry.executionContext) - } - - } catch { - case NonFatal(t) => _logger.error("Failed to run a spans tick", t) - } - } - - if(once) - _spansTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS) - else - _spansTickerExecutor.scheduleAtFixedRate(ticker, interval, interval, TimeUnit.MILLISECONDS) - } - } - - private def metricReporterModules(): Iterable[Entry[MetricReporter]] = synchronized { - _metricReporterModules.values - } - - private def spanReporterModules(): Iterable[Entry[SpanReporter]] = synchronized { - _spanReporterModules.values - } - - private def readModuleSettings(config: Config): Seq[Settings] = { - val moduleConfigs = config.getConfig("kamon.modules").configurations - val moduleSettings = moduleConfigs.map { - case (moduleName, moduleConfig) => - val moduleSettings = Try { - Settings( - moduleName, - moduleConfig.getString("class"), - moduleConfig.getBoolean("enabled") - ) - } - - moduleSettings.failed.foreach { t => - _logger.warn(s"Failed to read configuration for module [$moduleName]", t) - - if(moduleConfig.hasPath("requires-aspectj") || moduleConfig.hasPath("auto-start") || moduleConfig.hasPath("extension-class")) { - _logger.warn(s"Module [$moduleName] contains legacy configuration settings, please ensure that no legacy configuration") - } - } - - moduleSettings - - } filter(_.isSuccess) map(_.get) toSeq - - // Legacy modules from <1.2.0 - val legacyModules = config.getStringList("kamon.reporters").asScala map { moduleClass => - Settings(moduleClass, moduleClass, true) - } toSeq - - val (repeatedLegacyModules, uniqueLegacyModules) = legacyModules.partition(lm => moduleSettings.find(_.fqcn == lm.fqcn).nonEmpty) - repeatedLegacyModules.foreach(m => - _logger.warn(s"Module [${m.name}] is configured twice, please remove it from the deprecated kamon.reporters setting.")) - - uniqueLegacyModules.foreach(m => - _logger.warn(s"Module [${m.name}] is configured in the deprecated kamon.reporters setting, please consider moving it to kamon.modules.")) - - moduleSettings ++ uniqueLegacyModules - } - - /** - * Creates a module from the provided settings. - */ - private def createModule(settings: Settings): Option[Entry[Module]] = { - val moduleInstance = classLoading.createInstance[Module](settings.fqcn, Nil) - val moduleEntry = moduleInstance.map(instance => createEntry(settings.name, false, instance)) - - moduleEntry.failed.foreach(t => _logger.warn(s"Failed to create instance of module [${settings.name}]", t)) - moduleEntry.toOption - } - - private def createEntry(name: String, programmaticallyAdded: Boolean, instance: Module): Entry[Module] = { - val executor = Executors.newSingleThreadExecutor(threadFactory(name)) - Entry(name, ExecutionContext.fromExecutorService(executor), programmaticallyAdded, instance) - } - - - /** - * Registers a module and schedules execution of its start procedure. - */ - private def startModule(entry: Entry[Module]): Unit = { - registerModule(entry) - - // Schedule the start hook on the module - entry.executionContext.execute(new Runnable { - override def run(): Unit = - Try(entry.module.start()) - .failed.foreach(t => _logger.warn(s"Failure occurred while starting module [${entry.name}]", t)) - }) - } - - private def registerModule(entry: Entry[Module]): Unit = { - _registeredModules = _registeredModules + (entry.name -> entry) - if(entry.module.isInstanceOf[MetricReporter]) - _metricReporterModules = _metricReporterModules + (entry.name -> entry.asInstanceOf[Entry[MetricReporter]]) - if(entry.module.isInstanceOf[SpanReporter]) - _spanReporterModules = _spanReporterModules + (entry.name -> entry.asInstanceOf[Entry[SpanReporter]]) - - } - - /** - * Removes the module from the registry and schedules a call to the stop lifecycle hook on the module's execution - * context. The returned future completes when the module finishes its stop procedure. - */ - private def stopModule(entry: Entry[Module]): Future[Unit] = synchronized { - val cleanupExecutor = ExecutionContext.Implicits.global - - // Remove the module from all registries - _registeredModules = _registeredModules - entry.name - if(entry.module.isInstanceOf[MetricReporter]) - _metricReporterModules = _metricReporterModules - entry.name - if(entry.module.isInstanceOf[SpanReporter]) - _spanReporterModules = _spanReporterModules - entry.name - - - // Schedule a call to stop on the module - val stopPromise = Promise[Unit]() - entry.executionContext.execute(new Runnable { - override def run(): Unit = - stopPromise.complete { - val stopResult = Try(entry.module.stop()) - stopResult.failed.foreach(t => _logger.warn(s"Failure occurred while stopping module [${entry.name}]", t)) - stopResult - } - - }) - - stopPromise.future.onComplete(_ => entry.executionContext.shutdown())(cleanupExecutor) - stopPromise.future - } - - /** - * Schedules a call to reconfigure on the module's execution context. - */ - private def reconfigureModule(entry: Entry[Module], config: Config): Unit = { - entry.executionContext.execute(new Runnable { - override def run(): Unit = - Try(entry.module.reconfigure(config)) - .failed.foreach(t => _logger.warn(s"Failure occurred while reconfiguring module [${entry.name}]", t)) - }) - } - - private def noopRegistration(moduleName: String): Registration = new Registration { - override def cancel(): Unit = - _logger.warn(s"Cannot cancel registration on module [$moduleName] because the module was not added properly") - } - - private def registration(entry: Entry[Module]): Registration = new Registration { - override def cancel(): Unit = stopModule(entry) - } - } - - private def readRegistryConfiguration(config: Config): RegistrySettings = - RegistrySettings( - metricTickInterval = config.getDuration("kamon.metric.tick-interval"), - optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment"), - traceTickInterval = config.getDuration("kamon.trace.tick-interval"), - traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size") - ) - - private case class RegistrySettings( - metricTickInterval: Duration, - optimisticMetricTickAlignment: Boolean, - traceTickInterval: Duration, - traceReporterQueueSize: Int - ) - - private case class Settings( + case class Settings( name: String, - fqcn: String, + description: String, + clazz: Class[_ <: Module], + kind: Module.Kind, enabled: Boolean ) - - private case class Entry[T <: Module]( - name: String, - executionContext: ExecutionContextExecutorService, - programmaticallyAdded: Boolean, - module: T - ) }
\ No newline at end of file |