aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2019-02-04 19:15:43 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2019-02-04 19:15:43 +0100
commit0a2b7f4bf0dde31c82482fbaf5153c22c84ada69 (patch)
tree799f857698132b937db6fc0c5207af83ebe62b15
parentff8b915260e90084179dcb1d8b0b7b5f62b98470 (diff)
downloadKamon-0a2b7f4bf0dde31c82482fbaf5153c22c84ada69.tar.gz
Kamon-0a2b7f4bf0dde31c82482fbaf5153c22c84ada69.tar.bz2
Kamon-0a2b7f4bf0dde31c82482fbaf5153c22c84ada69.zip
cleanup the exposition of status data and ensure the module registry provides enough info
-rw-r--r--build.sbt2
-rw-r--r--kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala12
-rw-r--r--kamon-core-tests/src/test/scala/kamon/module/ModuleRegistrySpec.scala (renamed from kamon-core-tests/src/test/scala/kamon/ReporterRegistrySpec.scala)18
-rw-r--r--kamon-core/src/main/resources/reference.conf2
-rw-r--r--kamon-core/src/main/scala/kamon/ClassLoading.scala6
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala40
-rw-r--r--kamon-core/src/main/scala/kamon/ModuleLoading.scala5
-rw-r--r--kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala14
-rw-r--r--kamon-core/src/main/scala/kamon/module/Module.scala465
-rw-r--r--kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala463
-rw-r--r--kamon-core/src/main/scala/kamon/module/ReportingModule.scala27
-rw-r--r--kamon-core/src/main/scala/kamon/status/JsonMarshalling.scala29
-rw-r--r--kamon-core/src/main/scala/kamon/status/Status.scala36
-rw-r--r--kamon-core/src/main/scala/kamon/status/StatusPageServer.scala2
14 files changed, 623 insertions, 498 deletions
diff --git a/build.sbt b/build.sbt
index 27114730..b5b06ee5 100644
--- a/build.sbt
+++ b/build.sbt
@@ -77,7 +77,7 @@ lazy val coreTests = (project in file("kamon-core-tests"))
.settings(
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "3.0.1" % "test",
- "ch.qos.logback" % "logback-classic" % "1.2.2" % "test"
+ "ch.qos.logback" % "logback-classic" % "1.2.3" % "test"
)
).dependsOn(testkit)
diff --git a/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala
index 2a0af8c0..9ee07694 100644
--- a/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala
+++ b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala
@@ -39,14 +39,14 @@ class KamonLifecycleSpec extends WordSpec with Matchers with Eventually {
}
}
-class DummyMetricReporter extends MetricReporter {
+class DummyMetricReporter extends kamon.module.MetricReporter {
override def start(): Unit = {}
override def stop(): Unit = {}
override def reconfigure(config: Config): Unit = {}
override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {}
}
-class DummySpanReporter extends SpanReporter {
+class DummySpanReporter extends kamon.module.SpanReporter {
override def start(): Unit = {}
override def stop(): Unit = {}
override def reconfigure(config: Config): Unit = {}
@@ -54,13 +54,13 @@ class DummySpanReporter extends SpanReporter {
}
object KamonWithRunningReporter extends App {
- Kamon.addReporter(new DummyMetricReporter())
- Kamon.addReporter(new DummySpanReporter())
+ Kamon.registerModule("dummy metric reporter", new DummyMetricReporter())
+ Kamon.registerModule("dummy span reporter", new DummySpanReporter())
}
object KamonWithTemporaryReporter extends App {
- Kamon.addReporter(new DummyMetricReporter())
- Kamon.addReporter(new DummySpanReporter())
+ Kamon.registerModule("dummy metric reporter", new DummyMetricReporter())
+ Kamon.registerModule("dummy span repoter", new DummySpanReporter())
Thread.sleep(5000)
Kamon.stopAllReporters()
diff --git a/kamon-core-tests/src/test/scala/kamon/ReporterRegistrySpec.scala b/kamon-core-tests/src/test/scala/kamon/module/ModuleRegistrySpec.scala
index 515dfdd1..791015fb 100644
--- a/kamon-core-tests/src/test/scala/kamon/ReporterRegistrySpec.scala
+++ b/kamon-core-tests/src/test/scala/kamon/module/ModuleRegistrySpec.scala
@@ -14,15 +14,17 @@
*/
package kamon
+package module
import com.typesafe.config.Config
import kamon.metric.PeriodSnapshot
import kamon.testkit.Reconfigure
import org.scalatest.concurrent.Eventually
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
+import kamon.{MetricReporter => LegacyMetricReporter}
-class ReporterRegistrySpec extends WordSpec with Matchers with Reconfigure with Eventually with BeforeAndAfterAll {
- "The ReporterRegistry" when {
+class ModuleRegistrySpec extends WordSpec with Matchers with Reconfigure with Eventually with BeforeAndAfterAll {
+ "The ModuleRegistry" when {
"working with metrics reporters" should {
"report all metrics if no filters are applied" in {
Kamon.counter("test.hello").increment()
@@ -104,13 +106,7 @@ class ReporterRegistrySpec extends WordSpec with Matchers with Reconfigure with
resetConfig()
}
- abstract class DummyReporter extends MetricReporter {
- override def start(): Unit = {}
- override def stop(): Unit = {}
- override def reconfigure(config: Config): Unit = {}
- }
-
- class SeenMetricsReporter extends DummyReporter {
+ class SeenMetricsReporter extends LegacyMetricReporter {
@volatile private var count = 0
@volatile private var seenMetrics = Seq.empty[String]
@@ -125,5 +121,9 @@ class ReporterRegistrySpec extends WordSpec with Matchers with Reconfigure with
def snapshotCount(): Int =
count
+
+ override def start(): Unit = {}
+ override def stop(): Unit = {}
+ override def reconfigure(config: Config): Unit = {}
}
}
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 1d6a9f1b..dea7296d 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -44,6 +44,8 @@ kamon {
# kamon.modules {
# module-name {
# enabled = true
+ # description = "A module description"
+ # kind = "combined | metric | span | plain"
# class = "com.example.ModuleClass"
# }
# }
diff --git a/kamon-core/src/main/scala/kamon/ClassLoading.scala b/kamon-core/src/main/scala/kamon/ClassLoading.scala
index 5b097af1..54d1922c 100644
--- a/kamon-core/src/main/scala/kamon/ClassLoading.scala
+++ b/kamon-core/src/main/scala/kamon/ClassLoading.scala
@@ -23,4 +23,10 @@ trait ClassLoading {
def createInstance[T: ClassTag](fqcn: String, args: immutable.Seq[(Class[_], AnyRef)]): Try[T] =
_dynamicAccess.createInstanceFor(fqcn, args)
+
+ def createInstance[T: ClassTag](clazz: Class[_], args: immutable.Seq[(Class[_], AnyRef)]): Try[T] =
+ _dynamicAccess.createInstanceFor(clazz, args)
+
+ def resolveClass[T: ClassTag](fqcn: String): Try[Class[_ <: T]] =
+ _dynamicAccess.getClassFor(fqcn)
}
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 96aba81c..50c3e69d 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -15,9 +15,9 @@
package kamon
-import com.typesafe.config.{Config, ConfigRenderOptions}
+import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
import kamon.metric.PeriodSnapshot
-import kamon.module.{MetricReporter, Module}
+import kamon.trace.Span
object Kamon extends ClassLoading
with Configuration
@@ -42,16 +42,41 @@ object Kamon extends ClassLoading
object QuickTest extends App {
+ val manualConfig =
+ """
+ |kamon.modules {
+ | kamon-zipkin {
+ | enabled = false
+ | description = "Module that sends data to particular places"
+ | kind = metric
+ | class = kamon.MyCustomMetricDude
+ | }
+ |}
+ """.stripMargin
+
+ val newConfig = ConfigFactory.parseString(manualConfig).withFallback(Kamon.config())
+ Kamon.reconfigure(newConfig)
+
+
+
Kamon.loadModules()
- Kamon.registerModule("my-module", new MetricReporter {
+ Kamon.registerModule("my-module", new kamon.module.MetricReporter {
override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {}
override def start(): Unit = {}
override def stop(): Unit = {}
override def reconfigure(newConfig: Config): Unit = {}
})
+ Kamon.registerModule("my-module-for-spans", new kamon.module.SpanReporter {
+ override def reportSpans(spans: Seq[Span.FinishedSpan]): Unit = {}
+ override def start(): Unit = {}
+ override def stop(): Unit = {}
+ override def reconfigure(newConfig: Config): Unit = {}
+ })
+
- Kamon.histogram("test").refine("tagcito" -> "value").record(10)
+ Kamon.histogram("test").refine("actor_class" -> "com.kamon.something.MyActor", "system" -> "HRMS").record(10)
+ Kamon.rangeSampler("test-rs").refine("actor_class" -> "com.kamon.something.MyActor", "system" -> "HRMS").increment(34)
Kamon.counter("test-counter").refine("tagcito" -> "value").increment(42)
//println("JSON CONFIG: " + Kamon.config().root().render(ConfigRenderOptions.concise().setFormatted(true).setJson(true)))
@@ -61,3 +86,10 @@ object QuickTest extends App {
}
+
+class MyCustomMetricDude extends kamon.module.MetricReporter {
+ override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {}
+ override def start(): Unit = {}
+ override def stop(): Unit = {}
+ override def reconfigure(newConfig: Config): Unit = {}
+}
diff --git a/kamon-core/src/main/scala/kamon/ModuleLoading.scala b/kamon-core/src/main/scala/kamon/ModuleLoading.scala
index bc90c656..9501f277 100644
--- a/kamon-core/src/main/scala/kamon/ModuleLoading.scala
+++ b/kamon-core/src/main/scala/kamon/ModuleLoading.scala
@@ -3,6 +3,7 @@ package kamon
import com.typesafe.config.Config
import kamon.metric.{MetricsSnapshot, PeriodSnapshot}
import kamon.module.Module
+import kamon.module.ModuleRegistry
import kamon.util.Registration
import kamon.module.{MetricReporter => NewMetricReporter}
import kamon.module.{SpanReporter => NewSpanReporter}
@@ -30,13 +31,15 @@ trait SpanReporter extends kamon.module.SpanReporter { }
* kamon.modules {
* module-name {
* enabled = true
+ * description = "A module description"
+ * kind = "combined | metric | span | plain"
* class = "com.example.MyModule"
* }
* }
*
*/
trait ModuleLoading { self: ClassLoading with Configuration with Utilities with Metrics with Tracing =>
- protected val _moduleRegistry = new Module.Registry(self, self, clock(), self.metricRegistry(), self.tracer())
+ protected val _moduleRegistry = new ModuleRegistry(self, self, clock(), self.metricRegistry(), self.tracer())
self.onReconfigure(newConfig => self._moduleRegistry.reconfigure(newConfig))
diff --git a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala
index 11c5653f..fd0cd8b8 100644
--- a/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/metric/MetricRegistry.scala
@@ -26,6 +26,7 @@ import scala.collection.concurrent.TrieMap
import java.time.Duration
import java.util.concurrent.ScheduledExecutorService
+import kamon.status.Status
import org.slf4j.LoggerFactory
@@ -89,24 +90,19 @@ class MetricRegistry(initialConfig: Config, scheduler: ScheduledExecutorService)
metric.asInstanceOf[T]
}
- private[kamon] def status(): MetricRegistry.Status = {
- var metricInfos = Seq.empty[MetricRegistry.MetricInfo]
+ private[kamon] def status(): Status.MetricRegistry = {
+ var registeredMetrics = Seq.empty[Status.Metric]
metrics.foreach {
case (metricName, metric) =>
metric.incarnations().foreach(incarnation => {
- metricInfos = metricInfos :+ MetricRegistry.MetricInfo(metricName, incarnation, metric.instrumentType)
+ registeredMetrics = registeredMetrics :+ Status.Metric(metricName, incarnation, metric.instrumentType)
})
}
- MetricRegistry.Status(metricInfos)
+ Status.MetricRegistry(registeredMetrics)
}
}
-object MetricRegistry {
- case class Status(metrics: Seq[MetricInfo])
- case class MetricInfo(name: String, tags: Map[String, String], instrumentType: InstrumentType)
-}
-
trait MetricsSnapshotGenerator {
def snapshot(): MetricsSnapshot
}
diff --git a/kamon-core/src/main/scala/kamon/module/Module.scala b/kamon-core/src/main/scala/kamon/module/Module.scala
index fd5af16c..592e02aa 100644
--- a/kamon-core/src/main/scala/kamon/module/Module.scala
+++ b/kamon-core/src/main/scala/kamon/module/Module.scala
@@ -1,20 +1,9 @@
package kamon
package module
-import java.time.{Duration, Instant}
-import java.util.concurrent.{CountDownLatch, Executors, ScheduledFuture, TimeUnit}
-import java.util.concurrent.atomic.AtomicReference
-
import com.typesafe.config.Config
-import kamon.metric.{MetricsSnapshotGenerator, PeriodSnapshot}
-import kamon.trace.Tracer.SpanBuffer
-import kamon.util.Clock
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters.collectionAsScalaIterableConverter
-import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, Promise}
-import scala.util.Try
-import scala.util.control.NonFatal
+import kamon.metric.PeriodSnapshot
+import kamon.trace.Span
/**
* Modules provide additional capabilities to Kamon, like collecting JVM metrics or exporting the metrics and trace
@@ -46,8 +35,39 @@ trait Module {
}
+/**
+ * Modules implementing this trait will get registered for periodically receiving metric period snapshots. The
+ * frequency of the period snapshots is controlled by the kamon.metric.tick-interval setting.
+ */
+trait MetricReporter extends Module {
+ def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit
+}
+
+/**
+ * Modules implementing this trait will get registered for periodically receiving span batches. The frequency of the
+ * span batches is controlled by the kamon.trace.tick-interval setting.
+ */
+trait SpanReporter extends Module {
+ def reportSpans(spans: Seq[Span.FinishedSpan]): Unit
+}
+
+/**
+ * Modules implementing this trait will get registered for periodically receiving metric period snapshots and span
+ * batches.
+ */
+trait CombinedReporter extends MetricReporter with SpanReporter
+
+
object Module {
+ sealed trait Kind
+ object Kind {
+ case object Combined extends Kind
+ case object Metric extends Kind
+ case object Span extends Kind
+ case object Plain extends Kind
+ }
+
/**
* Represents a module's registration on the module registry. A module can be stopped at any time by cancelling its
* registration.
@@ -61,415 +81,20 @@ object Module {
}
/**
- * Controls the lifecycle of all available modules.
+ * Configuration of a given module present in the classpath.
+ *
+ * @param name Module's name
+ * @param description Module's description.
+ * @param clazz The class implementing the configured module.
+ * @param kind Module kind.
+ * @param enabled Whether the module is enabled or not. Enabled modules in the classpath will be automatically
+ * started in any call to Kamon.loadModules().
*/
- class Registry(classLoading: ClassLoading, configuration: Configuration, clock: Clock, snapshotGenerator: MetricsSnapshotGenerator, spanBuffer: SpanBuffer) {
- private val _logger = LoggerFactory.getLogger(classOf[Registry])
- private val _metricsTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-metrics-ticker", daemon = true))
- private val _spansTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-spans-ticker", daemon = true))
-
- private val _metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
- private val _spansTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
-
- private var _registrySettings = readRegistryConfiguration(configuration.config())
- private var _registeredModules: Map[String, Entry[Module]] = Map.empty
- private var _metricReporterModules: Map[String, Entry[MetricReporter]] = Map.empty
- private var _spanReporterModules: Map[String, Entry[SpanReporter]] = Map.empty
-
- // Start ticking as soon as the registry is created.
- scheduleMetricsTicker()
- scheduleSpansTicker()
-
-
- /**
- * Registers a module that has already been instantiated by the user. The start callback will be executed as part
- * of the registration process. If a module with the specified name already exists the registration will fail. If
- * the registered module is a MetricReporter and/or SpanReporter it will also be configured to receive the metrics
- * and spans data upon every tick.
- *
- * @param name Desired module name.
- * @param module Module instance.
- * @return A registration that can be used to stop the module at any time.
- */
- def register(name: String, module: Module): Registration = synchronized {
- if(_registeredModules.get(name).isEmpty) {
- val moduleEntry = createEntry(name, true, module)
- startModule(moduleEntry)
- registration(moduleEntry)
-
- } else {
- _logger.warn(s"Cannot register module [$name], a module with that name already exists.")
- noopRegistration(name)
- }
- }
-
- /**
- * Reads all available modules from the config and either starts, stops or reconfigures them in order to match the
- * configured modules state.
- */
- def load(config: Config): Unit = synchronized {
- val configuredModules = readModuleSettings(config)
- val automaticallyRegisteredModules = _registeredModules.filterNot { case (_, module) => module.programmaticallyAdded }
-
- // Start, reconfigure and stop modules that are still present but disabled.
- configuredModules.foreach { moduleSettings =>
- automaticallyRegisteredModules.get(moduleSettings.name).fold {
- // The module does not exist in the registry, the only possible action is starting it, if enabled.
- if(moduleSettings.enabled) {
- createModule(moduleSettings).foreach(entry => startModule(entry))
- }
-
- } { existentModuleSettings =>
- // When a module already exists it can either need to be stopped, or to be reconfigured.
- if(moduleSettings.enabled) {
- reconfigureModule(existentModuleSettings, config)
- } else {
- stopModule(existentModuleSettings)
- }
- }
- }
-
- // Remove all modules that no longer exist in the configuration.
- val missingModules = automaticallyRegisteredModules.filterKeys(moduleName => configuredModules.find(_.name == moduleName).isEmpty)
- missingModules.foreach {
- case (_, entry) => stopModule(entry)
- }
- }
-
- /**
- * Schedules the reconfigure hook on all registered modules and applies the latest configuration settings to the
- * registry.
- */
- def reconfigure(newConfig: Config): Unit = synchronized {
- _registrySettings = readRegistryConfiguration(configuration.config())
- _registeredModules.values.foreach(entry => reconfigureModule(entry, newConfig))
- scheduleMetricsTicker()
- scheduleSpansTicker()
- }
-
- /**
- * Stops all registered modules. As part of the stop process, all modules get a last chance to report metrics and
- * spans available until the call to stop.
- */
- def stop(): Future[Unit] = synchronized {
- implicit val cleanupExecutor = ExecutionContext.Implicits.global
- scheduleMetricsTicker(once = true)
- scheduleSpansTicker(once = true)
-
- val stopSignals = _registeredModules.values.map(stopModule)
- val latch = new CountDownLatch(stopSignals.size)
- stopSignals.foreach(f => f.onComplete(_ => latch.countDown()))
-
- // There is a global 30 seconds limit to shutdown after which all executors will shut down.
- val stopCompletionFuture = Future(latch.await(30, TimeUnit.SECONDS))
- stopCompletionFuture.onComplete(_ => {
- _metricsTickerExecutor.shutdown()
- _spansTickerExecutor.shutdown()
- })
-
- stopCompletionFuture.map(_ => ())
- }
-
-
- /**
- * (Re)Schedules the metrics ticker that periodically takes snapshots from the metric registry and sends them to
- * all available metric reporting modules. If a ticker was already scheduled then that scheduled job will be
- * cancelled and scheduled again.
- */
- private def scheduleMetricsTicker(once: Boolean = false): Unit = {
- val currentMetricsTicker = _metricsTickerSchedule.get()
- if(currentMetricsTicker != null)
- currentMetricsTicker.cancel(false)
-
- _metricsTickerSchedule.set {
- val interval = _registrySettings.metricTickInterval.toMillis
- val initialDelay = if(_registrySettings.optimisticMetricTickAlignment) {
- val now = clock.instant()
- val nextTick = Clock.nextTick(now, _registrySettings.metricTickInterval)
- Duration.between(now, nextTick).toMillis
- } else _registrySettings.metricTickInterval.toMillis
-
- val ticker = new Runnable {
- var lastInstant = Instant.now(clock)
-
- override def run(): Unit = try {
- val currentInstant = Instant.now(clock)
- val periodSnapshot = PeriodSnapshot(
- from = lastInstant,
- to = currentInstant,
- metrics = snapshotGenerator.snapshot())
-
- metricReporterModules().foreach { entry =>
- Future {
- Try(entry.module.reportPeriodSnapshot(periodSnapshot)).failed.foreach { error =>
- _logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error)
- }
- }(entry.executionContext)
- }
-
- lastInstant = currentInstant
- } catch {
- case NonFatal(t) => _logger.error("Failed to run a metrics tick", t)
- }
- }
-
- if(once)
- _metricsTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS)
- else
- _metricsTickerExecutor.scheduleAtFixedRate(ticker, initialDelay, interval, TimeUnit.MILLISECONDS)
- }
- }
-
- /**
- * (Re)Schedules the spans ticker that periodically takes the spans accumulated by the tracer and flushes them to
- * all available span reporting modules. If a ticker was already scheduled then that scheduled job will be
- * cancelled and scheduled again.
- */
- private def scheduleSpansTicker(once: Boolean = false): Unit = {
- val currentSpansTicker = _spansTickerSchedule.get()
- if(currentSpansTicker != null)
- currentSpansTicker.cancel(false)
-
- _spansTickerSchedule.set {
- val interval = _registrySettings.traceTickInterval.toMillis
-
- val ticker = new Runnable {
- override def run(): Unit = try {
- val spanBatch = spanBuffer.flush()
-
- spanReporterModules().foreach { entry =>
- Future {
- Try(entry.module.reportSpans(spanBatch)).failed.foreach { error =>
- _logger.error(s"Reporter [${entry.name}] failed to process a spans tick.", error)
- }
- }(entry.executionContext)
- }
-
- } catch {
- case NonFatal(t) => _logger.error("Failed to run a spans tick", t)
- }
- }
-
- if(once)
- _spansTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS)
- else
- _spansTickerExecutor.scheduleAtFixedRate(ticker, interval, interval, TimeUnit.MILLISECONDS)
- }
- }
-
- private def metricReporterModules(): Iterable[Entry[MetricReporter]] = synchronized {
- _metricReporterModules.values
- }
-
- private def spanReporterModules(): Iterable[Entry[SpanReporter]] = synchronized {
- _spanReporterModules.values
- }
-
- private def readModuleSettings(config: Config): Seq[Settings] = {
- val moduleConfigs = config.getConfig("kamon.modules").configurations
- val moduleSettings = moduleConfigs.map {
- case (moduleName, moduleConfig) =>
- val moduleSettings = Try {
- Settings(
- moduleName,
- moduleConfig.getString("class"),
- moduleConfig.getBoolean("enabled")
- )
- }
-
- moduleSettings.failed.foreach { t =>
- _logger.warn(s"Failed to read configuration for module [$moduleName]", t)
-
- if(moduleConfig.hasPath("requires-aspectj") || moduleConfig.hasPath("auto-start") || moduleConfig.hasPath("extension-class")) {
- _logger.warn(s"Module [$moduleName] contains legacy configuration settings, please ensure that no legacy configuration")
- }
- }
-
- moduleSettings
-
- } filter(_.isSuccess) map(_.get) toSeq
-
-
- // Load all modules that might have been configured using the legacy "kamon.reporters" setting from <1.2.0
- // versions. This little hack should be removed by the time we release 2.0.
- //
- if(config.hasPath("kamon.reporters")) {
- val legacyModules = config.getStringList("kamon.reporters").asScala map { moduleClass =>
- Settings(moduleClass, moduleClass, true)
- } toSeq
-
- val (repeatedLegacyModules, uniqueLegacyModules) = legacyModules.partition(lm => moduleSettings.find(_.fqcn == lm.fqcn).nonEmpty)
- repeatedLegacyModules.foreach(m =>
- _logger.warn(s"Module [${m.name}] is configured twice, please remove it from the deprecated kamon.reporters setting."))
-
- uniqueLegacyModules.foreach(m =>
- _logger.warn(s"Module [${m.name}] is configured in the deprecated kamon.reporters setting, please consider moving it to kamon.modules."))
-
- moduleSettings ++ uniqueLegacyModules
-
- } else moduleSettings
- }
-
- /**
- * Creates a module from the provided settings.
- */
- private def createModule(settings: Settings): Option[Entry[Module]] = {
- val moduleInstance = classLoading.createInstance[Module](settings.fqcn, Nil)
- val moduleEntry = moduleInstance.map(instance => createEntry(settings.name, false, instance))
-
- moduleEntry.failed.foreach(t => _logger.warn(s"Failed to create instance of module [${settings.name}]", t))
- moduleEntry.toOption
- }
-
- private def createEntry(name: String, programmaticallyAdded: Boolean, instance: Module): Entry[Module] = {
- val executor = Executors.newSingleThreadExecutor(threadFactory(name))
- Entry(name, ExecutionContext.fromExecutorService(executor), programmaticallyAdded, instance)
- }
-
-
- /**
- * Returns the current status of this module registry.
- */
- private[kamon] def status(): Registry.Status = {
- def moduleKind(instance: Any): String = instance match {
- case _: CombinedReporter => "combined"
- case _: MetricReporter => "metric"
- case _: SpanReporter => "span"
- case _: Module => "plain"
- }
-
- val automaticallyAddedModules = readModuleSettings(configuration.config()).map(moduleSettings => {
- val entry = _registeredModules.get(moduleSettings.name)
- val entryModuleKind = entry.map(e => moduleKind(e.module)).getOrElse("unknown")
- Registry.ModuleInfo(moduleSettings.name, moduleSettings.fqcn, moduleSettings.enabled, entry.nonEmpty, entryModuleKind)
- })
-
- val programmaticallyAddedModules = _registeredModules
- .filter { case (_, entry) => entry.programmaticallyAdded }
- .map { case (name, entry) => {
- Registry.ModuleInfo(name, entry.module.getClass.getName, true, true, moduleKind(entry.module))
- }}
-
- val allModules = automaticallyAddedModules ++ programmaticallyAddedModules
- Registry.Status(allModules)
- }
-
-
- /**
- * Registers a module and schedules execution of its start procedure.
- */
- private def startModule(entry: Entry[Module]): Unit = {
- registerModule(entry)
-
- // Schedule the start hook on the module
- entry.executionContext.execute(new Runnable {
- override def run(): Unit =
- Try(entry.module.start())
- .failed.foreach(t => _logger.warn(s"Failure occurred while starting module [${entry.name}]", t))
- })
- }
-
- private def registerModule(entry: Entry[Module]): Unit = {
- _registeredModules = _registeredModules + (entry.name -> entry)
- if(entry.module.isInstanceOf[MetricReporter])
- _metricReporterModules = _metricReporterModules + (entry.name -> entry.asInstanceOf[Entry[MetricReporter]])
- if(entry.module.isInstanceOf[SpanReporter])
- _spanReporterModules = _spanReporterModules + (entry.name -> entry.asInstanceOf[Entry[SpanReporter]])
-
- }
-
- /**
- * Removes the module from the registry and schedules a call to the stop lifecycle hook on the module's execution
- * context. The returned future completes when the module finishes its stop procedure.
- */
- private def stopModule(entry: Entry[Module]): Future[Unit] = synchronized {
- val cleanupExecutor = ExecutionContext.Implicits.global
-
- // Remove the module from all registries
- _registeredModules = _registeredModules - entry.name
- if(entry.module.isInstanceOf[MetricReporter])
- _metricReporterModules = _metricReporterModules - entry.name
- if(entry.module.isInstanceOf[SpanReporter])
- _spanReporterModules = _spanReporterModules - entry.name
-
-
- // Schedule a call to stop on the module
- val stopPromise = Promise[Unit]()
- entry.executionContext.execute(new Runnable {
- override def run(): Unit =
- stopPromise.complete {
- val stopResult = Try(entry.module.stop())
- stopResult.failed.foreach(t => _logger.warn(s"Failure occurred while stopping module [${entry.name}]", t))
- stopResult
- }
-
- })
-
- stopPromise.future.onComplete(_ => entry.executionContext.shutdown())(cleanupExecutor)
- stopPromise.future
- }
-
- /**
- * Schedules a call to reconfigure on the module's execution context.
- */
- private def reconfigureModule(entry: Entry[Module], config: Config): Unit = {
- entry.executionContext.execute(new Runnable {
- override def run(): Unit =
- Try(entry.module.reconfigure(config))
- .failed.foreach(t => _logger.warn(s"Failure occurred while reconfiguring module [${entry.name}]", t))
- })
- }
-
- private def noopRegistration(moduleName: String): Registration = new Registration {
- override def cancel(): Unit =
- _logger.warn(s"Cannot cancel registration on module [$moduleName] because the module was not added properly")
- }
-
- private def registration(entry: Entry[Module]): Registration = new Registration {
- override def cancel(): Unit = stopModule(entry)
- }
- }
-
- object Registry {
-
- case class Status(
- modules: Seq[ModuleInfo]
- )
-
- case class ModuleInfo(
- name: String,
- description: String,
- enabled: Boolean,
- started: Boolean,
- kind: String
- )
- }
-
- private def readRegistryConfiguration(config: Config): RegistrySettings =
- RegistrySettings(
- metricTickInterval = config.getDuration("kamon.metric.tick-interval"),
- optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment"),
- traceTickInterval = config.getDuration("kamon.trace.tick-interval"),
- traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size")
- )
-
- private case class RegistrySettings(
- metricTickInterval: Duration,
- optimisticMetricTickAlignment: Boolean,
- traceTickInterval: Duration,
- traceReporterQueueSize: Int
- )
-
- private case class Settings(
+ case class Settings(
name: String,
- fqcn: String,
+ description: String,
+ clazz: Class[_ <: Module],
+ kind: Module.Kind,
enabled: Boolean
)
-
- private case class Entry[T <: Module](
- name: String,
- executionContext: ExecutionContextExecutorService,
- programmaticallyAdded: Boolean,
- module: T
- )
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala b/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala
new file mode 100644
index 00000000..5520c602
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/module/ModuleRegistry.scala
@@ -0,0 +1,463 @@
+package kamon
+package module
+
+import java.time.{Duration, Instant}
+import java.util.concurrent.{CountDownLatch, Executors, ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicReference
+
+import com.typesafe.config.Config
+import kamon.metric.{MetricsSnapshotGenerator, PeriodSnapshot}
+import kamon.module.Module.Registration
+import kamon.status.Status
+import kamon.trace.Tracer.SpanBuffer
+import kamon.util.Clock
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, Promise}
+import scala.util.Try
+import scala.util.control.NonFatal
+
+
+
+/**
+ * Controls the lifecycle of all available modules.
+ */
+class ModuleRegistry(classLoading: ClassLoading, configuration: Configuration, clock: Clock, snapshotGenerator: MetricsSnapshotGenerator, spanBuffer: SpanBuffer) {
+ private val _logger = LoggerFactory.getLogger(classOf[ModuleRegistry])
+ private val _metricsTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-metrics-ticker", daemon = true))
+ private val _spansTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-spans-ticker", daemon = true))
+
+ private val _metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
+ private val _spansTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
+
+ private var _registrySettings = readRegistrySettings(configuration.config())
+ private var _registeredModules: Map[String, Entry[Module]] = Map.empty
+ private var _metricReporterModules: Map[String, Entry[MetricReporter]] = Map.empty
+ private var _spanReporterModules: Map[String, Entry[SpanReporter]] = Map.empty
+
+ // Start ticking as soon as the registry is created.
+ scheduleMetricsTicker()
+ scheduleSpansTicker()
+
+
+ /**
+ * Registers a module that has already been instantiated by the user. The start callback will be executed as part
+ * of the registration process. If a module with the specified name already exists the registration will fail. If
+ * the registered module is a MetricReporter and/or SpanReporter it will also be configured to receive the metrics
+ * and spans data upon every tick.
+ *
+ * @param name Desired module name.
+ * @param module Module instance.
+ * @return A registration that can be used to stop the module at any time.
+ */
+ def register(name: String, module: Module): Registration = synchronized {
+ if(_registeredModules.get(name).isEmpty) {
+ val inferredSettings = Module.Settings(
+ name,
+ module.getClass.getName,
+ module.getClass,
+ inferModuleKind(module.getClass),
+ true
+ )
+
+ val moduleEntry = createEntry(inferredSettings, true, module)
+ startModule(moduleEntry)
+ registration(moduleEntry)
+
+ } else {
+ _logger.warn(s"Cannot register module [$name], a module with that name already exists.")
+ noopRegistration(name)
+ }
+ }
+
+ /**
+ * Reads all available modules from the config and either starts, stops or reconfigures them in order to match the
+ * configured modules state.
+ */
+ def load(config: Config): Unit = synchronized {
+ val configuredModules = readModuleSettings(config)
+ val automaticallyRegisteredModules = _registeredModules.filterNot { case (_, module) => module.programmaticallyAdded }
+
+ // Start, reconfigure and stop modules that are still present but disabled.
+ configuredModules.foreach { moduleSettings =>
+ automaticallyRegisteredModules.get(moduleSettings.name).fold {
+ // The module does not exist in the registry, the only possible action is starting it, if enabled.
+ if(moduleSettings.enabled) {
+ createModule(moduleSettings).foreach(entry => startModule(entry))
+ }
+
+ } { existentModuleSettings =>
+ // When a module already exists it can either need to be stopped, or to be reconfigured.
+ if(moduleSettings.enabled) {
+ reconfigureModule(existentModuleSettings, config)
+ } else {
+ stopModule(existentModuleSettings)
+ }
+ }
+ }
+
+ // Remove all modules that no longer exist in the configuration.
+ val missingModules = automaticallyRegisteredModules.filterKeys(moduleName => configuredModules.find(_.name == moduleName).isEmpty)
+ missingModules.foreach {
+ case (_, entry) => stopModule(entry)
+ }
+ }
+
+ /**
+ * Schedules the reconfigure hook on all registered modules and applies the latest configuration settings to the
+ * registry.
+ */
+ def reconfigure(newConfig: Config): Unit = synchronized {
+ _registrySettings = readRegistrySettings(configuration.config())
+ _registeredModules.values.foreach(entry => reconfigureModule(entry, newConfig))
+ scheduleMetricsTicker()
+ scheduleSpansTicker()
+ }
+
+ /**
+ * Stops all registered modules. As part of the stop process, all modules get a last chance to report metrics and
+ * spans available until the call to stop.
+ */
+ def stop(): Future[Unit] = synchronized {
+ implicit val cleanupExecutor = ExecutionContext.Implicits.global
+ scheduleMetricsTicker(once = true)
+ scheduleSpansTicker(once = true)
+
+ val stopSignals = _registeredModules.values.map(stopModule)
+ val latch = new CountDownLatch(stopSignals.size)
+ stopSignals.foreach(f => f.onComplete(_ => latch.countDown()))
+
+ // There is a global 30 seconds limit to shutdown after which all executors will shut down.
+ val stopCompletionFuture = Future(latch.await(30, TimeUnit.SECONDS))
+ stopCompletionFuture.onComplete(_ => {
+ _metricsTickerExecutor.shutdown()
+ _spansTickerExecutor.shutdown()
+ })
+
+ stopCompletionFuture.map(_ => ())
+ }
+
+
+ /**
+ * (Re)Schedules the metrics ticker that periodically takes snapshots from the metric registry and sends them to
+ * all available metric reporting modules. If a ticker was already scheduled then that scheduled job will be
+ * cancelled and scheduled again.
+ */
+ private def scheduleMetricsTicker(once: Boolean = false): Unit = {
+ val currentMetricsTicker = _metricsTickerSchedule.get()
+ if(currentMetricsTicker != null)
+ currentMetricsTicker.cancel(false)
+
+ _metricsTickerSchedule.set {
+ val interval = _registrySettings.metricTickInterval.toMillis
+ val initialDelay = if(_registrySettings.optimisticMetricTickAlignment) {
+ val now = clock.instant()
+ val nextTick = Clock.nextTick(now, _registrySettings.metricTickInterval)
+ Duration.between(now, nextTick).toMillis
+ } else _registrySettings.metricTickInterval.toMillis
+
+ val ticker = new Runnable {
+ var lastInstant = Instant.now(clock)
+
+ override def run(): Unit = try {
+ val currentInstant = Instant.now(clock)
+ val periodSnapshot = PeriodSnapshot(
+ from = lastInstant,
+ to = currentInstant,
+ metrics = snapshotGenerator.snapshot())
+
+ metricReporterModules().foreach { entry =>
+ Future {
+ Try(entry.module.reportPeriodSnapshot(periodSnapshot)).failed.foreach { error =>
+ _logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error)
+ }
+ }(entry.executionContext)
+ }
+
+ lastInstant = currentInstant
+ } catch {
+ case NonFatal(t) => _logger.error("Failed to run a metrics tick", t)
+ }
+ }
+
+ if(once)
+ _metricsTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS)
+ else
+ _metricsTickerExecutor.scheduleAtFixedRate(ticker, initialDelay, interval, TimeUnit.MILLISECONDS)
+ }
+ }
+
+ /**
+ * (Re)Schedules the spans ticker that periodically takes the spans accumulated by the tracer and flushes them to
+ * all available span reporting modules. If a ticker was already scheduled then that scheduled job will be
+ * cancelled and scheduled again.
+ */
+ private def scheduleSpansTicker(once: Boolean = false): Unit = {
+ val currentSpansTicker = _spansTickerSchedule.get()
+ if(currentSpansTicker != null)
+ currentSpansTicker.cancel(false)
+
+ _spansTickerSchedule.set {
+ val interval = _registrySettings.traceTickInterval.toMillis
+
+ val ticker = new Runnable {
+ override def run(): Unit = try {
+ val spanBatch = spanBuffer.flush()
+
+ spanReporterModules().foreach { entry =>
+ Future {
+ Try(entry.module.reportSpans(spanBatch)).failed.foreach { error =>
+ _logger.error(s"Reporter [${entry.name}] failed to process a spans tick.", error)
+ }
+ }(entry.executionContext)
+ }
+
+ } catch {
+ case NonFatal(t) => _logger.error("Failed to run a spans tick", t)
+ }
+ }
+
+ if(once)
+ _spansTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS)
+ else
+ _spansTickerExecutor.scheduleAtFixedRate(ticker, interval, interval, TimeUnit.MILLISECONDS)
+ }
+ }
+
+ private def metricReporterModules(): Iterable[Entry[MetricReporter]] = synchronized {
+ _metricReporterModules.values
+ }
+
+ private def spanReporterModules(): Iterable[Entry[SpanReporter]] = synchronized {
+ _spanReporterModules.values
+ }
+
+ private def readModuleSettings(config: Config): Seq[Module.Settings] = {
+ val moduleConfigs = config.getConfig("kamon.modules").configurations
+ val moduleSettings = moduleConfigs.map {
+ case (moduleName, moduleConfig) =>
+ val moduleSettings = Try {
+ Module.Settings(
+ moduleName,
+ moduleConfig.getString("description"),
+ classLoading.resolveClass[Module](moduleConfig.getString("class")).get,
+ parseModuleKind(moduleConfig.getString("kind")),
+ moduleConfig.getBoolean("enabled")
+ )
+ }.map(ms => {
+ val inferredModuleKind = inferModuleKind(ms.clazz)
+ assert(inferredModuleKind == ms.kind,
+ s"Module [${ms.name}] is configured as [${ms.kind}] but the actual type does not comply to the expected interface.")
+ ms
+ })
+
+
+ moduleSettings.failed.foreach { t =>
+ _logger.warn(s"Failed to read configuration for module [$moduleName]", t)
+
+ if(moduleConfig.hasPath("requires-aspectj") || moduleConfig.hasPath("auto-start") || moduleConfig.hasPath("extension-class")) {
+ _logger.warn(s"Module [$moduleName] contains legacy configuration settings, please ensure that no legacy configuration")
+ }
+ }
+
+ moduleSettings
+
+ } filter(_.isSuccess) map(_.get) toSeq
+
+
+ // Load all modules that might have been configured using the legacy "kamon.reporters" setting from <1.2.0
+ // versions. This little hack should be removed by the time we release 2.0.
+ //
+ if(config.hasPath("kamon.reporters")) {
+ val legacyModuleSettings = config.getStringList("kamon.reporters").asScala
+ .map(moduleClass => {
+ val moduleSettings = Try {
+ val moduleClazz = classLoading.resolveClass[Module](moduleClass).get
+ val inferredModuleKind = inferModuleKind(moduleClazz)
+ val name = moduleClazz.getName()
+ val description = "Module detected from the legacy kamon.reporters configuration."
+
+ Module.Settings(name, description, moduleClazz, inferredModuleKind, true)
+ }
+
+ moduleSettings.failed.foreach(t => _logger.error(s"Failed to load legacy reporter module [${moduleClass}]", t))
+ moduleSettings
+ })
+ .filter(_.isSuccess)
+ .map(_.get)
+
+
+ val (repeatedLegacyModules, uniqueLegacyModules) = legacyModuleSettings
+ .partition(lm => moduleSettings.find(_.clazz.getName == lm.clazz.getName).nonEmpty)
+
+ repeatedLegacyModules.foreach(m =>
+ _logger.warn(s"Module [${m.name}] is configured twice, please remove it from the deprecated kamon.reporters setting."))
+
+ uniqueLegacyModules.foreach(m =>
+ _logger.warn(s"Module [${m.name}] is configured in the deprecated kamon.reporters setting, please consider moving it to kamon.modules."))
+
+ moduleSettings ++ uniqueLegacyModules
+
+ } else moduleSettings
+ }
+
+ /**
+ * Creates a module from the provided settings.
+ */
+ private def createModule(settings: Module.Settings): Option[Entry[Module]] = {
+ val moduleInstance = classLoading.createInstance[Module](settings.clazz, Nil)
+ val moduleEntry = moduleInstance.map(instance => createEntry(settings, false, instance))
+
+ moduleEntry.failed.foreach(t => _logger.warn(s"Failed to create instance of module [${settings.name}]", t))
+ moduleEntry.toOption
+ }
+
+ private def createEntry(settings: Module.Settings, programmaticallyAdded: Boolean, instance: Module): Entry[Module] = {
+ val executor = Executors.newSingleThreadExecutor(threadFactory(settings.name))
+ Entry(settings.name, ExecutionContext.fromExecutorService(executor), programmaticallyAdded, settings, instance)
+ }
+
+ private def inferModuleKind(clazz: Class[_ <: Module]): Module.Kind = {
+ if(classOf[CombinedReporter].isAssignableFrom(clazz))
+ Module.Kind.Combined
+ else if(classOf[MetricReporter].isAssignableFrom(clazz))
+ Module.Kind.Metric
+ else if(classOf[SpanReporter].isAssignableFrom(clazz))
+ Module.Kind.Span
+ else
+ Module.Kind.Plain
+ }
+
+
+ /**
+ * Returns the current status of this module registry.
+ */
+ private[kamon] def status(): Status.ModuleRegistry = {
+ val automaticallyAddedModules = readModuleSettings(configuration.config()).map(moduleSettings => {
+ _registeredModules.get(moduleSettings.name)
+ .map(moduleEntry =>
+ // The module is on the classpath and started.
+ Status.Module(moduleEntry.name, moduleEntry.settings.description, moduleEntry.settings.kind, false, true)
+
+ ).getOrElse(
+ // The module is on the classpath but has not been started.
+ Status.Module(moduleSettings.name, moduleSettings.description, moduleSettings.kind, false, false)
+ )
+ })
+
+ val programmaticallyAddedModules = _registeredModules
+ .filter { case (_, entry) => entry.programmaticallyAdded }
+ .map { case (name, entry) => Status.Module(name, entry.settings.description, entry.settings.kind, true, true) }
+
+ val allModules = automaticallyAddedModules ++ programmaticallyAddedModules
+ Status.ModuleRegistry(allModules)
+ }
+
+
+ /**
+ * Registers a module and schedules execution of its start procedure.
+ */
+ private def startModule(entry: Entry[Module]): Unit = {
+ registerModule(entry)
+
+ // Schedule the start hook on the module
+ entry.executionContext.execute(new Runnable {
+ override def run(): Unit =
+ Try(entry.module.start())
+ .failed.foreach(t => _logger.warn(s"Failure occurred while starting module [${entry.name}]", t))
+ })
+ }
+
+ private def registerModule(entry: Entry[Module]): Unit = {
+ _registeredModules = _registeredModules + (entry.name -> entry)
+ if(entry.module.isInstanceOf[MetricReporter])
+ _metricReporterModules = _metricReporterModules + (entry.name -> entry.asInstanceOf[Entry[MetricReporter]])
+ if(entry.module.isInstanceOf[SpanReporter])
+ _spanReporterModules = _spanReporterModules + (entry.name -> entry.asInstanceOf[Entry[SpanReporter]])
+
+ }
+
+ /**
+ * Removes the module from the registry and schedules a call to the stop lifecycle hook on the module's execution
+ * context. The returned future completes when the module finishes its stop procedure.
+ */
+ private def stopModule(entry: Entry[Module]): Future[Unit] = synchronized {
+ val cleanupExecutor = ExecutionContext.Implicits.global
+
+ // Remove the module from all registries
+ _registeredModules = _registeredModules - entry.name
+ if(entry.module.isInstanceOf[MetricReporter])
+ _metricReporterModules = _metricReporterModules - entry.name
+ if(entry.module.isInstanceOf[SpanReporter])
+ _spanReporterModules = _spanReporterModules - entry.name
+
+
+ // Schedule a call to stop on the module
+ val stopPromise = Promise[Unit]()
+ entry.executionContext.execute(new Runnable {
+ override def run(): Unit =
+ stopPromise.complete {
+ val stopResult = Try(entry.module.stop())
+ stopResult.failed.foreach(t => _logger.warn(s"Failure occurred while stopping module [${entry.name}]", t))
+ stopResult
+ }
+
+ })
+
+ stopPromise.future.onComplete(_ => entry.executionContext.shutdown())(cleanupExecutor)
+ stopPromise.future
+ }
+
+ /**
+ * Schedules a call to reconfigure on the module's execution context.
+ */
+ private def reconfigureModule(entry: Entry[Module], config: Config): Unit = {
+ entry.executionContext.execute(new Runnable {
+ override def run(): Unit =
+ Try(entry.module.reconfigure(config))
+ .failed.foreach(t => _logger.warn(s"Failure occurred while reconfiguring module [${entry.name}]", t))
+ })
+ }
+
+ private def noopRegistration(moduleName: String): Registration = new Registration {
+ override def cancel(): Unit =
+ _logger.warn(s"Cannot cancel registration on module [$moduleName] because the module was not added properly")
+ }
+
+ private def registration(entry: Entry[Module]): Registration = new Registration {
+ override def cancel(): Unit = stopModule(entry)
+ }
+
+ private def parseModuleKind(kind: String): Module.Kind = kind.toLowerCase match {
+ case "combined" => Module.Kind.Combined
+ case "metric" => Module.Kind.Metric
+ case "span" => Module.Kind.Span
+ case "plain" => Module.Kind.Plain
+ }
+
+ private def readRegistrySettings(config: Config): Settings =
+ Settings(
+ metricTickInterval = config.getDuration("kamon.metric.tick-interval"),
+ optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment"),
+ traceTickInterval = config.getDuration("kamon.trace.tick-interval"),
+ traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size")
+ )
+
+ private case class Settings(
+ metricTickInterval: Duration,
+ optimisticMetricTickAlignment: Boolean,
+ traceTickInterval: Duration,
+ traceReporterQueueSize: Int
+ )
+
+
+ private case class Entry[T <: Module](
+ name: String,
+ executionContext: ExecutionContextExecutorService,
+ programmaticallyAdded: Boolean,
+ settings: Module.Settings,
+ module: T
+ )
+}
+
diff --git a/kamon-core/src/main/scala/kamon/module/ReportingModule.scala b/kamon-core/src/main/scala/kamon/module/ReportingModule.scala
deleted file mode 100644
index 02a7344e..00000000
--- a/kamon-core/src/main/scala/kamon/module/ReportingModule.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-package kamon
-package module
-
-import kamon.trace.Span
-import kamon.metric.PeriodSnapshot
-
-/**
- * Modules implementing this trait will get registered for periodically receiving metric period snapshots. The
- * frequency of the period snapshots is controlled by the kamon.metric.tick-interval setting.
- */
-trait MetricReporter extends Module {
- def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit
-}
-
-/**
- * Modules implementing this trait will get registered for periodically receiving span batches. The frequency of the
- * span batches is controlled by the kamon.trace.tick-interval setting.
- */
-trait SpanReporter extends Module {
- def reportSpans(spans: Seq[Span.FinishedSpan]): Unit
-}
-
-/**
- * Modules implementing this trait will get registered for periodically receiving metric period snapshots and span
- * batches.
- */
-trait CombinedReporter extends MetricReporter with SpanReporter
diff --git a/kamon-core/src/main/scala/kamon/status/JsonMarshalling.scala b/kamon-core/src/main/scala/kamon/status/JsonMarshalling.scala
index 345e5cad..5a3f22dc 100644
--- a/kamon-core/src/main/scala/kamon/status/JsonMarshalling.scala
+++ b/kamon-core/src/main/scala/kamon/status/JsonMarshalling.scala
@@ -1,12 +1,10 @@
package kamon.status
import com.grack.nanojson.JsonWriter
-import kamon.module.Module
-import kamon.module.Module.Registry
import java.lang.{StringBuilder => JavaStringBuilder}
import com.typesafe.config.ConfigRenderOptions
-import kamon.metric.MetricRegistry
+import kamon.module.Module
trait JsonMarshalling[T] {
@@ -20,8 +18,15 @@ trait JsonMarshalling[T] {
object JsonMarshalling {
- implicit object ModuleRegistryStatusJsonMarshalling extends JsonMarshalling[Module.Registry.Status] {
- override def toJson(instance: Registry.Status, builder: JavaStringBuilder): Unit = {
+ implicit object ModuleRegistryStatusJsonMarshalling extends JsonMarshalling[Status.ModuleRegistry] {
+ override def toJson(instance: Status.ModuleRegistry, builder: JavaStringBuilder): Unit = {
+ def moduleKindString(moduleKind: Module.Kind): String = moduleKind match {
+ case Module.Kind.Combined => "combined"
+ case Module.Kind.Metric => "metric"
+ case Module.Kind.Span => "span"
+ case Module.Kind.Plain => "plain"
+ }
+
val array = JsonWriter.on(builder)
.`object`()
.array("modules")
@@ -30,9 +35,9 @@ object JsonMarshalling {
array.`object`()
.value("name", m.name)
.value("description", m.description)
- .value("kind", m.kind)
- .value("enabled", m.enabled)
- .value("started", m.started)
+ .value("kind", moduleKindString(m.kind))
+ .value("isProgrammaticallyRegistered", m.isProgrammaticallyRegistered)
+ .value("isStarted", m.isStarted)
.end()
})
@@ -40,8 +45,8 @@ object JsonMarshalling {
}
}
- implicit object BaseInfoJsonMarshalling extends JsonMarshalling[Status.BaseInfo] {
- override def toJson(instance: Status.BaseInfo, builder: JavaStringBuilder): Unit = {
+ implicit object BaseInfoJsonMarshalling extends JsonMarshalling[Status.Settings] {
+ override def toJson(instance: Status.Settings, builder: JavaStringBuilder): Unit = {
val baseConfigJson = JsonWriter.on(builder)
.`object`()
.value("version", instance.version)
@@ -65,8 +70,8 @@ object JsonMarshalling {
}
}
- implicit object MetricRegistryStatusJsonMarshalling extends JsonMarshalling[MetricRegistry.Status] {
- override def toJson(instance: MetricRegistry.Status, builder: JavaStringBuilder): Unit = {
+ implicit object MetricRegistryStatusJsonMarshalling extends JsonMarshalling[Status.MetricRegistry] {
+ override def toJson(instance: Status.MetricRegistry, builder: JavaStringBuilder): Unit = {
val metricsObject = JsonWriter.on(builder)
.`object`
.array("metrics")
diff --git a/kamon-core/src/main/scala/kamon/status/Status.scala b/kamon-core/src/main/scala/kamon/status/Status.scala
index 0d141ed0..dc059277 100644
--- a/kamon-core/src/main/scala/kamon/status/Status.scala
+++ b/kamon-core/src/main/scala/kamon/status/Status.scala
@@ -1,26 +1,27 @@
package kamon.status
import com.typesafe.config.Config
+import kamon.metric.InstrumentFactory.InstrumentType
import kamon.metric.MetricRegistry
import kamon.{Configuration, Environment, Kamon}
-import kamon.module.Module
-
+import kamon.module.ModuleRegistry
+import kamon.module.Module.{Kind => ModuleKind}
/**
* Allows accessing of component's status APIs without exposing any other internal API from those components.
*/
-class Status(_moduleRegistry: Module.Registry, _metricRegistry: MetricRegistry, configuration: Configuration) {
+class Status(_moduleRegistry: ModuleRegistry, _metricRegistry: MetricRegistry, configuration: Configuration) {
- def baseInfo(): Status.BaseInfo =
- Status.BaseInfo(BuildInfo.version, Kamon.environment, configuration.config())
+ def settings(): Status.Settings =
+ Status.Settings(BuildInfo.version, Kamon.environment, configuration.config())
/**
* Information about what modules have been detected in the classpath and their current status.
*/
- def moduleRegistry(): Module.Registry.Status =
+ def moduleRegistry(): Status.ModuleRegistry =
_moduleRegistry.status()
- def metricRegistry(): MetricRegistry.Status =
+ def metricRegistry(): Status.MetricRegistry =
_metricRegistry.status()
}
@@ -28,13 +29,32 @@ class Status(_moduleRegistry: Module.Registry, _metricRegistry: MetricRegistry,
object Status {
- case class BaseInfo(
+ case class Settings(
version: String,
environment: Environment,
config: Config
)
+ case class ModuleRegistry(
+ modules: Seq[Module]
+ )
+
+ case class Module(
+ name: String,
+ description: String,
+ kind: ModuleKind,
+ isProgrammaticallyRegistered: Boolean,
+ isStarted: Boolean
+ )
+ case class MetricRegistry(
+ metrics: Seq[Metric]
+ )
+ case class Metric(
+ name: String,
+ tags: Map[String, String],
+ instrumentType: InstrumentType
+ )
}
diff --git a/kamon-core/src/main/scala/kamon/status/StatusPageServer.scala b/kamon-core/src/main/scala/kamon/status/StatusPageServer.scala
index 7a124c1b..35273f39 100644
--- a/kamon-core/src/main/scala/kamon/status/StatusPageServer.scala
+++ b/kamon-core/src/main/scala/kamon/status/StatusPageServer.scala
@@ -17,7 +17,7 @@ class StatusPageServer(hostname: String, port: Int, resourceLoader: ClassLoader,
// Serve the current status data on Json.
session.getUri() match {
- case "/status/config" => json(status.baseInfo())
+ case "/status/settings" => json(status.settings())
case "/status/modules" => json(status.moduleRegistry())
case "/status/metrics" => json(status.metricRegistry())
case _ => NotFound