package kamon
package module
import java.time.{Duration, Instant}
import java.util.concurrent.{CountDownLatch, Executors, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import com.typesafe.config.Config
import kamon.metric.{MetricsSnapshotGenerator, PeriodSnapshot}
import kamon.trace.Tracer.SpanBuffer
import kamon.util.Clock
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, Promise}
import scala.util.Try
import scala.util.control.NonFatal
/**
 * Modules provide additional capabilities to Kamon, like collecting JVM metrics or exporting the metrics and trace
 * data to external services. Additionally, modules can be automatically registered in Kamon by simply being present
 * in the classpath and having the appropriate entry in the configuration file. All modules get a dedicated execution
 * context which will be used to call the start, stop and reconfigure hooks.
 *
 * Besides the basic lifecycle hooks, when registering a [[MetricReporter]] and/or [[SpanReporter]] module, Kamon will
 * also schedule calls to [[MetricReporter.reportPeriodSnapshot()]] and [[SpanReporter.reportSpans()]] in the module's
 * execution context.
 */
trait Module {
/**
 * Signals that the module has been registered in Kamon's module registry. The registry schedules this call on the
 * module's own execution context right after adding the module to its internal maps.
 */
def start(): Unit
/**
 * Signals that the module should be stopped and all acquired resources, if any, should be released. The registry
 * removes the module from its internal maps before scheduling this call.
 */
def stop(): Unit
/**
 * Signals that a new configuration object has been provided to Kamon. Modules should ensure that their internal
 * settings are in sync with the provided configuration.
 *
 * @param newConfig The configuration the module should now conform to.
 */
def reconfigure(newConfig: Config): Unit
}
object Module {
/**
 * Represents a module's registration on the module registry. A module can be stopped at any time by cancelling its
 * registration.
 */
trait Registration {
/**
 * Removes and stops the related module. Registrations handed out for a rejected registration attempt (duplicate
 * name) only log a warning instead.
 */
def cancel(): Unit
}
/**
* Controls the lifecycle of all available modules.
*/
class Registry(classLoading: ClassLoading, configuration: Configuration, clock: Clock, snapshotGenerator: MetricsSnapshotGenerator, spanBuffer: SpanBuffer) {
// Logger used by every registry operation and by the tick jobs.
private val _logger = LoggerFactory.getLogger(classOf[Registry])
// One scheduled pool (core size 1) per tick type; these run the jobs created by scheduleMetricsTicker and
// scheduleSpansTicker and are shut down at the end of stop().
private val _metricsTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-metrics-ticker", daemon = true))
private val _spansTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-spans-ticker", daemon = true))
// Handles to the currently scheduled tick jobs. They hold null until the first schedule call and are replaced on
// every reschedule, after the previous job has been cancelled.
private val _metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
private val _spansTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
// Tick settings parsed from the configuration available at construction time; replaced by reconfigure().
private var _registrySettings = readRegistryConfiguration(configuration.config())
// All registered modules keyed by name, plus one view per reporter capability. The three maps are updated together
// by registerModule and stopModule.
private var _registeredModules: Map[String, Entry[Module]] = Map.empty
private var _metricReporterModules: Map[String, Entry[MetricReporter]] = Map.empty
private var _spanReporterModules: Map[String, Entry[SpanReporter]] = Map.empty
// Start ticking as soon as the registry is created.
scheduleMetricsTicker()
scheduleSpansTicker()
/**
 * Adds a user-instantiated module to the registry and schedules its start hook as part of the registration. Names
 * are unique: when the name is already taken nothing is registered, a warning is logged and a no-op registration is
 * returned. Modules that are a MetricReporter and/or SpanReporter also start receiving metrics and spans data on
 * every tick.
 *
 * @param name Desired module name.
 * @param module Module instance.
 * @return A registration that can be used to stop the module at any time.
 */
def register(name: String, module: Module): Registration = synchronized {
  _registeredModules.get(name) match {
    case None =>
      // Entries created through this method are flagged as programmatically added so that load() leaves them alone.
      val entry = createEntry(name = name, programmaticallyAdded = true, instance = module)
      startModule(entry)
      registration(entry)

    case Some(_) =>
      _logger.warn(s"Cannot register module [$name], a module with that name already exists.")
      noopRegistration(name)
  }
}
/**
 * Brings the automatically registered modules in line with the modules described in the provided configuration:
 * enabled modules that are not yet registered get created and started, registered modules that are still enabled get
 * reconfigured, and registered modules that are disabled or no longer configured get stopped. Modules added through
 * register() are never touched here.
 */
def load(config: Config): Unit = synchronized {
  val configuredModules = readModuleSettings(config)
  val automaticallyRegisteredModules = _registeredModules.filterNot { case (_, module) => module.programmaticallyAdded }

  // First pass: walk the configured modules and start, reconfigure or stop each one as needed.
  configuredModules.foreach { settings =>
    automaticallyRegisteredModules.get(settings.name) match {
      case Some(registered) if settings.enabled => reconfigureModule(registered, config)
      case Some(registered)                     => stopModule(registered)
      case None if settings.enabled             => createModule(settings).foreach(startModule)
      case None                                 => () // Disabled and not running, nothing to do.
    }
  }

  // Second pass: stop every automatically registered module that disappeared from the configuration.
  for((moduleName, entry) <- automaticallyRegisteredModules if !configuredModules.exists(_.name == moduleName)) {
    stopModule(entry)
  }
}
/**
 * Applies a new configuration to the registry and to all registered modules: the registry's tick settings are
 * re-read, the reconfigure hook is scheduled on every registered module and both tickers are rescheduled so that
 * any change in tick intervals takes effect.
 *
 * @param newConfig Configuration to apply to the registry and to pass to every module's reconfigure hook.
 */
def reconfigure(newConfig: Config): Unit = synchronized {
  // Use the configuration we were handed instead of re-reading the ambient one, so the registry settings and the
  // modules are always derived from the very same Config instance regardless of call ordering.
  _registrySettings = readRegistryConfiguration(newConfig)
  _registeredModules.values.foreach(entry => reconfigureModule(entry, newConfig))

  // Rescheduling cancels the current jobs and creates new ones with the refreshed intervals.
  scheduleMetricsTicker()
  scheduleSpansTicker()
}
/**
 * Stops all registered modules. A final one-off metrics tick and spans tick are scheduled first, then the stop hook
 * is scheduled on every module. The returned future completes once all stop hooks have finished or after 30
 * seconds, whichever happens first; at that point both ticker executors are shut down.
 *
 * NOTE(review): the final ticks read the reporter maps through synchronized accessors, while this method removes
 * every module from those maps before releasing the registry lock. The "last chance" tick may therefore find no
 * reporters to deliver to -- confirm whether the final flush actually reaches the modules.
 */
def stop(): Future[Unit] = synchronized {
  val cleanupExecutor = ExecutionContext.Implicits.global

  scheduleMetricsTicker(once = true)
  scheduleSpansTicker(once = true)

  val pendingStops = _registeredModules.values.map(stopModule)
  val remainingStops = new CountDownLatch(pendingStops.size)
  for(pendingStop <- pendingStops) {
    pendingStop.onComplete(_ => remainingStops.countDown())(cleanupExecutor)
  }

  // There is a global 30 seconds limit to shutdown after which all executors will shut down.
  val stoppedOrTimedOut = Future(remainingStops.await(30, TimeUnit.SECONDS))(cleanupExecutor)
  stoppedOrTimedOut.onComplete { _ =>
    _metricsTickerExecutor.shutdown()
    _spansTickerExecutor.shutdown()
  }(cleanupExecutor)

  stoppedOrTimedOut.map(_ => ())(cleanupExecutor)
}
/**
 * (Re)Schedules the metrics ticker. Any previously scheduled job is cancelled (without interrupting a tick that is
 * already running) and a new job is created. On each run the job takes a snapshot from the snapshot generator and
 * hands it to every metric reporter, on that reporter's own execution context.
 *
 * @param once When true the job runs a single time with no delay; otherwise it runs at a fixed rate using the
 *             configured metric tick interval.
 */
private def scheduleMetricsTicker(once: Boolean = false): Unit = {
  _metricsTickerSchedule.get() match {
    case null      => ()
    case scheduled => scheduled.cancel(false)
  }

  val tickIntervalMillis = _registrySettings.metricTickInterval.toMillis

  // With optimistic alignment the first tick waits until the next tick boundary computed by Clock.nextTick, otherwise
  // it waits for a whole interval.
  val firstTickDelayMillis =
    if(_registrySettings.optimisticMetricTickAlignment) {
      val now = clock.instant()
      val nextTick = Clock.nextTick(now, _registrySettings.metricTickInterval)
      Duration.between(now, nextTick).toMillis
    } else _registrySettings.metricTickInterval.toMillis

  val metricsTick = new Runnable {
    // Start of the period covered by the next snapshot; only advanced when a tick completes without failure.
    var previousTickInstant = Instant.now(clock)

    override def run(): Unit =
      try {
        val tickInstant = Instant.now(clock)
        val snapshot = PeriodSnapshot(
          from = previousTickInstant,
          to = tickInstant,
          metrics = snapshotGenerator.snapshot())

        for(reporter <- metricReporterModules()) {
          Future {
            try reporter.module.reportPeriodSnapshot(snapshot)
            catch {
              case NonFatal(error) =>
                _logger.error(s"Reporter [${reporter.name}] failed to process a metrics tick.", error)
            }
          }(reporter.executionContext)
        }

        previousTickInstant = tickInstant
      } catch {
        case NonFatal(t) => _logger.error("Failed to run a metrics tick", t)
      }
  }

  val newSchedule: ScheduledFuture[_] =
    if(once) _metricsTickerExecutor.schedule(metricsTick, 0L, TimeUnit.MILLISECONDS)
    else _metricsTickerExecutor.scheduleAtFixedRate(metricsTick, firstTickDelayMillis, tickIntervalMillis, TimeUnit.MILLISECONDS)

  _metricsTickerSchedule.set(newSchedule)
}
/**
 * (Re)Schedules the spans ticker. Any previously scheduled job is cancelled (without interrupting a tick that is
 * already running) and a new job is created. On each run the job flushes the span buffer and hands the flushed
 * batch to every span reporter, on that reporter's own execution context.
 *
 * @param once When true the job runs a single time with no delay; otherwise it runs at a fixed rate using the
 *             configured trace tick interval for both the initial delay and the period.
 */
private def scheduleSpansTicker(once: Boolean = false): Unit = {
  _spansTickerSchedule.get() match {
    case null      => ()
    case scheduled => scheduled.cancel(false)
  }

  val tickIntervalMillis = _registrySettings.traceTickInterval.toMillis

  val spansTick = new Runnable {
    override def run(): Unit =
      try {
        val flushedSpans = spanBuffer.flush()

        for(reporter <- spanReporterModules()) {
          Future {
            try reporter.module.reportSpans(flushedSpans)
            catch {
              case NonFatal(error) =>
                _logger.error(s"Reporter [${reporter.name}] failed to process a spans tick.", error)
            }
          }(reporter.executionContext)
        }
      } catch {
        case NonFatal(t) => _logger.error("Failed to run a spans tick", t)
      }
  }

  val newSchedule: ScheduledFuture[_] =
    if(once) _spansTickerExecutor.schedule(spansTick, 0L, TimeUnit.MILLISECONDS)
    else _spansTickerExecutor.scheduleAtFixedRate(spansTick, tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS)

  _spansTickerSchedule.set(newSchedule)
}
// Returns the entries of all currently registered metric reporters, read while holding the registry lock.
private def metricReporterModules(): Iterable[Entry[MetricReporter]] =
  this.synchronized {
    val reporters = _metricReporterModules
    reporters.values
  }
// Returns the entries of all currently registered span reporters, read while holding the registry lock.
private def spanReporterModules(): Iterable[Entry[SpanReporter]] =
  this.synchronized {
    val reporters = _spanReporterModules
    reporters.values
  }
/**
 * Reads the settings of all modules described in the provided configuration. Modules come from two places: the
 * entries under "kamon.modules" (each one must provide "class" and "enabled") and the class names listed in the
 * deprecated "kamon.reporters" setting, which are always treated as enabled and named after their class. Entries
 * under "kamon.modules" that cannot be read are logged and skipped. A legacy entry whose class is already
 * configured under "kamon.modules" is dropped in favor of the new-style entry.
 *
 * @param config Configuration to read the module settings from.
 * @return Settings for every readable module, new-style entries first.
 */
private def readModuleSettings(config: Config): Seq[Settings] = {
  val legacySettingKeys = Seq("requires-aspectj", "auto-start", "extension-class")

  val moduleSettings = config.getConfig("kamon.modules").configurations.map {
    case (moduleName, moduleConfig) =>
      val parsedSettings = Try {
        Settings(
          moduleName,
          moduleConfig.getString("class"),
          moduleConfig.getBoolean("enabled")
        )
      }

      parsedSettings.failed.foreach { t =>
        _logger.warn(s"Failed to read configuration for module [$moduleName]", t)

        // Settings from older module definitions are the most likely reason for a module entry to be unreadable.
        if(legacySettingKeys.exists(key => moduleConfig.hasPath(key))) {
          _logger.warn(s"Module [$moduleName] contains legacy configuration settings, please ensure that no legacy " +
            "configuration from previous Kamon versions exists in your classpath.")
        }
      }

      parsedSettings
  }.collect { case scala.util.Success(settings) => settings }.toSeq

  // Legacy modules from <1.2.0
  val legacyModules = config.getStringList("kamon.reporters").asScala
    .map(moduleClass => Settings(moduleClass, moduleClass, true))
    .toSeq

  val (repeatedLegacyModules, uniqueLegacyModules) =
    legacyModules.partition(legacy => moduleSettings.exists(_.fqcn == legacy.fqcn))

  repeatedLegacyModules.foreach(m =>
    _logger.warn(s"Module [${m.name}] is configured twice, please remove it from the deprecated kamon.reporters setting."))

  uniqueLegacyModules.foreach(m =>
    _logger.warn(s"Module [${m.name}] is configured in the deprecated kamon.reporters setting, please consider moving it to kamon.modules."))

  moduleSettings ++ uniqueLegacyModules
}
/**
 * Instantiates the module class named in the settings and wraps the instance in a registry entry that is not
 * flagged as programmatically added. A failure to create the instance is logged as a warning and results in None.
 */
private def createModule(settings: Settings): Option[Entry[Module]] = {
  val creationAttempt =
    for(instance <- classLoading.createInstance[Module](settings.fqcn, Nil))
      yield createEntry(name = settings.name, programmaticallyAdded = false, instance = instance)

  creationAttempt.failed.foreach { t =>
    _logger.warn(s"Failed to create instance of module [${settings.name}]", t)
  }

  creationAttempt.toOption
}
// Wraps a module instance in a registry entry backed by a brand new single-thread executor, so that all of the
// module's hooks and report calls run one at a time on the module's own thread.
private def createEntry(name: String, programmaticallyAdded: Boolean, instance: Module): Entry[Module] =
  Entry(
    name = name,
    executionContext = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor(threadFactory(name))),
    programmaticallyAdded = programmaticallyAdded,
    module = instance
  )
/**
 * Adds the module to the registry maps and then schedules its start hook on the module's execution context.
 * Non-fatal failures thrown by the start hook are logged as warnings; the module stays registered regardless.
 */
private def startModule(entry: Entry[Module]): Unit = {
  registerModule(entry)

  val startHook = new Runnable {
    override def run(): Unit =
      try entry.module.start()
      catch {
        case NonFatal(t) => _logger.warn(s"Failure occurred while starting module [${entry.name}]", t)
      }
  }

  entry.executionContext.execute(startHook)
}
// Stores the entry in the general module map and, depending on the capabilities of the module instance, in the
// metric reporter and/or span reporter maps as well.
private def registerModule(entry: Entry[Module]): Unit = {
  _registeredModules = _registeredModules.updated(entry.name, entry)

  entry.module match {
    case _: MetricReporter =>
      _metricReporterModules = _metricReporterModules.updated(entry.name, entry.asInstanceOf[Entry[MetricReporter]])
    case _ => ()
  }

  entry.module match {
    case _: SpanReporter =>
      _spanReporterModules = _spanReporterModules.updated(entry.name, entry.asInstanceOf[Entry[SpanReporter]])
    case _ => ()
  }
}
/**
 * Removes the module from the registry and schedules a call to the stop lifecycle hook on the module's execution
 * context. The returned future completes when the module finishes its stop procedure (failing if the stop hook
 * throws), after which the module's execution context is shut down.
 *
 * Stopping is idempotent: if this exact entry is no longer the one registered under its name (it was already
 * stopped, or a different module has since taken the name) nothing happens and an already completed future is
 * returned. Without this check a repeated stop would submit work to an executor that has been shut down, and
 * would evict whichever module currently owns the name.
 */
private def stopModule(entry: Entry[Module]): Future[Unit] = synchronized {
  val isCurrentlyRegistered = _registeredModules.get(entry.name).exists(_ eq entry)

  if(!isCurrentlyRegistered) {
    Future.successful(())
  } else {
    val cleanupExecutor = ExecutionContext.Implicits.global

    // Remove the module from all registries
    _registeredModules = _registeredModules - entry.name
    if(entry.module.isInstanceOf[MetricReporter])
      _metricReporterModules = _metricReporterModules - entry.name
    if(entry.module.isInstanceOf[SpanReporter])
      _spanReporterModules = _spanReporterModules - entry.name

    // Schedule a call to stop on the module
    val stopPromise = Promise[Unit]()
    entry.executionContext.execute(new Runnable {
      override def run(): Unit =
        stopPromise.complete {
          val stopResult = Try(entry.module.stop())
          stopResult.failed.foreach(t => _logger.warn(s"Failure occurred while stopping module [${entry.name}]", t))
          stopResult
        }
    })

    // The module's executor is only released once the stop hook has run, whether it succeeded or not.
    stopPromise.future.onComplete(_ => entry.executionContext.shutdown())(cleanupExecutor)
    stopPromise.future
  }
}
/**
 * Schedules the module's reconfigure hook, with the provided configuration, on the module's execution context.
 * Non-fatal failures thrown by the hook are logged as warnings.
 */
private def reconfigureModule(entry: Entry[Module], config: Config): Unit = {
  val reconfigureHook = new Runnable {
    override def run(): Unit =
      try entry.module.reconfigure(config)
      catch {
        case NonFatal(t) => _logger.warn(s"Failure occurred while reconfiguring module [${entry.name}]", t)
      }
  }

  entry.executionContext.execute(reconfigureHook)
}
// Registration handed out when a module could not be registered; cancelling it only logs a warning.
private def noopRegistration(moduleName: String): Registration =
  new Registration {
    override def cancel(): Unit = {
      _logger.warn(s"Cannot cancel registration on module [$moduleName] because the module was not added properly")
    }
  }
// Registration for a successfully registered module; cancelling it stops the module without waiting for the stop
// hook to finish.
private def registration(entry: Entry[Module]): Registration =
  new Registration {
    override def cancel(): Unit = {
      stopModule(entry)
      ()
    }
  }
}
// Extracts the registry's tick-related settings from the provided configuration.
private def readRegistryConfiguration(config: Config): RegistrySettings = {
  val metricTickInterval = config.getDuration("kamon.metric.tick-interval")
  val optimisticTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment")
  val traceTickInterval = config.getDuration("kamon.trace.tick-interval")
  val traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size")

  RegistrySettings(metricTickInterval, optimisticTickAlignment, traceTickInterval, traceReporterQueueSize)
}
/**
 * Settings that control the registry's tickers, as read by readRegistryConfiguration.
 *
 * metricTickInterval: period between metric ticks ("kamon.metric.tick-interval").
 * optimisticMetricTickAlignment: when true the first metric tick is delayed until the instant returned by
 * Clock.nextTick instead of a whole interval ("kamon.metric.optimistic-tick-alignment").
 * traceTickInterval: initial delay and period of the spans ticker ("kamon.trace.tick-interval").
 * traceReporterQueueSize: read from "kamon.trace.reporter-queue-size"; not consumed anywhere in this file's
 * visible code.
 */
private case class RegistrySettings(
metricTickInterval: Duration,
optimisticMetricTickAlignment: Boolean,
traceTickInterval: Duration,
traceReporterQueueSize: Int
)
/**
 * Configured description of a module: the name it is registered under, the fully qualified class name used to
 * instantiate it, and whether it should be running.
 */
private case class Settings(
name: String,
fqcn: String,
enabled: Boolean
)
/**
 * A module as tracked by the registry: its registered name, the execution context on which all of its hooks and
 * report calls are scheduled, whether it was added through Registry.register (as opposed to being created from
 * configuration), and the module instance itself.
 */
private case class Entry[T <: Module](
name: String,
executionContext: ExecutionContextExecutorService,
programmaticallyAdded: Boolean,
module: T
)
}