aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/module/Module.scala
blob: 41649629df5f1ccfa613f6eb5f4dc153869d0a03 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
package kamon
package module

import java.time.{Duration, Instant}
import java.util.concurrent.{CountDownLatch, Executors, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

import com.typesafe.config.Config
import kamon.metric.{MetricsSnapshotGenerator, PeriodSnapshot}
import kamon.trace.Tracer.SpanBuffer
import kamon.util.Clock
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, Promise}
import scala.util.Try
import scala.util.control.NonFatal

/**
  * Modules provide additional capabilities to Kamon, like collecting JVM metrics or exporting the metrics and trace
  * data to external services. Additionally, modules can be automatically registered in Kamon by simply being present
  * in the classpath and having the appropriate entry in the configuration file. All modules get a dedicated execution
  * context which will be used to call the start, stop and reconfigure hooks.
  *
  * Besides the basic lifecycle hooks, when registering a [[MetricReporter]] and/or [[SpanReporter]] module, Kamon will
  * also schedule calls to [[MetricReporter.reportPeriodSnapshot()]] and [[SpanReporter.reportSpans()]] in the module's
  * execution context.
  */
trait Module {

  /**
    * Signals that the module has been registered in Kamon's module registry.
    */
  def start(): Unit

  /**
    * Signals that the module should be stopped and all acquired resources, if any, should be released.
    */
  def stop(): Unit

  /**
    * Signals that a new configuration object has been provided to Kamon. Modules should ensure that their internal
    * settings are in sync with the provided configuration.
    */
  def reconfigure(newConfig: Config): Unit
}


object Module {

  /**
    * Represents a module's registration on the module registry. A module can be stopped at any time by cancelling its
    * registration.
    *
    * The registry in this file hands out two flavours: one that delegates to the registry's stop procedure for the
    * registered module, and a no-op one (returned when registration was rejected) that only logs a warning.
    */
  trait Registration {

    /**
      * Removes and stops the related module.
      */
    def cancel(): Unit
  }

  /**
    * Controls the lifecycle of all available modules.
    */
  class Registry(classLoading: ClassLoading, configuration: Configuration, clock: Clock, snapshotGenerator: MetricsSnapshotGenerator, spanBuffer: SpanBuffer) {
    private val _logger = LoggerFactory.getLogger(classOf[Registry])
    // One scheduler thread per ticker, created through threadFactory with daemon = true.
    private val _metricsTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-metrics-ticker", daemon = true))
    private val _spansTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-spans-ticker", daemon = true))

    // Handles to the currently scheduled tick jobs. They hold null until the first schedule call and are replaced
    // every time a ticker gets rescheduled.
    private val _metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
    private val _spansTickerSchedule = new AtomicReference[ScheduledFuture[_]]()

    // Mutable registry state. All modules are tracked by name in _registeredModules; modules that are metric and/or
    // span reporters are additionally tracked in the reporter-specific maps that the tickers read from.
    private var _registrySettings = readRegistryConfiguration(configuration.config())
    private var _registeredModules: Map[String, Entry[Module]] = Map.empty
    private var _metricReporterModules: Map[String, Entry[MetricReporter]] = Map.empty
    private var _spanReporterModules: Map[String, Entry[SpanReporter]] = Map.empty

    // Start ticking as soon as the registry is created.
    scheduleMetricsTicker()
    scheduleSpansTicker()


    /**
      * Adds a module instance created by the user to the registry and schedules its start hook on the module's own
      * execution context. Names are unique: when the name is already taken nothing is registered, a warning is
      * logged and a no-op registration is returned. Modules that are a MetricReporter and/or SpanReporter also start
      * receiving metrics and spans data on every tick.
      *
      * @param name Desired module name.
      * @param module Module instance.
      * @return A registration that can be used to stop the module at any time.
      */
    def register(name: String, module: Module): Registration = synchronized {
      _registeredModules.get(name) match {
        case None =>
          val newEntry = createEntry(name, programmaticallyAdded = true, instance = module)
          startModule(newEntry)
          registration(newEntry)

        case Some(_) =>
          _logger.warn(s"Cannot register module [$name], a module with that name already exists.")
          noopRegistration(name)
      }
    }

