aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/module/Module.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/kamon/module/Module.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/module/Module.scala416
1 files changed, 45 insertions, 371 deletions
diff --git a/kamon-core/src/main/scala/kamon/module/Module.scala b/kamon-core/src/main/scala/kamon/module/Module.scala
index 41649629..592e02aa 100644
--- a/kamon-core/src/main/scala/kamon/module/Module.scala
+++ b/kamon-core/src/main/scala/kamon/module/Module.scala
@@ -1,20 +1,9 @@
package kamon
package module
-import java.time.{Duration, Instant}
-import java.util.concurrent.{CountDownLatch, Executors, ScheduledFuture, TimeUnit}
-import java.util.concurrent.atomic.AtomicReference
-
import com.typesafe.config.Config
-import kamon.metric.{MetricsSnapshotGenerator, PeriodSnapshot}
-import kamon.trace.Tracer.SpanBuffer
-import kamon.util.Clock
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters.collectionAsScalaIterableConverter
-import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, Promise}
-import scala.util.Try
-import scala.util.control.NonFatal
+import kamon.metric.PeriodSnapshot
+import kamon.trace.Span
/**
* Modules provide additional capabilities to Kamon, like collecting JVM metrics or exporting the metrics and trace
@@ -46,8 +35,39 @@ trait Module {
}
+/**
+ * Modules implementing this trait will get registered for periodically receiving metric period snapshots. The
+ * frequency of the period snapshots is controlled by the kamon.metric.tick-interval setting.
+ */
+trait MetricReporter extends Module {
+ def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit
+}
+
+/**
+ * Modules implementing this trait will get registered for periodically receiving span batches. The frequency of the
+ * span batches is controlled by the kamon.trace.tick-interval setting.
+ */
+trait SpanReporter extends Module {
+ def reportSpans(spans: Seq[Span.FinishedSpan]): Unit
+}
+
+/**
+ * Modules implementing this trait will get registered for periodically receiving metric period snapshots and span
+ * batches.
+ */
+trait CombinedReporter extends MetricReporter with SpanReporter
+
+
object Module {
+ sealed trait Kind
+ object Kind {
+ case object Combined extends Kind
+ case object Metric extends Kind
+ case object Span extends Kind
+ case object Plain extends Kind
+ }
+
/**
* Represents a module's registration on the module registry. A module can be stopped at any time by cancelling its
* registration.
@@ -61,366 +81,20 @@ object Module {
}
/**
- * Controls the lifecycle of all available modules.
+ * Configuration of a given module present in the classpath.
+ *
+ * @param name Module's name
+ * @param description Module's description.
+ * @param clazz The class implementing the configured module.
+ * @param kind Module kind.
+ * @param enabled Whether the module is enabled or not. Enabled modules in the classpath will be automatically
+ * started in any call to Kamon.loadModules().
*/
- class Registry(classLoading: ClassLoading, configuration: Configuration, clock: Clock, snapshotGenerator: MetricsSnapshotGenerator, spanBuffer: SpanBuffer) {
- private val _logger = LoggerFactory.getLogger(classOf[Registry])
- private val _metricsTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-metrics-ticker", daemon = true))
- private val _spansTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-spans-ticker", daemon = true))
-
- private val _metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
- private val _spansTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
-
- private var _registrySettings = readRegistryConfiguration(configuration.config())
- private var _registeredModules: Map[String, Entry[Module]] = Map.empty
- private var _metricReporterModules: Map[String, Entry[MetricReporter]] = Map.empty
- private var _spanReporterModules: Map[String, Entry[SpanReporter]] = Map.empty
-
- // Start ticking as soon as the registry is created.
- scheduleMetricsTicker()
- scheduleSpansTicker()
-
-
- /**
- * Registers a module that has already been instantiated by the user. The start callback will be executed as part
- * of the registration process. If a module with the specified name already exists the registration will fail. If
- * the registered module is a MetricReporter and/or SpanReporter it will also be configured to receive the metrics
- * and spans data upon every tick.
- *
- * @param name Desired module name.
- * @param module Module instance.
- * @return A registration that can be used to stop the module at any time.
- */
- def register(name: String, module: Module): Registration = synchronized {
- if(_registeredModules.get(name).isEmpty) {
- val moduleEntry = createEntry(name, true, module)
- startModule(moduleEntry)
- registration(moduleEntry)
-
- } else {
- _logger.warn(s"Cannot register module [$name], a module with that name already exists.")
- noopRegistration(name)
- }
- }
-
- /**
- * Reads all available modules from the config and either starts, stops or reconfigures them in order to match the
- * configured modules state.
- */
- def load(config: Config): Unit = synchronized {
- val configuredModules = readModuleSettings(config)
- val automaticallyRegisteredModules = _registeredModules.filterNot { case (_, module) => module.programmaticallyAdded }
-
- // Start, reconfigure and stop modules that are still present but disabled.
- configuredModules.foreach { moduleSettings =>
- automaticallyRegisteredModules.get(moduleSettings.name).fold {
- // The module does not exist in the registry, the only possible action is starting it, if enabled.
- if(moduleSettings.enabled) {
- createModule(moduleSettings).foreach(entry => startModule(entry))
- }
-
- } { existentModuleSettings =>
- // When a module already exists it can either need to be stopped, or to be reconfigured.
- if(moduleSettings.enabled) {
- reconfigureModule(existentModuleSettings, config)
- } else {
- stopModule(existentModuleSettings)
- }
- }
- }
-
- // Remove all modules that no longer exist in the configuration.
- val missingModules = automaticallyRegisteredModules.filterKeys(moduleName => configuredModules.find(_.name == moduleName).isEmpty)
- missingModules.foreach {
- case (_, entry) => stopModule(entry)
- }
- }
-
- /**
- * Schedules the reconfigure hook on all registered modules and applies the latest configuration settings to the
- * registry.
- */
- def reconfigure(newConfig: Config): Unit = synchronized {
- _registrySettings = readRegistryConfiguration(configuration.config())
- _registeredModules.values.foreach(entry => reconfigureModule(entry, newConfig))
- scheduleMetricsTicker()
- scheduleSpansTicker()
- }
-
- /**
- * Stops all registered modules. As part of the stop process, all modules get a last chance to report metrics and
- * spans available until the call to stop.
- */
- def stop(): Future[Unit] = synchronized {
- implicit val cleanupExecutor = ExecutionContext.Implicits.global
- scheduleMetricsTicker(once = true)
- scheduleSpansTicker(once = true)
-
- val stopSignals = _registeredModules.values.map(stopModule)
- val latch = new CountDownLatch(stopSignals.size)
- stopSignals.foreach(f => f.onComplete(_ => latch.countDown()))
-
- // There is a global 30 seconds limit to shutdown after which all executors will shut down.
- val stopCompletionFuture = Future(latch.await(30, TimeUnit.SECONDS))
- stopCompletionFuture.onComplete(_ => {
- _metricsTickerExecutor.shutdown()
- _spansTickerExecutor.shutdown()
- })
-
- stopCompletionFuture.map(_ => ())
- }
-
-
- /**
- * (Re)Schedules the metrics ticker that periodically takes snapshots from the metric registry and sends them to
- * all available metric reporting modules. If a ticker was already scheduled then that scheduled job will be
- * cancelled and scheduled again.
- */
- private def scheduleMetricsTicker(once: Boolean = false): Unit = {
- val currentMetricsTicker = _metricsTickerSchedule.get()
- if(currentMetricsTicker != null)
- currentMetricsTicker.cancel(false)
-
- _metricsTickerSchedule.set {
- val interval = _registrySettings.metricTickInterval.toMillis
- val initialDelay = if(_registrySettings.optimisticMetricTickAlignment) {
- val now = clock.instant()
- val nextTick = Clock.nextTick(now, _registrySettings.metricTickInterval)
- Duration.between(now, nextTick).toMillis
- } else _registrySettings.metricTickInterval.toMillis
-
- val ticker = new Runnable {
- var lastInstant = Instant.now(clock)
-
- override def run(): Unit = try {
- val currentInstant = Instant.now(clock)
- val periodSnapshot = PeriodSnapshot(
- from = lastInstant,
- to = currentInstant,
- metrics = snapshotGenerator.snapshot())
-
- metricReporterModules().foreach { entry =>
- Future {
- Try(entry.module.reportPeriodSnapshot(periodSnapshot)).failed.foreach { error =>
- _logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error)
- }
- }(entry.executionContext)
- }
-
- lastInstant = currentInstant
- } catch {
- case NonFatal(t) => _logger.error("Failed to run a metrics tick", t)
- }
- }
-
- if(once)
- _metricsTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS)
- else
- _metricsTickerExecutor.scheduleAtFixedRate(ticker, initialDelay, interval, TimeUnit.MILLISECONDS)
- }
- }
-
- /**
- * (Re)Schedules the spans ticker that periodically takes the spans accumulated by the tracer and flushes them to
- * all available span reporting modules. If a ticker was already scheduled then that scheduled job will be
- * cancelled and scheduled again.
- */
- private def scheduleSpansTicker(once: Boolean = false): Unit = {
- val currentSpansTicker = _spansTickerSchedule.get()
- if(currentSpansTicker != null)
- currentSpansTicker.cancel(false)
-
- _spansTickerSchedule.set {
- val interval = _registrySettings.traceTickInterval.toMillis
-
- val ticker = new Runnable {
- override def run(): Unit = try {
- val spanBatch = spanBuffer.flush()
-
- spanReporterModules().foreach { entry =>
- Future {
- Try(entry.module.reportSpans(spanBatch)).failed.foreach { error =>
- _logger.error(s"Reporter [${entry.name}] failed to process a spans tick.", error)
- }
- }(entry.executionContext)
- }
-
- } catch {
- case NonFatal(t) => _logger.error("Failed to run a spans tick", t)
- }
- }
-
- if(once)
- _spansTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS)
- else
- _spansTickerExecutor.scheduleAtFixedRate(ticker, interval, interval, TimeUnit.MILLISECONDS)
- }
- }
-
- private def metricReporterModules(): Iterable[Entry[MetricReporter]] = synchronized {
- _metricReporterModules.values
- }
-
- private def spanReporterModules(): Iterable[Entry[SpanReporter]] = synchronized {
- _spanReporterModules.values
- }
-
- private def readModuleSettings(config: Config): Seq[Settings] = {
- val moduleConfigs = config.getConfig("kamon.modules").configurations
- val moduleSettings = moduleConfigs.map {
- case (moduleName, moduleConfig) =>
- val moduleSettings = Try {
- Settings(
- moduleName,
- moduleConfig.getString("class"),
- moduleConfig.getBoolean("enabled")
- )
- }
-
- moduleSettings.failed.foreach { t =>
- _logger.warn(s"Failed to read configuration for module [$moduleName]", t)
-
- if(moduleConfig.hasPath("requires-aspectj") || moduleConfig.hasPath("auto-start") || moduleConfig.hasPath("extension-class")) {
- _logger.warn(s"Module [$moduleName] contains legacy configuration settings, please ensure that no legacy configuration")
- }
- }
-
- moduleSettings
-
- } filter(_.isSuccess) map(_.get) toSeq
-
- // Legacy modules from <1.2.0
- val legacyModules = config.getStringList("kamon.reporters").asScala map { moduleClass =>
- Settings(moduleClass, moduleClass, true)
- } toSeq
-
- val (repeatedLegacyModules, uniqueLegacyModules) = legacyModules.partition(lm => moduleSettings.find(_.fqcn == lm.fqcn).nonEmpty)
- repeatedLegacyModules.foreach(m =>
- _logger.warn(s"Module [${m.name}] is configured twice, please remove it from the deprecated kamon.reporters setting."))
-
- uniqueLegacyModules.foreach(m =>
- _logger.warn(s"Module [${m.name}] is configured in the deprecated kamon.reporters setting, please consider moving it to kamon.modules."))
-
- moduleSettings ++ uniqueLegacyModules
- }
-
- /**
- * Creates a module from the provided settings.
- */
- private def createModule(settings: Settings): Option[Entry[Module]] = {
- val moduleInstance = classLoading.createInstance[Module](settings.fqcn, Nil)
- val moduleEntry = moduleInstance.map(instance => createEntry(settings.name, false, instance))
-
- moduleEntry.failed.foreach(t => _logger.warn(s"Failed to create instance of module [${settings.name}]", t))
- moduleEntry.toOption
- }
-
- private def createEntry(name: String, programmaticallyAdded: Boolean, instance: Module): Entry[Module] = {
- val executor = Executors.newSingleThreadExecutor(threadFactory(name))
- Entry(name, ExecutionContext.fromExecutorService(executor), programmaticallyAdded, instance)
- }
-
-
- /**
- * Registers a module and schedules execution of its start procedure.
- */
- private def startModule(entry: Entry[Module]): Unit = {
- registerModule(entry)
-
- // Schedule the start hook on the module
- entry.executionContext.execute(new Runnable {
- override def run(): Unit =
- Try(entry.module.start())
- .failed.foreach(t => _logger.warn(s"Failure occurred while starting module [${entry.name}]", t))
- })
- }
-
- private def registerModule(entry: Entry[Module]): Unit = {
- _registeredModules = _registeredModules + (entry.name -> entry)
- if(entry.module.isInstanceOf[MetricReporter])
- _metricReporterModules = _metricReporterModules + (entry.name -> entry.asInstanceOf[Entry[MetricReporter]])
- if(entry.module.isInstanceOf[SpanReporter])
- _spanReporterModules = _spanReporterModules + (entry.name -> entry.asInstanceOf[Entry[SpanReporter]])
-
- }
-
- /**
- * Removes the module from the registry and schedules a call to the stop lifecycle hook on the module's execution
- * context. The returned future completes when the module finishes its stop procedure.
- */
- private def stopModule(entry: Entry[Module]): Future[Unit] = synchronized {
- val cleanupExecutor = ExecutionContext.Implicits.global
-
- // Remove the module from all registries
- _registeredModules = _registeredModules - entry.name
- if(entry.module.isInstanceOf[MetricReporter])
- _metricReporterModules = _metricReporterModules - entry.name
- if(entry.module.isInstanceOf[SpanReporter])
- _spanReporterModules = _spanReporterModules - entry.name
-
-
- // Schedule a call to stop on the module
- val stopPromise = Promise[Unit]()
- entry.executionContext.execute(new Runnable {
- override def run(): Unit =
- stopPromise.complete {
- val stopResult = Try(entry.module.stop())
- stopResult.failed.foreach(t => _logger.warn(s"Failure occurred while stopping module [${entry.name}]", t))
- stopResult
- }
-
- })
-
- stopPromise.future.onComplete(_ => entry.executionContext.shutdown())(cleanupExecutor)
- stopPromise.future
- }
-
- /**
- * Schedules a call to reconfigure on the module's execution context.
- */
- private def reconfigureModule(entry: Entry[Module], config: Config): Unit = {
- entry.executionContext.execute(new Runnable {
- override def run(): Unit =
- Try(entry.module.reconfigure(config))
- .failed.foreach(t => _logger.warn(s"Failure occurred while reconfiguring module [${entry.name}]", t))
- })
- }
-
- private def noopRegistration(moduleName: String): Registration = new Registration {
- override def cancel(): Unit =
- _logger.warn(s"Cannot cancel registration on module [$moduleName] because the module was not added properly")
- }
-
- private def registration(entry: Entry[Module]): Registration = new Registration {
- override def cancel(): Unit = stopModule(entry)
- }
- }
-
- private def readRegistryConfiguration(config: Config): RegistrySettings =
- RegistrySettings(
- metricTickInterval = config.getDuration("kamon.metric.tick-interval"),
- optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment"),
- traceTickInterval = config.getDuration("kamon.trace.tick-interval"),
- traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size")
- )
-
- private case class RegistrySettings(
- metricTickInterval: Duration,
- optimisticMetricTickAlignment: Boolean,
- traceTickInterval: Duration,
- traceReporterQueueSize: Int
- )
-
- private case class Settings(
+ case class Settings(
name: String,
- fqcn: String,
+ description: String,
+ clazz: Class[_ <: Module],
+ kind: Module.Kind,
enabled: Boolean
)
-
- private case class Entry[T <: Module](
- name: String,
- executionContext: ExecutionContextExecutorService,
- programmaticallyAdded: Boolean,
- module: T
- )
} \ No newline at end of file