aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala
blob: d45cd80ff42d51d537258174fe204201f26d112f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
package kamon
package module

import java.time.{Duration, Instant}
import java.util.concurrent.{CountDownLatch, Executors, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

import com.typesafe.config.Config
import kamon.metric.{MetricsSnapshotGenerator, PeriodSnapshot}
import kamon.module.Module.Registration
import kamon.status.Status
import kamon.trace.Tracer.SpanBuffer
import kamon.util.Clock
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters.asScalaBufferConverter
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, Promise}
import scala.util.Try
import scala.util.control.NonFatal



/**
  * Controls the lifecycle of all available modules. Modules can be registered programmatically via [[register]] or
  * loaded from the `kamon.modules` configuration via [[load]]. The registry also owns two tickers, each backed by a
  * single-threaded scheduler, that periodically deliver metric snapshots and span batches to every registered
  * MetricReporter and SpanReporter.
  *
  * Every module gets its own single-threaded executor on which its start, reconfigure, stop and reporting callbacks
  * are executed, so the callbacks of a given module never run concurrently with each other.
  */
class ModuleRegistry(classLoading: ClassLoading, configuration: Configuration, clock: Clock, snapshotGenerator: MetricsSnapshotGenerator, spanBuffer: SpanBuffer) {
  private val _logger = LoggerFactory.getLogger(classOf[ModuleRegistry])
  private val _metricsTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-metrics-ticker", daemon = true))
  private val _spansTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-spans-ticker", daemon = true))

  private val _metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
  private val _spansTickerSchedule = new AtomicReference[ScheduledFuture[_]]()

  // End of the period covered by the last metrics tick. It lives at the registry level (instead of inside the ticker)
  // so that rescheduling the ticker on reconfigure/stop does not reset the start of the period being reported.
  private val _lastMetricsTickInstant = new AtomicReference[Instant](Instant.now(clock))

  private var _registrySettings = readRegistrySettings(configuration.config())
  private var _registeredModules: Map[String, Entry[Module]] = Map.empty
  private var _metricReporterModules: Map[String, Entry[MetricReporter]] = Map.empty
  private var _spanReporterModules: Map[String, Entry[SpanReporter]] = Map.empty

  // Start ticking as soon as the registry is created. Every field used by the tickers must be initialized above this
  // point, because the first tick might run on the ticker thread before the constructor finishes.
  scheduleMetricsTicker()
  scheduleSpansTicker()


  /**
    * Registers a module that has already been instantiated by the user. The start callback will be executed as part
    * of the registration process. If a module with the specified name already exists the registration will fail and a
    * no-op registration is returned. If the registered module is a MetricReporter and/or SpanReporter it will also be
    * configured to receive the metrics and spans data upon every tick.
    *
    * @param name Desired module name.
    * @param module Module instance.
    * @return A registration that can be used to stop the module at any time. Cancelling it more than once is harmless.
    */
  def register(name: String, module: Module): Registration = synchronized {
    if(_registeredModules.contains(name)) {
      _logger.warn(s"Cannot register module [$name], a module with that name already exists.")
      noopRegistration(name)
    } else {
      val moduleClass = module.getClass
      val inferredSettings = Module.Settings(
        name,
        moduleClass.getName,
        moduleClass,
        inferModuleKind(moduleClass),
        true
      )

      val moduleEntry = createEntry(inferredSettings, programmaticallyAdded = true, module)
      startModule(moduleEntry)
      registration(moduleEntry)
    }
  }

  /**
    * Reads all available modules from the config and either starts, stops or reconfigures them in order to match the
    * configured modules state. Only modules that were loaded from configuration are affected: programmatically
    * registered modules are left untouched, and a configured module whose name clashes with a programmatically
    * registered one is not started.
    */
  def load(config: Config): Unit = synchronized {
    val configuredModules = readModuleSettings(config, true)
    val automaticallyRegisteredModules = _registeredModules.filterNot { case (_, entry) => entry.programmaticallyAdded }

    configuredModules.foreach { moduleSettings =>
      automaticallyRegisteredModules.get(moduleSettings.name) match {
        case None =>
          // The module is not running yet: start it if enabled, unless a programmatically registered module already
          // uses that name (starting it would silently replace that module in the registry and orphan it).
          if(moduleSettings.enabled) {
            if(_registeredModules.contains(moduleSettings.name))
              _logger.warn(s"Cannot start module [${moduleSettings.name}], a module with that name was already registered programmatically.")
            else
              createModule(moduleSettings).foreach(startModule)
          }

        case Some(existingEntry) =>
          // The module is already running: reconfigure it if it is still enabled, otherwise stop it.
          if(moduleSettings.enabled)
            reconfigureModule(existingEntry, config)
          else
            stopModule(existingEntry)
      }
    }

    // Stop all running modules that are no longer present in the configuration.
    val configuredNames = configuredModules.map(_.name).toSet
    automaticallyRegisteredModules
      .filter { case (name, _) => !configuredNames.contains(name) }
      .foreach { case (_, entry) => stopModule(entry) }
  }