    /**
      * Brings the set of automatically registered modules in line with the provided configuration: enabled modules
      * that are not running get created and started, running modules get reconfigured or (when disabled) stopped, and
      * running modules that are no longer mentioned in the configuration get stopped. Modules that were registered
      * programmatically are never touched by this method.
      */
    def load(config: Config): Unit = synchronized {
      val desiredModules = readModuleSettings(config)
      val configManagedModules = _registeredModules.filterNot { case (_, entry) => entry.programmaticallyAdded }

      // First pass: walk the configured modules and decide what to do with each one of them.
      for (settings <- desiredModules) {
        configManagedModules.get(settings.name) match {
          case Some(runningEntry) if settings.enabled =>
            reconfigureModule(runningEntry, config)

          case Some(runningEntry) =>
            stopModule(runningEntry)

          case None if settings.enabled =>
            createModule(settings).foreach(startModule)

          case None =>
            // Disabled and not running, there is nothing to do.
        }
      }

      // Second pass: anything that is running but disappeared from the configuration must be stopped.
      configManagedModules.foreach { case (moduleName, entry) =>
        if(!desiredModules.exists(_.name == moduleName))
          stopModule(entry)
      }
    }

    /**
      * Applies a new configuration to the registry: the registry's own settings (tick intervals and related options)
      * are re-read from the provided configuration, the reconfigure hook is scheduled on every registered module and
      * both tickers are rescheduled so that any change on the tick intervals takes effect.
      *
      * @param newConfig The configuration to apply. It is used both for the registry settings and as the argument
      *                  passed to each module's reconfigure hook, so the two can never disagree.
      */
    def reconfigure(newConfig: Config): Unit = synchronized {
      // Read the settings from the configuration we were handed rather than from the shared configuration holder.
      _registrySettings = readRegistryConfiguration(newConfig)
      _registeredModules.values.foreach(entry => reconfigureModule(entry, newConfig))
      scheduleMetricsTicker()
      scheduleSpansTicker()
    }

    /**
      * Stops all registered modules. Both tickers are rescheduled as one-shot jobs with no delay so that a final
      * metrics and spans tick is triggered, then the stop hook is scheduled on every registered module.
      *
      * NOTE(review): this method holds the registry's monitor while it removes every module, and the one-shot ticks
      * obtain their reporter lists through the synchronized metricReporterModules()/spanReporterModules() accessors
      * from the ticker threads. It looks like the final tick can therefore observe already-empty reporter maps;
      * confirm whether the last flush actually reaches the reporters.
      *
      * @return A future that completes once all modules finished their stop procedure or after 30 seconds, whichever
      *         happens first. The ticker executors are shut down when that happens.
      */
    def stop(): Future[Unit] = synchronized {
      implicit val cleanupExecutor: ExecutionContext = ExecutionContext.Implicits.global
      scheduleMetricsTicker(once = true)
      scheduleSpansTicker(once = true)

      val stopSignals = _registeredModules.values.map(stopModule)
      val latch = new CountDownLatch(stopSignals.size)
      stopSignals.foreach(f => f.onComplete(_ => latch.countDown()))

      // There is a global 30 seconds limit to shutdown after which all executors will shut down. The wait parks a
      // thread of the global execution context, so it is flagged as blocking to let the pool compensate for it.
      val stopCompletionFuture = Future {
        scala.concurrent.blocking {
          latch.await(30, TimeUnit.SECONDS)
        }
      }

      stopCompletionFuture.onComplete(_ => {
        _metricsTickerExecutor.shutdown()
        _spansTickerExecutor.shutdown()
      })

      stopCompletionFuture.map(_ => ())
    }


