aboutsummaryrefslogblamecommitdiff
path: root/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala
blob: d45cd80ff42d51d537258174fe204201f26d112f (plain) (tree)













































































                                                                                                                                                                   
                                                            



























































































































































                                                                                                                                      
                                                                                                              


















                                                                                                                                


                                                                                     
 







                                                                                                                                      






















                                                                                                              



                                                                                                                           








                                                                                           


                                                                                                                                 
 


                                                                                                                                                   





































                                                                                                                         
                                                                                                             






                                                                         
                                           

                               



                                                                
                                                                                                                         
                                                















































































































                                                                                                                    
package kamon
package module

import java.time.{Duration, Instant}
import java.util.concurrent.{CountDownLatch, Executors, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

import com.typesafe.config.Config
import kamon.metric.{MetricsSnapshotGenerator, PeriodSnapshot}
import kamon.module.Module.Registration
import kamon.status.Status
import kamon.trace.Tracer.SpanBuffer
import kamon.util.Clock
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters.asScalaBufferConverter
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, Promise}
import scala.util.Try
import scala.util.control.NonFatal



/**
  * Controls the lifecycle of all available modules.
  */
class ModuleRegistry(classLoading: ClassLoading, configuration: Configuration, clock: Clock, snapshotGenerator: MetricsSnapshotGenerator, spanBuffer: SpanBuffer) {
  private val _logger = LoggerFactory.getLogger(classOf[ModuleRegistry])

  // One single-threaded scheduler per ticker; both thread factories are asked for daemon threads.
  private val _metricsTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-metrics-ticker", daemon = true))
  private val _spansTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-spans-ticker", daemon = true))

  // Handles to the currently scheduled ticker jobs. They hold null until the first call to the corresponding
  // schedule method, which is why those methods check for null before cancelling.
  private val _metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
  private val _spansTickerSchedule = new AtomicReference[ScheduledFuture[_]]()

  // Mutable registry state. All maps are keyed by module name; the two reporter maps only contain the entries
  // whose module implements the corresponding reporter interface (see registerModule).
  private var _registrySettings = readRegistrySettings(configuration.config())
  private var _registeredModules: Map[String, Entry[Module]] = Map.empty
  private var _metricReporterModules: Map[String, Entry[MetricReporter]] = Map.empty
  private var _spanReporterModules: Map[String, Entry[SpanReporter]] = Map.empty

  // Start ticking as soon as the registry is created.
  scheduleMetricsTicker()
  scheduleSpansTicker()


  /**
    * Adds a module instance created by the user to this registry and schedules its start callback. When the name
    * is already taken nothing is registered, a warning is logged and a no-op registration is returned. Modules that
    * implement MetricReporter and/or SpanReporter also start receiving metrics and spans data on every tick.
    *
    * @param name Desired module name.
    * @param module Module instance.
    * @return A registration that can be used to stop the module at any time.
    */
  def register(name: String, module: Module): Registration = synchronized {
    if(_registeredModules.contains(name)) {
      _logger.warn(s"Cannot register module [$name], a module with that name already exists.")
      noopRegistration(name)
    } else {
      val moduleClass = module.getClass

      // There is no configuration for these modules, so the settings are derived from the instance itself: the
      // class name doubles as description and the module is always considered enabled.
      val inferredSettings = Module.Settings(name, moduleClass.getName, moduleClass, inferModuleKind(moduleClass), true)
      val moduleEntry = createEntry(inferredSettings, programmaticallyAdded = true, module)

      startModule(moduleEntry)
      registration(moduleEntry)
    }
  }