  /**
    * Schedules the reconfigure hook on all registered modules, applies the new configuration settings to the registry
    * and reschedules both tickers so that changes to the tick intervals take effect.
    */
  def reconfigure(newConfig: Config): Unit = synchronized {
    _registrySettings = readRegistrySettings(newConfig)
    _registeredModules.values.foreach(entry => reconfigureModule(entry, newConfig))
    scheduleMetricsTicker()
    scheduleSpansTicker()
  }

  /**
    * Stops all registered modules. As part of the stop process, all modules get a last chance to report metrics and
    * spans available until the call to stop.
    *
    * @return A future that completes once every module finished its stop procedure, or after 30 seconds, whichever
    *         happens first. The ticker executors are shut down when it completes.
    */
  def stop(): Future[Unit] = synchronized {
    implicit val cleanupExecutor: ExecutionContext = ExecutionContext.Implicits.global

    // Cancel the periodic tickers and run one last tick of each right here, while the reporters are still registered.
    // Each report is submitted to the module's single-threaded executor before its stop hook (queued by stopModule
    // below), so every reporter processes its final data before it is stopped.
    cancelSchedule(_metricsTickerSchedule)
    cancelSchedule(_spansTickerSchedule)
    runMetricsTick()
    runSpansTick()

    val stopSignals = _registeredModules.values.toList.map(stopModule)
    val latch = new CountDownLatch(stopSignals.size)
    stopSignals.foreach(_.onComplete(_ => latch.countDown()))

    // There is a global 30 seconds limit to shutdown after which all executors will shut down. The wait happens on the
    // global execution context, so it is marked as blocking.
    val stopCompletionFuture = Future(scala.concurrent.blocking(latch.await(30, TimeUnit.SECONDS)))
    stopCompletionFuture.onComplete(_ => {
      _metricsTickerExecutor.shutdown()
      _spansTickerExecutor.shutdown()
    })

    stopCompletionFuture.map(_ => ())
  }