    /**
      * Cancels the current metrics ticker, if any, and schedules a fresh one. On every run the ticker takes a snapshot
      * from the snapshot generator, wraps it in a PeriodSnapshot spanning from the previous run (or from the moment
      * the ticker was created) until now, and hands it to every metric reporter on that reporter's execution context.
      *
      * @param once When true the ticker runs a single time with no delay instead of being scheduled at a fixed rate.
      */
    private def scheduleMetricsTicker(once: Boolean = false): Unit = {
      Option(_metricsTickerSchedule.get()).foreach(_.cancel(false))

      val tickIntervalMillis = _registrySettings.metricTickInterval.toMillis

      // With optimistic alignment the first tick is delayed only until the next tick boundary reported by the clock,
      // otherwise a whole interval goes by before the first tick.
      val firstTickDelayMillis =
        if(_registrySettings.optimisticMetricTickAlignment) {
          val scheduledAt = clock.instant()
          val alignedTick = Clock.nextTick(scheduledAt, _registrySettings.metricTickInterval)
          Duration.between(scheduledAt, alignedTick).toMillis
        } else _registrySettings.metricTickInterval.toMillis

      val metricsTick = new Runnable {
        private var previousTickInstant = Instant.now(clock)

        override def run(): Unit =
          try {
            val tickInstant = Instant.now(clock)
            val snapshot = PeriodSnapshot(
              from = previousTickInstant,
              to = tickInstant,
              metrics = snapshotGenerator.snapshot())

            // Reporter failures are logged and never affect the ticker nor the remaining reporters.
            for (reporter <- metricReporterModules()) {
              Future {
                try reporter.module.reportPeriodSnapshot(snapshot)
                catch {
                  case NonFatal(error) =>
                    _logger.error(s"Reporter [${reporter.name}] failed to process a metrics tick.", error)
                }
              }(reporter.executionContext)
            }

            previousTickInstant = tickInstant
          } catch {
            case NonFatal(t) => _logger.error("Failed to run a metrics tick", t)
          }
      }

      val newSchedule: ScheduledFuture[_] =
        if(once) _metricsTickerExecutor.schedule(metricsTick, 0L, TimeUnit.MILLISECONDS)
        else _metricsTickerExecutor.scheduleAtFixedRate(metricsTick, firstTickDelayMillis, tickIntervalMillis, TimeUnit.MILLISECONDS)

      _metricsTickerSchedule.set(newSchedule)
    }

    /**
      * Cancels the current spans ticker, if any, and schedules a fresh one. On every run the ticker flushes the span
      * buffer and hands the flushed batch to every span reporter on that reporter's execution context.
      *
      * @param once When true the ticker runs a single time with no delay instead of being scheduled at a fixed rate.
      */
    private def scheduleSpansTicker(once: Boolean = false): Unit = {
      Option(_spansTickerSchedule.get()).foreach(_.cancel(false))

      val tickIntervalMillis = _registrySettings.traceTickInterval.toMillis

      val spansTick = new Runnable {
        override def run(): Unit =
          try {
            val flushedSpans = spanBuffer.flush()

            // Reporter failures are logged and never affect the ticker nor the remaining reporters.
            for (reporter <- spanReporterModules()) {
              Future {
                try reporter.module.reportSpans(flushedSpans)
                catch {
                  case NonFatal(error) =>
                    _logger.error(s"Reporter [${reporter.name}] failed to process a spans tick.", error)
                }
              }(reporter.executionContext)
            }

          } catch {
            case NonFatal(t) => _logger.error("Failed to run a spans tick", t)
          }
      }

      val newSchedule: ScheduledFuture[_] =
        if(once) _spansTickerExecutor.schedule(spansTick, 0L, TimeUnit.MILLISECONDS)
        else _spansTickerExecutor.scheduleAtFixedRate(spansTick, tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS)

      _spansTickerSchedule.set(newSchedule)
    }

    // Reads the entries of all current metric reporters while holding the registry's monitor.
    private def metricReporterModules(): Iterable[Entry[MetricReporter]] =
      this.synchronized(_metricReporterModules.values)

    // Reads the entries of all current span reporters while holding the registry's monitor.
    private def spanReporterModules(): Iterable[Entry[SpanReporter]] =
      this.synchronized(_spanReporterModules.values)