  /**
    * Reads all available modules from the config and either starts, stops or reconfigures them in order to match the
    * configured modules state. Only modules that were loaded from configuration are affected; modules registered
    * programmatically are never stopped or reconfigured here.
    *
    * A configured module is not started when its name is already taken by another registered module (e.g. one
    * that was registered programmatically), because starting it would silently replace the existing entry without
    * stopping it. A warning is logged instead, mirroring what register does.
    *
    * @param config Configuration to read the module settings from.
    */
  def load(config: Config): Unit = synchronized {
    val configuredModules = readModuleSettings(config, true)
    val automaticallyRegisteredModules = _registeredModules.filterNot { case (_, entry) => entry.programmaticallyAdded }

    // Start, reconfigure and stop modules that are still present but disabled.
    configuredModules.foreach { moduleSettings =>
      automaticallyRegisteredModules.get(moduleSettings.name) match {
        case Some(existingEntry) =>
          // When a module already exists it can either need to be stopped, or to be reconfigured.
          if(moduleSettings.enabled)
            reconfigureModule(existingEntry, config)
          else
            stopModule(existingEntry)

        case None if !moduleSettings.enabled =>
          // The module is not running and should not be running, nothing to do.
          ()

        case None if _registeredModules.contains(moduleSettings.name) =>
          // The name is taken by a module that is not in the snapshot of automatically registered modules: either
          // a programmatically added module or a module started earlier in this same loop.
          _logger.warn(s"Cannot load module [${moduleSettings.name}], a module with that name already exists.")

        case None =>
          // The module does not exist in the registry, the only possible action is starting it.
          createModule(moduleSettings).foreach(entry => startModule(entry))
      }
    }

    // Remove all modules that no longer exist in the configuration.
    val configuredModuleNames = configuredModules.map(_.name).toSet
    automaticallyRegisteredModules.foreach {
      case (moduleName, entry) =>
        if(!configuredModuleNames.contains(moduleName))
          stopModule(entry)
    }
  }

  /**
    * Schedules the reconfigure hook on all registered modules and applies the latest configuration settings to the
    * registry. Both the registry settings and the modules are updated from the same configuration instance, and
    * the tickers are rescheduled afterwards so that new tick intervals take effect.
    *
    * @param newConfig The configuration to apply.
    */
  def reconfigure(newConfig: Config): Unit = synchronized {
    // Read the registry settings from the configuration received as argument rather than from the shared
    // configuration holder, so the tickers can never be rescheduled with settings older than the ones handed
    // to the modules.
    _registrySettings = readRegistrySettings(newConfig)
    _registeredModules.values.foreach(entry => reconfigureModule(entry, newConfig))
    scheduleMetricsTicker()
    scheduleSpansTicker()
  }

  /**
    * Stops all registered modules. A final run of the metrics and spans tickers is scheduled before the modules
    * are stopped.
    *
    * NOTE(review): the final tick runs on the ticker threads and reads the reporter modules through synchronized
    * accessors, while this method holds the same lock and removes every module from the registry via stopModule.
    * It looks like the final tick can only observe an empty set of reporters, meaning no module actually gets the
    * "last chance" data. Confirm and, if so, deliver the final snapshot before calling stopModule.
    *
    * @return A future that completes when all modules finished their stop procedure or after 30 seconds,
    *         whichever happens first.
    */
  def stop(): Future[Unit] = synchronized {
    implicit val cleanupExecutor: ExecutionContext = ExecutionContext.Implicits.global
    scheduleMetricsTicker(once = true)
    scheduleSpansTicker(once = true)

    val stopSignals = _registeredModules.values.map(stopModule).toList
    val latch = new CountDownLatch(stopSignals.size)
    stopSignals.foreach(f => f.onComplete(_ => latch.countDown()))

    // There is a global 30 seconds limit to shutdown after which all executors will shut down. The wait is marked
    // as blocking so that the global execution context can compensate for the thread that is parked here.
    val stopCompletionFuture = Future {
      scala.concurrent.blocking {
        latch.await(30, TimeUnit.SECONDS)
      }
    }

    stopCompletionFuture.onComplete(_ => {
      _metricsTickerExecutor.shutdown()
      _spansTickerExecutor.shutdown()
    })

    stopCompletionFuture.map(_ => ())
  }