  /**
    * (Re)Schedules the metrics ticker that periodically takes snapshots from the metric registry and sends them to
    * all available metric reporting modules. If a ticker was already scheduled then that scheduled job will be
    * cancelled and scheduled again.
    */
  private def scheduleMetricsTicker(): Unit = {
    cancelSchedule(_metricsTickerSchedule)

    val tickInterval = _registrySettings.metricTickInterval
    val initialDelay =
      if(_registrySettings.optimisticMetricTickAlignment) {
        // Delay the first tick until the next interval boundary computed by Clock.nextTick.
        val now = clock.instant()
        Duration.between(now, Clock.nextTick(now, tickInterval)).toMillis
      } else tickInterval.toMillis

    _metricsTickerSchedule.set(
      _metricsTickerExecutor.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = runMetricsTick()
      }, initialDelay, tickInterval.toMillis, TimeUnit.MILLISECONDS)
    )
  }

  /**
    * (Re)Schedules the spans ticker that periodically takes the spans accumulated by the tracer and flushes them to
    * all available span reporting modules. If a ticker was already scheduled then that scheduled job will be
    * cancelled and scheduled again.
    */
  private def scheduleSpansTicker(): Unit = {
    cancelSchedule(_spansTickerSchedule)

    val tickIntervalMillis = _registrySettings.traceTickInterval.toMillis
    _spansTickerSchedule.set(
      _spansTickerExecutor.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = runSpansTick()
      }, tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS)
    )
  }

  /**
    * Cancels the job currently held by `schedule`, if any, without interrupting a tick that is already running.
    */
  private def cancelSchedule(schedule: AtomicReference[ScheduledFuture[_]]): Unit =
    Option(schedule.getAndSet(null)).foreach(_.cancel(false))

  /**
    * Takes a metrics snapshot covering the period since the previous tick and hands it to every metric reporter on
    * the reporter's own execution context. Failures are logged and never propagated to the caller.
    */
  private def runMetricsTick(): Unit = try {
    val currentInstant = Instant.now(clock)
    val metrics = snapshotGenerator.snapshot()

    // The period start only advances once a snapshot was actually taken.
    val periodSnapshot = PeriodSnapshot(
      from = _lastMetricsTickInstant.getAndSet(currentInstant),
      to = currentInstant,
      metrics = metrics)

    metricReporterModules().foreach { entry =>
      Future {
        Try(entry.module.reportPeriodSnapshot(periodSnapshot)).failed.foreach { error =>
          _logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error)
        }
      }(entry.executionContext)
    }
  } catch {
    case NonFatal(t) => _logger.error("Failed to run a metrics tick", t)
  }

  /**
    * Flushes the span buffer and hands the same batch to every span reporter on the reporter's own execution context.
    * Failures are logged and never propagated to the caller.
    */
  private def runSpansTick(): Unit = try {
    val spanBatch = spanBuffer.flush()

    spanReporterModules().foreach { entry =>
      Future {
        Try(entry.module.reportSpans(spanBatch)).failed.foreach { error =>
          _logger.error(s"Reporter [${entry.name}] failed to process a spans tick.", error)
        }
      }(entry.executionContext)
    }
  } catch {
    case NonFatal(t) => _logger.error("Failed to run a spans tick", t)
  }

  private def metricReporterModules(): Iterable[Entry[MetricReporter]] = synchronized {
    _metricReporterModules.values
  }

  private def spanReporterModules(): Iterable[Entry[SpanReporter]] = synchronized {
    _spanReporterModules.values
  }

  /**
    * Reads the settings of all modules declared under `kamon.modules`, plus those declared in the legacy
    * `kamon.reporters` setting. Modules whose settings cannot be read, or whose class does not implement the
    * interface required by the configured kind, are left out of the result.
    *
    * @param emitConfigurationWarnings Whether problems found while reading the configuration should be logged.
    */
  private def readModuleSettings(config: Config, emitConfigurationWarnings: Boolean): Seq[Module.Settings] = {
    val moduleConfigs = config.getConfig("kamon.modules").configurations
    val moduleSettings = moduleConfigs.map {
      case (moduleName, moduleConfig) =>
        val settings = Try {
          Module.Settings(
            moduleName,
            moduleConfig.getString("description"),
            classLoading.resolveClass[Module](moduleConfig.getString("class")).get,
            parseModuleKind(moduleConfig.getString("kind")),
            moduleConfig.getBoolean("enabled")
          )
        }.map { ms =>
          // Unlike `assert`, `require` cannot be elided at compile time, so a misconfigured kind is always rejected.
          require(inferModuleKind(ms.clazz) == ms.kind,
            s"Module [${ms.name}] is configured as [${ms.kind}] but the actual type does not comply to the expected interface.")
          ms
        }

        if(emitConfigurationWarnings) {
          settings.failed.foreach { t =>
            _logger.warn(s"Failed to read configuration for module [$moduleName]", t)

            val hasLegacySettings =
              moduleConfig.hasPath("requires-aspectj") ||
              moduleConfig.hasPath("auto-start") ||
              moduleConfig.hasPath("extension-class")

            if(hasLegacySettings) {
              _logger.warn(s"Module [$moduleName] contains legacy configuration settings, please ensure that no legacy configuration settings remain.")
            }
          }
        }

        settings
    }.flatMap(_.toOption).toSeq


    // Load all modules that might have been configured using the legacy "kamon.reporters" setting from <1.2.0
    // versions. This little hack should be removed by the time we release 2.0.
    if(config.hasPath("kamon.reporters")) {
      val legacyModuleSettings = config.getStringList("kamon.reporters").asScala
        .map { moduleClass =>
          val settings = Try {
            val moduleClazz = classLoading.resolveClass[Module](moduleClass).get
            val description = "Module detected from the legacy kamon.reporters configuration."
            Module.Settings(moduleClazz.getName, description, moduleClazz, inferModuleKind(moduleClazz), true)
          }

          if(emitConfigurationWarnings) {
            settings.failed.foreach(t => _logger.error(s"Failed to load legacy reporter module [${moduleClass}]", t))
          }

          settings
        }
        .flatMap(_.toOption)

      // A legacy module is "repeated" when the same class is also declared under kamon.modules.
      val (repeatedLegacyModules, uniqueLegacyModules) = legacyModuleSettings
        .partition(lm => moduleSettings.exists(_.clazz.getName == lm.clazz.getName))

      if(emitConfigurationWarnings) {
        repeatedLegacyModules.foreach(m =>
          _logger.warn(s"Module [${m.name}] is configured twice, please remove it from the deprecated kamon.reporters setting."))

        uniqueLegacyModules.foreach(m =>
          _logger.warn(s"Module [${m.name}] is configured in the deprecated kamon.reporters setting, please consider moving it to kamon.modules."))
      }

      moduleSettings ++ uniqueLegacyModules

    } else moduleSettings
  }

  /**
    * Creates a module from the provided settings. Returns None (and logs a warning) if the instance cannot be created.
    */
  private def createModule(settings: Module.Settings): Option[Entry[Module]] = {
    val moduleInstance = classLoading.createInstance[Module](settings.clazz, Nil)
    val moduleEntry = moduleInstance.map(instance => createEntry(settings, programmaticallyAdded = false, instance))

    moduleEntry.failed.foreach(t => _logger.warn(s"Failed to create instance of module [${settings.name}]", t))
    moduleEntry.toOption
  }

  /**
    * Wraps a module instance in a registry entry that owns a dedicated single-threaded executor named after the module.
    */
  private def createEntry(settings: Module.Settings, programmaticallyAdded: Boolean, instance: Module): Entry[Module] = {
    val executor = Executors.newSingleThreadExecutor(threadFactory(settings.name))
    Entry(settings.name, ExecutionContext.fromExecutorService(executor), programmaticallyAdded, settings, instance)
  }

  /**
    * Infers the module kind from the reporter interfaces implemented by the class. CombinedReporter takes precedence
    * over the individual reporter interfaces.
    */
  private def inferModuleKind(clazz: Class[_ <: Module]): Module.Kind = {
    if(classOf[CombinedReporter].isAssignableFrom(clazz))
      Module.Kind.Combined
    else if(classOf[MetricReporter].isAssignableFrom(clazz))
      Module.Kind.Metric
    else if(classOf[SpanReporter].isAssignableFrom(clazz))
      Module.Kind.Span
    else
      Module.Kind.Plain
  }


  /**
    * Returns the current status of this module registry: every module found in the current configuration (active or
    * not), plus every programmatically registered module.
    */
  private[kamon] def status(): Status.ModuleRegistry = synchronized {
    val automaticallyAddedModules = readModuleSettings(configuration.config(), false).map { moduleSettings =>
      // A configured module is only active when the running entry under its name was loaded from configuration; a
      // programmatically registered module with the same name is reported separately below.
      val isActive = _registeredModules.get(moduleSettings.name).exists(entry => !entry.programmaticallyAdded)

      Status.Module(
        moduleSettings.name,
        moduleSettings.description,
        moduleSettings.clazz.getCanonicalName,
        moduleSettings.kind,
        programmaticallyRegistered = false,
        moduleSettings.enabled,
        isActive)
    }

    val programmaticallyAddedModules = _registeredModules.collect {
      case (name, entry) if entry.programmaticallyAdded =>
        Status.Module(name, entry.settings.description, entry.settings.clazz.getCanonicalName, entry.settings.kind,
          programmaticallyRegistered = true, true, true)
    }

    Status.ModuleRegistry(automaticallyAddedModules ++ programmaticallyAddedModules)
  }


  /**
    * Registers a module and schedules execution of its start procedure on the module's execution context.
    */
  private def startModule(entry: Entry[Module]): Unit = {
    registerModule(entry)

    entry.executionContext.execute(new Runnable {
      override def run(): Unit =
        Try(entry.module.start())
          .failed.foreach(t => _logger.warn(s"Failure occurred while starting module [${entry.name}]", t))
    })
  }

  /**
    * Adds the entry to the registry and, depending on the interfaces it implements, to the metric and/or span
    * reporter registries that the tickers read from.
    */
  private def registerModule(entry: Entry[Module]): Unit = {
    _registeredModules = _registeredModules + (entry.name -> entry)
    if(entry.module.isInstanceOf[MetricReporter])
      _metricReporterModules = _metricReporterModules + (entry.name -> entry.asInstanceOf[Entry[MetricReporter]])
    if(entry.module.isInstanceOf[SpanReporter])
      _spanReporterModules = _spanReporterModules + (entry.name -> entry.asInstanceOf[Entry[SpanReporter]])
  }

  /**
    * Removes the module from the registry and schedules a call to the stop lifecycle hook on the module's execution
    * context. The returned future completes when the module finishes its stop procedure, after which the module's
    * executor is shut down.
    *
    * Calling this for an entry that is no longer registered (already stopped, or replaced by another module under the
    * same name) is a no-op that returns an already completed future.
    */
  private def stopModule(entry: Entry[Module]): Future[Unit] = synchronized {
    val isRegistered = _registeredModules.get(entry.name).exists(_ eq entry)

    if(!isRegistered) {
      // Do not remove a different module that now uses this name, and do not submit work to an executor that might
      // already be shut down.
      Future.successful(())
    } else {
      // Remove the module from all registries
      _registeredModules = _registeredModules - entry.name
      if(entry.module.isInstanceOf[MetricReporter])
        _metricReporterModules = _metricReporterModules - entry.name
      if(entry.module.isInstanceOf[SpanReporter])
        _spanReporterModules = _spanReporterModules - entry.name

      // Schedule a call to stop on the module
      val stopPromise = Promise[Unit]()
      entry.executionContext.execute(new Runnable {
        override def run(): Unit =
          stopPromise.complete {
            val stopResult = Try(entry.module.stop())
            stopResult.failed.foreach(t => _logger.warn(s"Failure occurred while stopping module [${entry.name}]", t))
            stopResult
          }
      })

      stopPromise.future.onComplete(_ => entry.executionContext.shutdown())(ExecutionContext.Implicits.global)
      stopPromise.future
    }
  }

  /**
    * Schedules a call to reconfigure on the module's execution context.
    */
  private def reconfigureModule(entry: Entry[Module], config: Config): Unit = {
    entry.executionContext.execute(new Runnable {
      override def run(): Unit =
        Try(entry.module.reconfigure(config))
          .failed.foreach(t => _logger.warn(s"Failure occurred while reconfiguring module [${entry.name}]", t))
    })
  }

  /**
    * Registration returned when a module could not be added; cancelling it only logs a warning.
    */
  private def noopRegistration(moduleName: String): Registration = new Registration {
    override def cancel(): Unit =
      _logger.warn(s"Cannot cancel registration on module [$moduleName] because the module was not added properly")
  }

  /**
    * Registration whose cancellation stops the module. Cancelling more than once has no further effect.
    */
  private def registration(entry: Entry[Module]): Registration = new Registration {
    override def cancel(): Unit = stopModule(entry)
  }

  /**
    * Parses the `kind` setting of a module, case-insensitively.
    *
    * @throws IllegalArgumentException if the kind is not one of combined, metric, span or plain.
    */
  private def parseModuleKind(kind: String): Module.Kind = kind.toLowerCase(java.util.Locale.ROOT) match {
    case "combined" => Module.Kind.Combined
    case "metric"   => Module.Kind.Metric
    case "span"     => Module.Kind.Span
    case "plain"    => Module.Kind.Plain
    case _          =>
      throw new IllegalArgumentException(s"Unknown module kind [$kind], expected one of [combined, metric, span, plain].")
  }

  /**
    * Reads the registry-level settings (tick intervals and related options) from the provided config.
    */
  private def readRegistrySettings(config: Config): Settings =
    Settings(
      metricTickInterval = config.getDuration("kamon.metric.tick-interval"),
      optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment"),
      traceTickInterval = config.getDuration("kamon.trace.tick-interval"),
      traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size")
    )

  private case class Settings(
    metricTickInterval: Duration,
    optimisticMetricTickAlignment: Boolean,
    traceTickInterval: Duration,
    traceReporterQueueSize: Int
  )


  /**
    * A module known to the registry, together with the dedicated execution context on which all of its callbacks run.
    */
  private case class Entry[T <: Module](
    name: String,
    executionContext: ExecutionContextExecutorService,
    programmaticallyAdded: Boolean,
    settings: Module.Settings,
    module: T
  )
}