    /**
      * Reads the settings of all modules found in the provided configuration. Modules come from two places: the
      * entries under "kamon.modules" and the class names listed in the deprecated "kamon.reporters" setting. Entries
      * under "kamon.modules" that cannot be read (missing "class" or "enabled") are logged and left out. Legacy
      * entries whose class is already configured under "kamon.modules" are logged and left out as well, all remaining
      * legacy entries are returned as enabled modules named after their class.
      */
    private def readModuleSettings(config: Config): Seq[Settings] = {
      val moduleConfigs = config.getConfig("kamon.modules").configurations

      val configuredModules = moduleConfigs.map {
        case (moduleName, moduleConfig) =>
          val parsedSettings = Try {
            Settings(
              moduleName,
              moduleConfig.getString("class"),
              moduleConfig.getBoolean("enabled")
            )
          }

          parsedSettings.failed.foreach { t =>
            _logger.warn(s"Failed to read configuration for module [$moduleName]", t)

            // These keys are only checked to give a more helpful hint when the settings could not be read.
            if(moduleConfig.hasPath("requires-aspectj") || moduleConfig.hasPath("auto-start") || moduleConfig.hasPath("extension-class")) {
              _logger.warn(s"Module [$moduleName] contains legacy configuration settings, please ensure that no legacy configuration settings remain in the module's configuration.")
            }
          }

          parsedSettings
      }.flatMap(_.toOption).toSeq

      // Legacy modules from <1.2.0
      val legacyModules = config.getStringList("kamon.reporters").asScala
        .map(moduleClass => Settings(moduleClass, moduleClass, enabled = true))
        .toSeq

      val (repeatedLegacyModules, uniqueLegacyModules) =
        legacyModules.partition(legacyModule => configuredModules.exists(_.fqcn == legacyModule.fqcn))

      repeatedLegacyModules.foreach(m =>
        _logger.warn(s"Module [${m.name}] is configured twice, please remove it from the deprecated kamon.reporters setting."))

      uniqueLegacyModules.foreach(m =>
        _logger.warn(s"Module [${m.name}] is configured in the deprecated kamon.reporters setting, please consider moving it to kamon.modules."))

      configuredModules ++ uniqueLegacyModules
    }

    /**
      * Instantiates the class named in the settings and wraps the instance in a registry entry that is marked as not
      * programmatically added. A failure to do so is logged as a warning and results in None.
      */
    private def createModule(settings: Settings): Option[Entry[Module]] = {
      val entryAttempt = classLoading
        .createInstance[Module](settings.fqcn, Nil)
        .map(instance => createEntry(settings.name, programmaticallyAdded = false, instance = instance))

      entryAttempt.failed.foreach { cause =>
        _logger.warn(s"Failed to create instance of module [${settings.name}]", cause)
      }

      entryAttempt.toOption
    }

    // Builds the registry entry for a module, giving it a dedicated single-threaded execution context whose thread
    // factory is created from the module's name.
    private def createEntry(name: String, programmaticallyAdded: Boolean, instance: Module): Entry[Module] = {
      val moduleExecutionContext = ExecutionContext.fromExecutorService(
        Executors.newSingleThreadExecutor(threadFactory(name)))

      Entry[Module](
        name = name,
        executionContext = moduleExecutionContext,
        programmaticallyAdded = programmaticallyAdded,
        module = instance
      )
    }


    /**
      * Adds the entry to the registry and then schedules the module's start hook on the module's execution context.
      * A failing start hook is logged as a warning; the module stays registered.
      */
    private def startModule(entry: Entry[Module]): Unit = {
      registerModule(entry)

      val startHook = new Runnable {
        override def run(): Unit =
          try entry.module.start()
          catch {
            case NonFatal(cause) =>
              _logger.warn(s"Failure occurred while starting module [${entry.name}]", cause)
          }
      }

      entry.executionContext.execute(startHook)
    }

    // Stores the entry under its name in the general module map and, depending on which reporter traits the module
    // implements, in the metric and/or span reporter maps as well. A module can end up in both reporter maps.
    private def registerModule(entry: Entry[Module]): Unit = {
      _registeredModules = _registeredModules.updated(entry.name, entry)

      entry.module match {
        case _: MetricReporter =>
          _metricReporterModules = _metricReporterModules.updated(entry.name, entry.asInstanceOf[Entry[MetricReporter]])
        case _ =>
      }

      entry.module match {
        case _: SpanReporter =>
          _spanReporterModules = _spanReporterModules.updated(entry.name, entry.asInstanceOf[Entry[SpanReporter]])
        case _ =>
      }
    }