  /**
    * (Re)Schedules the metrics ticker. A previously scheduled ticker, if any, is cancelled first (a tick that is
    * already running is not interrupted). On every run the ticker takes a snapshot from the snapshot generator and
    * hands it to all metric reporter modules, each one on its own execution context.
    *
    * @param once When true the ticker runs a single time without delay, otherwise it runs at a fixed rate using
    *             the configured metric tick interval.
    */
  private def scheduleMetricsTicker(once: Boolean = false): Unit = {
    Option(_metricsTickerSchedule.get()).foreach(_.cancel(false))

    val tickInterval = _registrySettings.metricTickInterval
    val interval = tickInterval.toMillis

    // With optimistic alignment the first tick is delayed only until the next tick boundary calculated by the
    // clock, otherwise a full interval passes before the first tick.
    val initialDelay =
      if(_registrySettings.optimisticMetricTickAlignment) {
        val now = clock.instant()
        Duration.between(now, Clock.nextTick(now, tickInterval)).toMillis
      } else interval

    val ticker = new Runnable {
      // Start of the period covered by the next snapshot; only moves forward after a successful tick.
      private var lastInstant = Instant.now(clock)

      override def run(): Unit =
        try {
          val currentInstant = Instant.now(clock)
          val periodSnapshot = PeriodSnapshot(from = lastInstant, to = currentInstant, metrics = snapshotGenerator.snapshot())

          for(entry <- metricReporterModules()) {
            Future {
              try entry.module.reportPeriodSnapshot(periodSnapshot)
              catch {
                case NonFatal(error) =>
                  _logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error)
              }
            }(entry.executionContext)
          }

          lastInstant = currentInstant
        } catch {
          case NonFatal(t) => _logger.error("Failed to run a metrics tick", t)
        }
    }