    /**
      * Removes the module from the registry and schedules a call to the stop lifecycle hook on the module's execution
      * context. The module's executor is shut down once the stop hook finished.
      *
      * The procedure only runs when the given entry is the one currently registered under its name. Asking to stop an
      * entry that was already stopped, or whose name has since been taken by a different entry, does nothing. Without
      * this check a repeated or stale request would remove whichever module currently owns the name without stopping
      * it, and would submit a second stop hook to an executor that may already be shut down.
      *
      * @return A future that completes when the module finishes its stop procedure, carrying the failure of the stop
      *         hook if it threw. When there was nothing to stop the future is already completed successfully.
      */
    private def stopModule(entry: Entry[Module]): Future[Unit] = synchronized {
      val stillRegistered = _registeredModules.get(entry.name).exists(_ == entry)

      if(!stillRegistered) {
        Future.successful(())
      } else {
        val cleanupExecutor = ExecutionContext.Implicits.global

        // Remove the module from all registries
        _registeredModules = _registeredModules - entry.name
        if(entry.module.isInstanceOf[MetricReporter])
          _metricReporterModules = _metricReporterModules - entry.name
        if(entry.module.isInstanceOf[SpanReporter])
          _spanReporterModules = _spanReporterModules - entry.name

        // Schedule a call to stop on the module
        val stopPromise = Promise[Unit]()
        entry.executionContext.execute(new Runnable {
          override def run(): Unit =
            stopPromise.complete {
              val stopResult = Try(entry.module.stop())
              stopResult.failed.foreach(t => _logger.warn(s"Failure occurred while stopping module [${entry.name}]", t))
              stopResult
            }
        })

        // The executor is released from a different execution context, after the stop hook is done with it.
        stopPromise.future.onComplete(_ => entry.executionContext.shutdown())(cleanupExecutor)
        stopPromise.future
      }
    }

    /**
      * Schedules the module's reconfigure hook, with the provided configuration, on the module's execution context.
      * A failing hook is logged as a warning.
      */
    private def reconfigureModule(entry: Entry[Module], config: Config): Unit = {
      val reconfigureHook = new Runnable {
        override def run(): Unit =
          try entry.module.reconfigure(config)
          catch {
            case NonFatal(cause) =>
              _logger.warn(s"Failure occurred while reconfiguring module [${entry.name}]", cause)
          }
      }

      entry.executionContext.execute(reconfigureHook)
    }

    // Registration handed out when a module could not be registered: cancelling it only logs a warning.
    private def noopRegistration(moduleName: String): Registration =
      new Registration {
        override def cancel(): Unit = {
          _logger.warn(s"Cannot cancel registration on module [$moduleName] because the module was not added properly")
        }
      }

    // Registration bound to a registry entry: cancelling it triggers the stop procedure for that entry. The future
    // returned by the stop procedure is intentionally dropped since cancel() returns Unit.
    private def registration(entry: Entry[Module]): Registration =
      new Registration {
        override def cancel(): Unit = {
          stopModule(entry)
          ()
        }
      }
  }

  // Extracts the settings used by the registry itself (tick intervals, tick alignment and the trace reporter queue
  // size) from the provided configuration.
  private def readRegistryConfiguration(config: Config): RegistrySettings = {
    val metricTickInterval = config.getDuration("kamon.metric.tick-interval")
    val optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment")
    val traceTickInterval = config.getDuration("kamon.trace.tick-interval")
    val traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size")

    RegistrySettings(metricTickInterval, optimisticMetricTickAlignment, traceTickInterval, traceReporterQueueSize)
  }

  // Settings of the registry itself, as read by readRegistryConfiguration.
  private case class RegistrySettings(
    // Period of the metrics ticker, read from "kamon.metric.tick-interval".
    metricTickInterval: Duration,
    // Whether the first metrics tick is aligned to the next tick boundary, read from
    // "kamon.metric.optimistic-tick-alignment".
    optimisticMetricTickAlignment: Boolean,
    // Period of the spans ticker, read from "kamon.trace.tick-interval".
    traceTickInterval: Duration,
    // Read from "kamon.trace.reporter-queue-size"; not consumed by the code visible in this file.
    traceReporterQueueSize: Int
  )

  // Settings of a single module as found in the configuration.
  private case class Settings(
    // Name under which the module gets registered.
    name: String,
    // Fully qualified name of the class that gets instantiated to create the module.
    fqcn: String,
    // Whether the module should be running.
    enabled: Boolean
  )

  // Registry record for a module that has been registered.
  private case class Entry[T <: Module](
    // Name under which the module is registered.
    name: String,
    // Execution context on which the module's lifecycle hooks and reporting calls get executed.
    executionContext: ExecutionContextExecutorService,
    // True for modules registered through Registry.register, false for modules created from the configuration.
    programmaticallyAdded: Boolean,
    // The module instance itself.
    module: T
  )
}