    val schedule: ScheduledFuture[_] =
      if(once)
        _metricsTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS)
      else
        _metricsTickerExecutor.scheduleAtFixedRate(ticker, initialDelay, interval, TimeUnit.MILLISECONDS)

    _metricsTickerSchedule.set(schedule)
  }

  /**
    * (Re)Schedules the spans ticker. A previously scheduled ticker, if any, is cancelled first (a tick that is
    * already running is not interrupted). On every run the ticker flushes the span buffer and hands the resulting
    * batch to all span reporter modules, each one on its own execution context.
    *
    * @param once When true the ticker runs a single time without delay, otherwise it runs at a fixed rate using
    *             the configured trace tick interval.
    */
  private def scheduleSpansTicker(once: Boolean = false): Unit = {
    Option(_spansTickerSchedule.get()).foreach(_.cancel(false))

    val interval = _registrySettings.traceTickInterval.toMillis

    val ticker = new Runnable {
      override def run(): Unit =
        try {
          val spanBatch = spanBuffer.flush()

          for(entry <- spanReporterModules()) {
            Future {
              try entry.module.reportSpans(spanBatch)
              catch {
                case NonFatal(error) =>
                  _logger.error(s"Reporter [${entry.name}] failed to process a spans tick.", error)
              }
            }(entry.executionContext)
          }
        } catch {
          case NonFatal(t) => _logger.error("Failed to run a spans tick", t)
        }
    }

    val schedule: ScheduledFuture[_] =
      if(once)
        _spansTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS)
      else
        _spansTickerExecutor.scheduleAtFixedRate(ticker, interval, interval, TimeUnit.MILLISECONDS)

    _spansTickerSchedule.set(schedule)
  }

  /**
    * Returns the entries of all modules currently registered as metric reporters. The registry lock is taken while
    * reading, which is what makes this safe to call from the ticker thread.
    */
  private def metricReporterModules(): Iterable[Entry[MetricReporter]] =
    synchronized(_metricReporterModules.values)

  /**
    * Returns the entries of all modules currently registered as span reporters. The registry lock is taken while
    * reading, which is what makes this safe to call from the ticker thread.
    */
  private def spanReporterModules(): Iterable[Entry[SpanReporter]] =
    synchronized(_spanReporterModules.values)

  /**
    * Reads the settings of all modules found in the provided configuration. Modules come from two places:
    *
    *   - Every entry under "kamon.modules". Entries that cannot be read (missing settings, class that cannot be
    *     resolved, unknown kind, or a kind that does not match the interfaces implemented by the class) are
    *     discarded.
    *   - The legacy "kamon.reporters" list of class names, if present. Legacy modules whose class is already
    *     configured under "kamon.modules" are discarded.
    *
    * @param config Configuration to read from.
    * @param emitConfigurationWarnings Whether problems found while reading should be logged. Failures are dropped
    *                                  from the result regardless of this flag.
    * @return The settings of all modules that could be read successfully.
    */
  private def readModuleSettings(config: Config, emitConfigurationWarnings: Boolean): Seq[Module.Settings] = {
    val moduleConfigs = config.getConfig("kamon.modules").configurations
    val moduleSettings = moduleConfigs.map {
      case (moduleName, moduleConfig) =>
        val parsedSettings = Try {
          val ms = Module.Settings(
            moduleName,
            moduleConfig.getString("description"),
            classLoading.resolveClass[Module](moduleConfig.getString("class")).get,
            parseModuleKind(moduleConfig.getString("kind")),
            moduleConfig.getBoolean("enabled")
          )

          // Uses require instead of assert: assertions can be elided at compile time, which would silently let
          // misconfigured modules through.
          require(inferModuleKind(ms.clazz) == ms.kind,
            s"Module [${ms.name}] is configured as [${ms.kind}] but the actual type does not comply to the expected interface.")

          ms
        }

        if(emitConfigurationWarnings) {
          parsedSettings.failed.foreach { t =>
            _logger.warn(s"Failed to read configuration for module [$moduleName]", t)

            val hasLegacySettings =
              moduleConfig.hasPath("requires-aspectj") ||
              moduleConfig.hasPath("auto-start") ||
              moduleConfig.hasPath("extension-class")

            if (hasLegacySettings) {
              _logger.warn(s"Module [$moduleName] contains legacy configuration settings, please ensure that no legacy configuration " +
                "settings (requires-aspectj, auto-start, extension-class) are used for this module.")
            }
          }
        }

        parsedSettings

    }.flatMap(_.toOption).toSeq


    // Load all modules that might have been configured using the legacy "kamon.reporters" setting from <1.2.0
    // versions. This little hack should be removed by the time we release 2.0.
    //
    if(config.hasPath("kamon.reporters")) {
      val legacyModuleSettings = config.getStringList("kamon.reporters").asScala
        .map(moduleClass => {
          val parsedSettings = Try {
            val moduleClazz = classLoading.resolveClass[Module](moduleClass).get
            val inferredModuleKind = inferModuleKind(moduleClazz)
            val name = moduleClazz.getName()
            val description = "Module detected from the legacy kamon.reporters configuration."

            Module.Settings(name, description, moduleClazz, inferredModuleKind, true)
          }

          if(emitConfigurationWarnings) {
            parsedSettings.failed.foreach(t => _logger.error(s"Failed to load legacy reporter module [${moduleClass}]", t))
          }

          parsedSettings
        })
        .flatMap(_.toOption)

      // A legacy module is considered repeated when its class is already configured under kamon.modules.
      val configuredClassNames = moduleSettings.map(_.clazz.getName).toSet
      val (repeatedLegacyModules, uniqueLegacyModules) = legacyModuleSettings
        .partition(lm => configuredClassNames.contains(lm.clazz.getName))

      if(emitConfigurationWarnings) {
        repeatedLegacyModules.foreach(m =>
          _logger.warn(s"Module [${m.name}] is configured twice, please remove it from the deprecated kamon.reporters setting."))

        uniqueLegacyModules.foreach(m =>
          _logger.warn(s"Module [${m.name}] is configured in the deprecated kamon.reporters setting, please consider moving it to kamon.modules."))
      }

      moduleSettings ++ uniqueLegacyModules

    } else moduleSettings
  }

  /**
    * Instantiates the module class referenced by the provided settings (using a no-arguments constructor) and
    * wraps the instance in a registry entry that is not flagged as programmatically added.
    *
    * @param settings Settings of the module to create.
    * @return The new entry, or None if the instance could not be created. Failures are logged as warnings.
    */
  private def createModule(settings: Module.Settings): Option[Entry[Module]] = {
    val entryAttempt = classLoading
      .createInstance[Module](settings.clazz, Nil)
      .map(instance => createEntry(settings, programmaticallyAdded = false, instance))

    entryAttempt.failed.foreach { t =>
      _logger.warn(s"Failed to create instance of module [${settings.name}]", t)
    }

    entryAttempt.toOption
  }

  /**
    * Builds the registry entry for a module instance. Every entry gets its own single-threaded executor, created
    * with a thread factory named after the module.
    *
    * @param settings Settings describing the module.
    * @param programmaticallyAdded Whether the module was registered by the user rather than loaded from config.
    * @param instance The module instance.
    */
  private def createEntry(settings: Module.Settings, programmaticallyAdded: Boolean, instance: Module): Entry[Module] = {
    val moduleExecutionContext = ExecutionContext.fromExecutorService(
      Executors.newSingleThreadExecutor(threadFactory(settings.name)))

    Entry(
      name = settings.name,
      executionContext = moduleExecutionContext,
      programmaticallyAdded = programmaticallyAdded,
      settings = settings,
      module = instance
    )
  }

  /**
    * Determines the kind of a module from the reporter interfaces implemented by its class. The candidates are
    * checked in order (combined, metric, span) and the first match wins; classes implementing none of them are
    * plain modules.
    */
  private def inferModuleKind(clazz: Class[_ <: Module]): Module.Kind = {
    val reporterKinds: List[(Class[_], Module.Kind)] = List(
      classOf[CombinedReporter] -> Module.Kind.Combined,
      classOf[MetricReporter]   -> Module.Kind.Metric,
      classOf[SpanReporter]     -> Module.Kind.Span
    )

    reporterKinds
      .collectFirst { case (reporterType, kind) if reporterType.isAssignableFrom(clazz) => kind }
      .getOrElse(Module.Kind.Plain)
  }


  /**
    * Returns the current status of this module registry. The status lists every module that can be read from the
    * current configuration (active or not) followed by all programmatically registered modules.
    *
    * A configured module is reported as active when a module with its name is present in the registry.
    */
  private[kamon] def status(): Status.ModuleRegistry = {
    // The registry state is only ever written while holding the registry lock; take the snapshot under the same
    // lock so that this method cannot observe a stale map when called from another thread.
    val registeredModules = synchronized(_registeredModules)

    // getCanonicalName returns null for anonymous and local classes, which is a common way of creating modules
    // that get registered programmatically. Fall back to the binary name in those cases.
    def className(clazz: Class[_]): String =
      Option(clazz.getCanonicalName).getOrElse(clazz.getName)

    val automaticallyAddedModules = readModuleSettings(configuration.config(), false).map(moduleSettings => {
      val isActive = registeredModules.contains(moduleSettings.name)

      Status.Module(
        moduleSettings.name,
        moduleSettings.description,
        className(moduleSettings.clazz),
        moduleSettings.kind,
        programmaticallyRegistered = false,
        moduleSettings.enabled,
        isActive)
    })

    // Programmatically added modules are always reported as programmatically registered, enabled and active.
    val programmaticallyAddedModules = registeredModules
      .filter { case (_, entry) => entry.programmaticallyAdded }
      .map { case (name, entry) => Status.Module(name, entry.settings.description, className(entry.settings.clazz),
        entry.settings.kind, true, true, true) }

    val allModules = automaticallyAddedModules ++ programmaticallyAddedModules
    Status.ModuleRegistry(allModules)
  }


  /**
    * Adds the module to the registry and then schedules its start hook on the module's own execution context.
    * Non-fatal failures thrown by the start hook are logged as warnings; the module stays registered.
    */
  private def startModule(entry: Entry[Module]): Unit = {
    registerModule(entry)

    val startHook = new Runnable {
      override def run(): Unit =
        try entry.module.start()
        catch {
          case NonFatal(t) => _logger.warn(s"Failure occurred while starting module [${entry.name}]", t)
        }
    }

    entry.executionContext.execute(startHook)
  }

  /**
    * Stores the entry under its name in the general registry and, additionally, in the metric and/or span reporter
    * registries when the module implements the corresponding interface. A module implementing both interfaces
    * ends up in both reporter registries. An existing entry with the same name is replaced.
    */
  private def registerModule(entry: Entry[Module]): Unit = {
    val moduleName = entry.name
    _registeredModules = _registeredModules.updated(moduleName, entry)

    // The two checks are independent on purpose: they are not alternatives of a single match.
    entry.module match {
      case _: MetricReporter =>
        _metricReporterModules = _metricReporterModules.updated(moduleName, entry.asInstanceOf[Entry[MetricReporter]])
      case _ =>
    }

    entry.module match {
      case _: SpanReporter =>
        _spanReporterModules = _spanReporterModules.updated(moduleName, entry.asInstanceOf[Entry[SpanReporter]])
      case _ =>
    }
  }

  /**
    * Removes the module from the registry and schedules a call to the stop lifecycle hook on the module's execution
    * context. Once the stop hook finishes (successfully or not) the module's executor is shut down.
    *
    * Stopping is idempotent: if the provided entry is not the one currently registered under its name (because it
    * was already stopped, or because the name now belongs to a different entry) nothing happens. Without this
    * check a repeated stop would either run the stop hook twice or fail when submitting work to the executor
    * that was already shut down, and it could remove an unrelated entry that happens to share the name.
    *
    * @param entry Entry of the module to stop.
    * @return A future that completes when the module finishes its stop procedure; it fails if the stop hook
    *         throws. An already completed future is returned when there was nothing to stop.
    */
  private def stopModule(entry: Entry[Module]): Future[Unit] = synchronized {
    val isCurrentlyRegistered = _registeredModules.get(entry.name).exists(_ eq entry)

    if(!isCurrentlyRegistered)
      Future.successful(())
    else {
      val cleanupExecutor = ExecutionContext.Implicits.global

      // Remove the module from all registries
      _registeredModules = _registeredModules - entry.name
      if(entry.module.isInstanceOf[MetricReporter])
        _metricReporterModules = _metricReporterModules - entry.name
      if(entry.module.isInstanceOf[SpanReporter])
        _spanReporterModules = _spanReporterModules - entry.name

      // Schedule a call to stop on the module
      val stopPromise = Promise[Unit]()
      entry.executionContext.execute(new Runnable {
        override def run(): Unit =
          stopPromise.complete {
            val stopResult = Try(entry.module.stop())
            stopResult.failed.foreach(t => _logger.warn(s"Failure occurred while stopping module [${entry.name}]", t))
            stopResult
          }
      })

      // The executor is shut down from a different pool, after the stop hook ran on the module's own thread.
      stopPromise.future.onComplete(_ => entry.executionContext.shutdown())(cleanupExecutor)
      stopPromise.future
    }
  }

  /**
    * Schedules a call to the module's reconfigure hook on the module's own execution context. Non-fatal failures
    * thrown by the hook are logged as warnings.
    */
  private def reconfigureModule(entry: Entry[Module], config: Config): Unit = {
    val reconfigureHook = new Runnable {
      override def run(): Unit =
        try entry.module.reconfigure(config)
        catch {
          case NonFatal(t) => _logger.warn(s"Failure occurred while reconfiguring module [${entry.name}]", t)
        }
    }

    entry.executionContext.execute(reconfigureHook)
  }

  /**
    * Creates the registration returned when a module could not be added to the registry. Cancelling it only logs
    * a warning.
    */
  private def noopRegistration(moduleName: String): Registration = {
    new Registration {
      override def cancel(): Unit = {
        _logger.warn(s"Cannot cancel registration on module [$moduleName] because the module was not added properly")
      }
    }
  }

  /**
    * Creates the registration handed out for a registered module. Cancelling it stops the module; the future
    * returned by the stop procedure is discarded.
    */
  private def registration(entry: Entry[Module]): Registration = {
    new Registration {
      override def cancel(): Unit = {
        stopModule(entry)
        ()
      }
    }
  }

  /**
    * Translates the textual module kind found in the configuration into a Module.Kind. The comparison is case
    * insensitive and independent from the default locale of the JVM (lower casing with e.g. a Turkish default
    * locale would otherwise turn "METRIC" into a string that matches none of the kinds).
    *
    * @param kind One of "combined", "metric", "span" or "plain", in any casing.
    * @throws IllegalArgumentException if the kind is not one of the known values.
    */
  private def parseModuleKind(kind: String): Module.Kind = kind.toLowerCase(java.util.Locale.ROOT) match {
    case "combined" => Module.Kind.Combined
    case "metric"   => Module.Kind.Metric
    case "span"     => Module.Kind.Span
    case "plain"    => Module.Kind.Plain
    case _          =>
      throw new IllegalArgumentException(s"Unknown module kind [$kind], expected one of: combined, metric, span, plain.")
  }

  /**
    * Extracts the settings used by the registry itself (tick intervals, tick alignment and reporter queue size)
    * from the provided configuration.
    */
  private def readRegistrySettings(config: Config): Settings = {
    val metricTickInterval = config.getDuration("kamon.metric.tick-interval")
    val optimisticTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment")
    val traceTickInterval = config.getDuration("kamon.trace.tick-interval")
    val traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size")

    Settings(metricTickInterval, optimisticTickAlignment, traceTickInterval, traceReporterQueueSize)
  }

  /**
    * Settings used by the registry itself, as produced by readRegistrySettings.
    *
    * @param metricTickInterval Value of "kamon.metric.tick-interval"; period of the metrics ticker.
    * @param optimisticMetricTickAlignment Value of "kamon.metric.optimistic-tick-alignment"; when true the first
    *                                      metrics tick is delayed until the next tick boundary given by the clock
    *                                      instead of a full interval.
    * @param traceTickInterval Value of "kamon.trace.tick-interval"; period and initial delay of the spans ticker.
    * @param traceReporterQueueSize Value of "kamon.trace.reporter-queue-size"; no use of it is visible in this
    *                               class other than being read from the configuration.
    */
  private case class Settings(
    metricTickInterval: Duration,
    optimisticMetricTickAlignment: Boolean,
    traceTickInterval: Duration,
    traceReporterQueueSize: Int
  )


  /**
    * A module as tracked by the registry.
    *
    * @param name Name under which the module is registered.
    * @param executionContext Execution context on which the module's lifecycle hooks and reports are run; it is
    *                         created per module by createEntry and shut down once the module has been stopped.
    * @param programmaticallyAdded True for modules added through register, false for modules created from
    *                              configuration.
    * @param settings Settings the module was created with (inferred from the instance for programmatically added
    *                 modules).
    * @param module The module instance.
    */
  private case class Entry[T <: Module](
    name: String,
    executionContext: ExecutionContextExecutorService,
    programmaticallyAdded: Boolean,
    settings: Module.Settings,
    module: T
  )
}