diff options
15 files changed, 790 insertions, 552 deletions
diff --git a/build.sbt b/build.sbt
index 131626d9..04e20f49 100644
--- a/build.sbt
+++ b/build.sbt
@@ -46,8 +46,9 @@ lazy val core = (project in file("kamon-core"))
libraryDependencies ++= Seq(
"com.typesafe" % "config" % "1.3.1",
- "org.slf4j" % "slf4j-api" % "1.7.25",
- "org.hdrhistogram" % "HdrHistogram" % "2.1.9"
+ "org.hdrhistogram" % "HdrHistogram" % "2.1.9",
+ "org.jctools" % "jctools-core" % "2.1.1",
+ "org.slf4j" % "slf4j-api" % "1.7.25"
diff --git a/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala
index b5d0425b..2a0af8c0 100644
--- a/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala
+++ b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala
@@ -10,17 +10,17 @@ import org.scalatest.{Matchers, WordSpec}
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._
-class KamonLifecycleSpec extends WordSpec with Matchers with Eventually{
+class KamonLifecycleSpec extends WordSpec with Matchers with Eventually {
"the Kamon lifecycle" should {
- "keep the JVM running if reporters are running" in {
+ "keep the JVM running if modules are running" in {
val process = Runtime.getRuntime.exec(createProcessCommand("kamon.KamonWithRunningReporter"))
process.isAlive shouldBe true
process.destroyForcibly().waitFor(5, TimeUnit.SECONDS)
- "let the JVM stop after all reporters are stopped" in {
+ "let the JVM stop after all modules are stopped" in {
val process = Runtime.getRuntime.exec(createProcessCommand("kamon.KamonWithTemporaryReporter"))
process.isAlive shouldBe true
diff --git a/kamon-core-tests/src/test/scala/kamon/trace/TracerSpec.scala b/kamon-core-tests/src/test/scala/kamon/trace/TracerSpec.scala
index f325d1d1..b52303f6 100644
--- a/kamon-core-tests/src/test/scala/kamon/trace/TracerSpec.scala
+++ b/kamon-core-tests/src/test/scala/kamon/trace/TracerSpec.scala
@@ -29,7 +29,7 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe
"the Kamon tracer" should {
"construct a minimal Span that only has a operation name" in {
- val span = tracer.buildSpan("myOperation").start()
+ val span = Kamon.buildSpan("myOperation").start()
val spanData = inspect(span)
spanData.operationName() shouldBe "myOperation"
@@ -38,7 +38,7 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe
"pass the operation name and tags to started Span" in {
- val span = tracer.buildSpan("myOperation")
+ val span = Kamon.buildSpan("myOperation")
.withMetricTag("metric-tag", "value")
.withMetricTag("metric-tag", "value")
.withTag("hello", "world")
@@ -60,16 +60,16 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe
"not have any parent Span if there is no Span in the current context and no parent was explicitly given" in {
- val span = tracer.buildSpan("myOperation").start()
+ val span = Kamon.buildSpan("myOperation").start()
val spanData = inspect(span)
spanData.context().parentID shouldBe IdentityProvider.NoIdentifier
"automatically take the Span from the current Context as parent" in {
- val parent = tracer.buildSpan("myOperation").start()
+ val parent = Kamon.buildSpan("myOperation").start()
val child = Kamon.withSpan(parent) {
- tracer.buildSpan("childOperation").asChildOf(parent).start()
+ Kamon.buildSpan("childOperation").asChildOf(parent).start()
val parentData = inspect(parent)
@@ -78,9 +78,9 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe
"ignore the span from the current context as parent if explicitly requested" in {
- val parent = tracer.buildSpan("myOperation").start()
+ val parent = Kamon.buildSpan("myOperation").start()
val child = Kamon.withSpan(parent) {
- tracer.buildSpan("childOperation").ignoreParentFromContext().start()
+ Kamon.buildSpan("childOperation").ignoreParentFromContext().start()
val childData = inspect(child)
@@ -88,7 +88,7 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe
"allow overriding the start timestamp for a Span" in {
- val span = tracer.buildSpan("myOperation").withFrom(Instant.EPOCH.plusMillis(321)).start()
+ val span = Kamon.buildSpan("myOperation").withFrom(Instant.EPOCH.plusMillis(321)).start()
val spanData = inspect(span)
spanData.from() shouldBe Instant.EPOCH.plusMillis(321)
@@ -101,7 +101,7 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe
val remoteParent = Span.Remote(createSpanContext())
- val childData = inspect(tracer.buildSpan("local").asChildOf(remoteParent).start())
+ val childData = inspect(Kamon.buildSpan("local").asChildOf(remoteParent).start())
childData.context().traceID shouldBe remoteParent.context.traceID
childData.context().parentID shouldBe remoteParent.context.parentID
@@ -114,10 +114,10 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe
val sampledRemoteParent = Span.Remote(createSpanContext().copy(samplingDecision = SamplingDecision.Sample))
val notSampledRemoteParent = Span.Remote(createSpanContext().copy(samplingDecision = SamplingDecision.DoNotSample))
- tracer.buildSpan("childOfSampled").asChildOf(sampledRemoteParent).start().context()
+ Kamon.buildSpan("childOfSampled").asChildOf(sampledRemoteParent).start().context()
.samplingDecision shouldBe(SamplingDecision.Sample)
- tracer.buildSpan("childOfNotSampled").asChildOf(notSampledRemoteParent).start().context()
+ Kamon.buildSpan("childOfNotSampled").asChildOf(notSampledRemoteParent).start().context()
.samplingDecision shouldBe(SamplingDecision.DoNotSample)
@@ -129,7 +129,7 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe
val unknownSamplingRemoteParent = Span.Remote(createSpanContext().copy(samplingDecision = SamplingDecision.Unknown))
- tracer.buildSpan("childOfSampled").asChildOf(unknownSamplingRemoteParent).start().context()
+ Kamon.buildSpan("childOfSampled").asChildOf(unknownSamplingRemoteParent).start().context()
.samplingDecision shouldBe(SamplingDecision.Sample)
@@ -137,6 +137,4 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe
- val tracer: Tracer = Kamon
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 2fa3d33b..0fb3ce0a 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -25,11 +25,22 @@ kamon {
- # FQCN of the reporter instances that should be loaded when calling `Kamon.reporters.loadReportersFromConfig()`. All
- # reporter classes must have a default constructor. No metric filtering is applied to metric reporters started this way.
+ # Modules that can be automatically discovered and started by Kamon. The configuration for each module has the
+ # following schema:
+ #
+ # kamon.modules {
+ # module-name {
+ # enabled = true
+ # class = "com.example.ModuleClass"
+ # }
+ # }
+ #
+ # All available modules in the classpath are started when calling Kamon.loadModules() and stopped when calling
+ # Kamon.stopModules().
+ #
+ modules {
- # Example: `reporters = ["kamon.prometheus.PrometheusReporter", "kamon.zipkin.ZipkinReporter"]`.
- reporters = [ ]
+ }
# Pool size for the executor service that will run sampling on RangeSampler instruments. This scheduler is accesible
# through Kamon.scheduler()
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index a6cc7bf4..ab95d773 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -15,113 +15,22 @@
package kamon
-import java.time.Duration
-import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor}
+object Kamon extends ClassLoading
+ with Configuration
+ with Utilities
+ with Metrics
+ with Tracing
+ with ModuleLoading
+ with ContextPropagation
+ with ContextStorage {
-import com.typesafe.config.{Config, ConfigFactory}
-import kamon.metric._
-import kamon.trace._
-import kamon.util.{Clock, Filters, Matcher, Registration}
-import org.slf4j.LoggerFactory
-import scala.concurrent.Future
-import scala.util.Try
-object Kamon extends MetricLookup with ClassLoading with Configuration with ReporterRegistry with Tracer with ContextPropagation with ContextStorage {
- private val logger = LoggerFactory.getLogger("kamon.Kamon")
- @volatile private var _config = ConfigFactory.load()
- @volatile private var _environment = Environment.fromConfig(_config)
- @volatile private var _filters = Filters.fromConfig(_config)
- private val _clock = new Clock.Default()
- private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler", daemon = true))
- private val _metrics = new MetricRegistry(_config, _scheduler)
- private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config, _clock)
- private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config, _clock)
- //private var _onReconfigureHooks = Seq.empty[OnReconfigureHook]
- sys.addShutdownHook(() => _scheduler.shutdown())
+ @volatile private var _environment = Environment.fromConfig(config())
def environment: Environment =
onReconfigure(newConfig => {
- _config = config
_environment = Environment.fromConfig(config)
- _filters = Filters.fromConfig(config)
- _metrics.reconfigure(config)
- _reporterRegistry.reconfigure(config)
- _tracer.reconfigure(config)
- _scheduler match {
- case stpe: ScheduledThreadPoolExecutor => stpe.setCorePoolSize(schedulerPoolSize(config))
- case other => logger.error("Unexpected scheduler [{}] found when reconfiguring Kamon.", other)
- }
- override def histogram(name: String, unit: MeasurementUnit, dynamicRange: Option[DynamicRange]): HistogramMetric =
- _metrics.histogram(name, unit, dynamicRange)
- override def counter(name: String, unit: MeasurementUnit): CounterMetric =
- _metrics.counter(name, unit)
- override def gauge(name: String, unit: MeasurementUnit): GaugeMetric =
- _metrics.gauge(name, unit)
- override def rangeSampler(name: String, unit: MeasurementUnit, sampleInterval: Option[Duration],
- dynamicRange: Option[DynamicRange]): RangeSamplerMetric =
- _metrics.rangeSampler(name, unit, dynamicRange, sampleInterval)
- override def timer(name: String, dynamicRange: Option[DynamicRange]): TimerMetric =
- _metrics.timer(name, dynamicRange)
- def tracer: Tracer =
- _tracer
- override def buildSpan(operationName: String): Tracer.SpanBuilder =
- _tracer.buildSpan(operationName)
- override def identityProvider: IdentityProvider =
- _tracer.identityProvider
- override def loadReportersFromConfig(): Unit =
- _reporterRegistry.loadReportersFromConfig()
- override def addReporter(reporter: MetricReporter): Registration =
- _reporterRegistry.addReporter(reporter)
- override def addReporter(reporter: MetricReporter, name: String): Registration =
- _reporterRegistry.addReporter(reporter, name)
- override def addReporter(reporter: MetricReporter, name: String, filter: String): Registration =
- _reporterRegistry.addReporter(reporter, name, filter)
- override def addReporter(reporter: SpanReporter): Registration =
- _reporterRegistry.addReporter(reporter)
- override def addReporter(reporter: SpanReporter, name: String): Registration =
- _reporterRegistry.addReporter(reporter, name)
- override def stopAllReporters(): Future[Unit] =
- _reporterRegistry.stopAllReporters()
- def filter(filterName: String, pattern: String): Boolean =
- _filters.accept(filterName, pattern)
- def filter(filterName: String): Matcher =
- _filters.get(filterName)
- def clock(): Clock =
- _clock
- def scheduler(): ScheduledExecutorService =
- _scheduler
- private def schedulerPoolSize(config: Config): Int =
- config.getInt("kamon.scheduler-pool-size")
diff --git a/kamon-core/src/main/scala/kamon/Metrics.scala b/kamon-core/src/main/scala/kamon/Metrics.scala
new file mode 100644
index 00000000..a2b74c20
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/Metrics.scala
@@ -0,0 +1,31 @@
+package kamon
+import java.time.Duration
+import kamon.metric._
+trait Metrics extends MetricLookup { self: Configuration with Utilities =>
+ private val _metricsRegistry = new MetricRegistry(self.config(), self.scheduler())
+ override def histogram(name: String, unit: MeasurementUnit, dynamicRange: Option[DynamicRange]): HistogramMetric =
+ _metricsRegistry.histogram(name, unit, dynamicRange)
+ override def counter(name: String, unit: MeasurementUnit): CounterMetric =
+ _metricsRegistry.counter(name, unit)
+ override def gauge(name: String, unit: MeasurementUnit): GaugeMetric =
+ _metricsRegistry.gauge(name, unit)
+ override def rangeSampler(name: String, unit: MeasurementUnit, sampleInterval: Option[Duration],
+ dynamicRange: Option[DynamicRange]): RangeSamplerMetric =
+ _metricsRegistry.rangeSampler(name, unit, dynamicRange, sampleInterval)
+ override def timer(name: String, dynamicRange: Option[DynamicRange]): TimerMetric =
+ _metricsRegistry.timer(name, dynamicRange)
+ protected def metricRegistry(): MetricRegistry =
+ _metricsRegistry
diff --git a/kamon-core/src/main/scala/kamon/ModuleLoading.scala b/kamon-core/src/main/scala/kamon/ModuleLoading.scala
new file mode 100644
index 00000000..8fe035d6
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/ModuleLoading.scala
@@ -0,0 +1,139 @@
+package kamon
+import com.typesafe.config.Config
+import kamon.metric.{MetricsSnapshot, PeriodSnapshot}
+import kamon.module.Module
+import kamon.util.Registration
+import kamon.module.{MetricReporter => NewMetricReporter}
+import kamon.module.{SpanReporter => NewSpanReporter}
+import kamon.module.Module.{Registration => NewRegistration}
+import kamon.trace.Span
+import scala.concurrent.Future
+@deprecated("Use kamon.module.Module instead", "1.2.0")
+sealed trait Reporter extends Module { }
+@deprecated("Use kamon.module.MetricReporter instead", "1.2.0")
+trait MetricReporter extends kamon.module.MetricReporter { }
+@deprecated("Use kamon.module.SpanReporter instead", "1.2.0")
+trait SpanReporter extends kamon.module.SpanReporter { }
+ * Handles the lifecycle of all modules known by Kamon. The most common implementations of modules are metrics and
+ * span reporters, but modules can be used to encapsulate any process that should be started automatically by Kamon and
+ * stopped when all modules are stopped (usually during shutdown).
+ *
+ * Modules can be automatically discovered from the kamon.modules configuration key, using the following schema:
+ *
+ * kamon.modules {
+ * module-name {
+ * enabled = true
+ * class = "com.example.MyModule"
+ * }
+ * }
+ *
+ */
+trait ModuleLoading { self: ClassLoading with Configuration with Utilities with Metrics with Tracing =>
+ private val _moduleRegistry = new Module.Registry(self, self, clock(), self.metricRegistry(), self.tracer())
+ self.onReconfigure(newConfig => self._moduleRegistry.reconfigure(newConfig))
+ /**
+ * Register a module instantiated by the user.
+ *
+ * @param name Module name. Registration will fail if a module with the given name already exists.
+ * @param module The module instance.
+ * @return A Registration that can be used to de-register the module at any time.
+ */
+ def registerModule(name: String, module: Module): NewRegistration =
+ _moduleRegistry.register(name, module)
+ /**
+ * Loads modules from Kamon's configuration.
+ */
+ def loadModules(): Unit =
+ _moduleRegistry.load(self.config())
+ /**
+ * Stops all registered modules. This includes automatically and programmatically registered modules.
+ *
+ * @return A future that completes when the stop callback on all available modules have been completed.
+ */
+ def stopModules(): Future[Unit] =
+ _moduleRegistry.stop()
+ // Compatibility with Kamon <1.2.0
+ @deprecated("Use registerModule instead", "1.2.0")
+ def addReporter(reporter: MetricReporter): Registration =
+ wrapRegistration(_moduleRegistry.register(reporter.getClass.getName(), wrapLegacyMetricReporter(reporter)))
+ @deprecated("Use registerModule instead", "1.2.0")
+ def addReporter(reporter: MetricReporter, name: String): Registration =
+ wrapRegistration(_moduleRegistry.register(name, wrapLegacyMetricReporter(reporter)))
+ @deprecated("Use registerModule instead", "1.2.0")
+ def addReporter(reporter: MetricReporter, name: String, filter: String): Registration =
+ wrapRegistration(_moduleRegistry.register(name, wrapLegacyMetricReporter(reporter, Some(filter))))
+ @deprecated("Use registerModule instead", "1.2.0")
+ def addReporter(reporter: SpanReporter): Registration =
+ wrapRegistration(_moduleRegistry.register(reporter.getClass.getName(), wrapLegacySpanReporter(reporter)))
+ @deprecated("Use registerModule instead", "1.2.0")
+ def addReporter(reporter: SpanReporter, name: String): Registration =
+ wrapRegistration(_moduleRegistry.register(name, wrapLegacySpanReporter(reporter)))
+ @deprecated("Use stopModules instead", "1.2.0")
+ def stopAllReporters(): Future[Unit] =
+ _moduleRegistry.stop()
+ @deprecated("Use loadModules instead", "1.2.0")
+ def loadReportersFromConfig(): Unit =
+ loadModules()
+ private def wrapRegistration(registration: NewRegistration): Registration = new Registration {
+ override def cancel(): Boolean = {
+ registration.cancel()
+ true
+ }
+ }
+ private def wrapLegacyMetricReporter(reporter: MetricReporter, filter: Option[String] = None): NewMetricReporter = new NewMetricReporter {
+ override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {
+ val filteredSnapshot = filter
+ .map(f => filterMetrics(f, snapshot))
+ .getOrElse(snapshot)
+ reporter.reportPeriodSnapshot(filteredSnapshot)
+ }
+ private def filterMetrics(filterName: String, periodSnapshot: PeriodSnapshot): PeriodSnapshot = {
+ val metricFilter = Kamon.filter(filterName)
+ val counters = periodSnapshot.metrics.counters.filter(c => metricFilter.accept(c.name))
+ val gauges = periodSnapshot.metrics.gauges.filter(g => metricFilter.accept(g.name))
+ val histograms = periodSnapshot.metrics.histograms.filter(h => metricFilter.accept(h.name))
+ val rangeSamplers = periodSnapshot.metrics.rangeSamplers.filter(rs => metricFilter.accept(rs.name))
+ periodSnapshot.copy(metrics = MetricsSnapshot(
+ histograms, rangeSamplers, gauges, counters
+ ))
+ }
+ override def start(): Unit = reporter.start()
+ override def stop(): Unit = reporter.stop()
+ override def reconfigure(config: Config): Unit = reporter.reconfigure(config)
+ }
+ private def wrapLegacySpanReporter(reporter: SpanReporter): NewSpanReporter = new NewSpanReporter {
+ override def reportSpans(spans: Seq[Span.FinishedSpan]): Unit = reporter.reportSpans(spans)
+ override def start(): Unit = reporter.start()
+ override def stop(): Unit = reporter.stop()
+ override def reconfigure(newConfig: Config): Unit = reporter.reconfigure(newConfig)
+ }
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
deleted file mode 100644
index 06b9761a..00000000
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ /dev/null
@@ -1,405 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-package kamon
-import java.time.{Duration, Instant}
-import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
-import java.util.concurrent._
-import com.typesafe.config.Config
-import kamon.metric._
-import kamon.trace.Span
-import kamon.trace.Span.FinishedSpan
-import kamon.util.{Clock, DynamicAccess, Registration}
-import org.slf4j.LoggerFactory
-import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
-import scala.util.Try
-import scala.util.control.NonFatal
-import scala.collection.JavaConverters._
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.duration._
-sealed trait Reporter {
- def start(): Unit
- def stop(): Unit
- def reconfigure(config: Config): Unit
-trait MetricReporter extends Reporter {
- def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit
-trait SpanReporter extends Reporter {
- def reportSpans(spans: Seq[Span.FinishedSpan]): Unit
-trait ReporterRegistry {
- def loadReportersFromConfig(): Unit
- def addReporter(reporter: MetricReporter): Registration
- def addReporter(reporter: MetricReporter, name: String): Registration
- def addReporter(reporter: MetricReporter, name: String, filter: String): Registration
- def addReporter(reporter: SpanReporter): Registration
- def addReporter(reporter: SpanReporter, name: String): Registration
- def stopAllReporters(): Future[Unit]
-object ReporterRegistry {
- private[kamon] trait SpanSink {
- def reportSpan(finishedSpan: FinishedSpan): Unit
- }
- private[kamon] class Default(metrics: MetricsSnapshotGenerator, initialConfig: Config, clock: Clock) extends ReporterRegistry with SpanSink {
- private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry])
- private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry", daemon = true))
- private val reporterCounter = new AtomicLong(0L)
- private var registryConfiguration = readRegistryConfiguration(initialConfig)
- private val metricReporters = TrieMap[Long, MetricReporterEntry]()
- private val metricReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
- private val spanReporters = TrieMap[Long, SpanReporterEntry]()
- private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
- reconfigure(initialConfig)
- override def loadReportersFromConfig(): Unit = {
- if(registryConfiguration.configuredReporters.isEmpty)
- logger.info("The kamon.reporters setting is empty, no reporters have been started.")
- else {
- registryConfiguration.configuredReporters.foreach { reporterFQCN =>
- val dynamicAccess = new DynamicAccess(getClass.getClassLoader)
- dynamicAccess.createInstanceFor[Reporter](reporterFQCN, Nil).map({
- case mr: MetricReporter =>
- addMetricReporter(mr, "loaded-from-config: " + reporterFQCN)
- logger.info("Loaded metric reporter [{}]", reporterFQCN)
- case sr: SpanReporter =>
- addSpanReporter(sr, "loaded-from-config: " + reporterFQCN)
- logger.info("Loaded span reporter [{}]", reporterFQCN)
- }).failed.foreach {
- t => logger.error(s"Failed to load configured reporter [$reporterFQCN]", t)
- }
- }
- }
- }
- override def addReporter(reporter: MetricReporter): Registration =
- addMetricReporter(reporter, reporter.getClass.getName())
- override def addReporter(reporter: MetricReporter, name: String): Registration =
- addMetricReporter(reporter, name)
- override def addReporter(reporter: MetricReporter, name: String, filter: String): Registration =
- addMetricReporter(reporter, name, Some(filter))
- override def addReporter(reporter: SpanReporter): Registration =
- addSpanReporter(reporter, reporter.getClass.getName())
- override def addReporter(reporter: SpanReporter, name: String): Registration =
- addSpanReporter(reporter, name)
- private def addMetricReporter(reporter: MetricReporter, name: String, filter: Option[String] = None): Registration = synchronized {
- val executor = Executors.newSingleThreadExecutor(threadFactory(name))
- val reporterEntry = new MetricReporterEntry(
- id = reporterCounter.getAndIncrement(),
- name = name,
- reporter = reporter,
- filter = filter,
- executionContext = ExecutionContext.fromExecutorService(executor)
- )
- Future {
- Try {
- reporterEntry.reporter.start()
- }.failed.foreach { error =>
- logger.error(s"Metric reporter [$name] failed to start.", error)
- }
- }(reporterEntry.executionContext)
- if(metricReporters.isEmpty)
- reStartMetricTicker()
- metricReporters.put(reporterEntry.id, reporterEntry)
- createRegistration(reporterEntry.id, metricReporters)
- }
- private def addSpanReporter(reporter: SpanReporter, name: String): Registration = synchronized {
- val executor = Executors.newSingleThreadExecutor(threadFactory(name))
- val reporterEntry = new SpanReporterEntry(
- id = reporterCounter.incrementAndGet(),
- name = name,
- reporter = reporter,
- bufferCapacity = registryConfiguration.traceReporterQueueSize,
- executionContext = ExecutionContext.fromExecutorService(executor)
- )
- Future {
- Try {
- reporterEntry.reporter.start()
- }.failed.foreach { error =>
- logger.error(s"Span reporter [$name] failed to start.", error)
- }
- }(reporterEntry.executionContext)
- if(spanReporters.isEmpty)
- reStartTraceTicker()
- spanReporters.put(reporterEntry.id, reporterEntry)
- createRegistration(reporterEntry.id, spanReporters)
- }
- private def createRegistration(id: Long, target: TrieMap[Long, _]): Registration = new Registration {
- override def cancel(): Boolean =
- target.remove(id).nonEmpty
- }
- override def stopAllReporters(): Future[Unit] = {
- implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext)
- val reporterStopFutures = Vector.newBuilder[Future[Unit]]
- while(metricReporters.nonEmpty) {
- val (idToRemove, _) = metricReporters.head
- metricReporters.remove(idToRemove).foreach { entry =>
- reporterStopFutures += stopMetricReporter(entry)
- }
- }
- while(spanReporters.nonEmpty) {
- val (idToRemove, _) = spanReporters.head
- spanReporters.remove(idToRemove).foreach { entry =>
- reporterStopFutures += stopSpanReporter(entry)
- }
- }
- Future.sequence(reporterStopFutures.result()).map(_ => ())
- }
- private[kamon] def reconfigure(config: Config): Unit = synchronized {
- val oldConfig = registryConfiguration
- registryConfiguration = readRegistryConfiguration(config)
- if(oldConfig.metricTickInterval != registryConfiguration.metricTickInterval && metricReporters.nonEmpty)
- reStartMetricTicker()
- if(oldConfig.traceTickInterval != registryConfiguration.traceTickInterval && spanReporters.nonEmpty)
- reStartTraceTicker()
- // Reconfigure all registered reporters
- metricReporters.foreach {
- case (_, entry) =>
- Future {
- Try {
- entry.reporter.reconfigure(config)
- }.failed.foreach { error =>
- logger.error(s"Metric reporter [${entry.name}] failed to reconfigure.", error)
- }
- }(entry.executionContext)
- }
- spanReporters.foreach {
- case (_, entry) =>
- Future {
- Try {
- entry.reporter.reconfigure(config)
- }.failed.foreach { error =>
- logger.error(s"Span reporter [${entry.name}] failed to reconfigure.", error)
- }
- }(entry.executionContext)
- }
- }
- private def reStartMetricTicker(): Unit = {
- val tickIntervalMillis = registryConfiguration.metricTickInterval.toMillis
- val currentMetricTicker = metricReporterTickerSchedule.get()
- if(currentMetricTicker != null)
- currentMetricTicker.cancel(false)
- metricReporterTickerSchedule.set {
- val initialDelay =
- if(registryConfiguration.optimisticMetricTickAlignment) {
- val now = clock.instant()
- val nextTick = Clock.nextTick(now, registryConfiguration.metricTickInterval)
- Duration.between(now, nextTick).toMillis
- } else tickIntervalMillis
- registryExecutionContext.scheduleAtFixedRate(
- new MetricReporterTicker(metrics, metricReporters, clock), initialDelay, tickIntervalMillis, TimeUnit.MILLISECONDS
- )
- }
- }
- private def reStartTraceTicker(): Unit = {
- val tickIntervalMillis = registryConfiguration.traceTickInterval.toMillis
- val currentSpanTicker = spanReporterTickerSchedule.get()
- if(currentSpanTicker != null)
- currentSpanTicker.cancel(false)
- spanReporterTickerSchedule.set {
- registryExecutionContext.scheduleAtFixedRate(
- new SpanReporterTicker(spanReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS
- )
- }
- }
- def reportSpan(span: Span.FinishedSpan): Unit = {
- spanReporters.foreach { case (_, reporterEntry) =>
- if(reporterEntry.isActive)
- reporterEntry.buffer.offer(span)
- }
- }
- private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = {
- entry.isActive = false
- Future {
- Try {
- entry.reporter.stop()
- }.failed.foreach { error =>
- logger.error(s"Metric reporter [${entry.name}] failed to stop.", error)
- }
- }(entry.executionContext).andThen {
- case _ => entry.executionContext.shutdown()
- }(ExecutionContext.fromExecutor(registryExecutionContext))
- }
- private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = {
- entry.isActive = false
- Future {
- Try {
- entry.reporter.stop()
- }.failed.foreach { error =>
- logger.error(s"Span reporter [${entry.name}] failed to stop.", error)
- }
- }(entry.executionContext).andThen {
- case _ => entry.executionContext.shutdown()
- }(ExecutionContext.fromExecutor(registryExecutionContext))
- }
- private class MetricReporterEntry(
- @volatile var isActive: Boolean = true,
- val id: Long,
- val name: String,
- val reporter: MetricReporter,
- val filter: Option[String],
- val executionContext: ExecutionContextExecutorService
- )
- private class SpanReporterEntry(
- @volatile var isActive: Boolean = true,
- val id: Long,
- val name: String,
- val reporter: SpanReporter,
- val bufferCapacity: Int,
- val executionContext: ExecutionContextExecutorService
- ) {
- val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity)
- }
- private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry],
- clock: Clock) extends Runnable {
- val logger = LoggerFactory.getLogger(classOf[MetricReporterTicker])
- var lastInstant = Instant.now(clock)
- def run(): Unit = try {
- val currentInstant = Instant.now(clock)
- val periodSnapshot = PeriodSnapshot(
- from = lastInstant,
- to = currentInstant,
- metrics = snapshotGenerator.snapshot()
- )
- reporterEntries.foreach { case (_, entry) =>
- Future {
- Try {
- if (entry.isActive) {
- val filteredSnapshot = entry.filter
- .map(f => filterMetrics(f, periodSnapshot))
- .getOrElse(periodSnapshot)
- entry.reporter.reportPeriodSnapshot(filteredSnapshot)
- }
- }.failed.foreach { error =>
- logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error)
- }
- }(entry.executionContext)
- }
- lastInstant = currentInstant
- } catch {
- case NonFatal(t) => logger.error("Error while running a tick", t)
- }
- private def filterMetrics(filterName: String, periodSnapshot: PeriodSnapshot): PeriodSnapshot = {
- val metricFilter = Kamon.filter(filterName)
- val counters = periodSnapshot.metrics.counters.filter(c => metricFilter.accept(c.name))
- val gauges = periodSnapshot.metrics.gauges.filter(g => metricFilter.accept(g.name))
- val histograms = periodSnapshot.metrics.histograms.filter(h => metricFilter.accept(h.name))
- val rangeSamplers = periodSnapshot.metrics.rangeSamplers.filter(rs => metricFilter.accept(rs.name))
- periodSnapshot.copy(metrics = MetricsSnapshot(
- histograms, rangeSamplers, gauges, counters
- ))
- }
- }
- private class SpanReporterTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable {
- override def run(): Unit = {
- spanReporters.foreach {
- case (_, entry) =>
- val spanBatch = new java.util.ArrayList[Span.FinishedSpan](entry.bufferCapacity)
- entry.buffer.drainTo(spanBatch, entry.bufferCapacity)
- Future {
- Try {
- entry.reporter.reportSpans(spanBatch.asScala)
- }.failed.foreach { error =>
- logger.error(s"Reporter [${entry.name}] failed to report spans.", error)
- }
- }(entry.executionContext)
- }
- }
- }
- private def readRegistryConfiguration(config: Config): Configuration =
- Configuration(
- metricTickInterval = config.getDuration("kamon.metric.tick-interval"),
- optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment"),
- traceTickInterval = config.getDuration("kamon.trace.tick-interval"),
- traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size"),
- configuredReporters = config.getStringList("kamon.reporters").asScala
- )
- private case class Configuration(metricTickInterval: Duration, optimisticMetricTickAlignment: Boolean,
- traceTickInterval: Duration, traceReporterQueueSize: Int, configuredReporters: Seq[String])
- }
diff --git a/kamon-core/src/main/scala/kamon/Tracing.scala b/kamon-core/src/main/scala/kamon/Tracing.scala
new file mode 100644
index 00000000..30506209
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/Tracing.scala
@@ -0,0 +1,18 @@
+package kamon
+import kamon.trace.{IdentityProvider, Tracer}
+trait Tracing { self: Configuration with Utilities =>
+ private val _tracer = Tracer.Default(Kamon, config(), clock())
+ onReconfigure(newConfig => _tracer.reconfigure(newConfig))
+ def buildSpan(operationName: String): Tracer.SpanBuilder =
+ _tracer.buildSpan(operationName)
+ def identityProvider: IdentityProvider =
+ _tracer.identityProvider
+ protected def tracer(): Tracer.Default =
+ _tracer
diff --git a/kamon-core/src/main/scala/kamon/Utilities.scala b/kamon-core/src/main/scala/kamon/Utilities.scala
new file mode 100644
index 00000000..b5c7d2a6
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/Utilities.scala
@@ -0,0 +1,58 @@
+package kamon
+import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor}
+import com.typesafe.config.Config
+import kamon.util.{Clock, Filters, Matcher}
+ * Base utilities used by other Kamon components.
+ */
+trait Utilities { self: Configuration =>
+ private val _clock = new Clock.Default()
+ private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(self.config()), numberedThreadFactory("kamon-scheduler", daemon = false))
+ @volatile private var _filters = Filters.fromConfig(self.config())
+ self.onReconfigure(newConfig => {
+ self._filters = Filters.fromConfig(newConfig)
+ self._scheduler match {
+ case stpe: ScheduledThreadPoolExecutor => stpe.setCorePoolSize(schedulerPoolSize(config))
+ case _ => // cannot change the pool size on other unknown types.
+ }
+ })
+ sys.addShutdownHook(() => _scheduler.shutdown())
+ /**
+ * Applies a filter to the given pattern. All filters are configured on the kamon.util.filters configuration section.
+ *
+ * @return true if the pattern matches at least one includes pattern and none of the excludes patterns in the filter.
+ */
+ def filter(filterName: String, pattern: String): Boolean =
+ _filters.accept(filterName, pattern)
+ /**
+ * Retrieves a matcher for the given filter name. All filters are configured on the kamon.util.filters configuration
+ * section.
+ */
+ def filter(filterName: String): Matcher =
+ _filters.get(filterName)
+ /**
+ * Kamon's clock implementation.
+ */
+ def clock(): Clock =
+ _clock
+ /**
+ * Scheduler to be used for Kamon-related tasks like updating gauges.
+ */
+ def scheduler(): ScheduledExecutorService =
+ _scheduler
+ private def schedulerPoolSize(config: Config): Int =
+ config.getInt("kamon.scheduler-pool-size")
diff --git a/kamon-core/src/main/scala/kamon/module/Module.scala b/kamon-core/src/main/scala/kamon/module/Module.scala
new file mode 100644
index 00000000..41649629
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/module/Module.scala
@@ -0,0 +1,426 @@
+package kamon
+package module
+import java.time.{Duration, Instant}
+import java.util.concurrent.{CountDownLatch, Executors, ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicReference
+import com.typesafe.config.Config
+import kamon.metric.{MetricsSnapshotGenerator, PeriodSnapshot}
+import kamon.trace.Tracer.SpanBuffer
+import kamon.util.Clock
+import org.slf4j.LoggerFactory
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, Promise}
+import scala.util.Try
+import scala.util.control.NonFatal
+ * Modules provide additional capabilities to Kamon, like collecting JVM metrics or exporting the metrics and trace
+ * data to external services. Additionally, modules can be automatically registered in Kamon by simply being present
+ * in the classpath and having the appropriate entry in the configuration file. All modules get a dedicated execution
+ * context which will be used to call the start, stop and reconfigure hooks.
+ *
+ * Besides the basic lifecycle hooks, when registering a [[MetricReporter]] and/or [[SpanReporter]] module, Kamon will
+ * also schedule calls to [[MetricReporter.reportPeriodSnapshot()]] and [[SpanReporter.reportSpans()]] in the module's
+ * execution context.
+ */
+trait Module {
+ /**
+ * Signals that a the module has been registered in Kamon's module registry.
+ */
+ def start(): Unit
+ /**
+ * Signals that the module should be stopped and all acquired resources, if any, should be released.
+ */
+ def stop(): Unit
+ /**
+ * Signals that a new configuration object has been provided to Kamon. Modules should ensure that their internal
+ * settings are in sync with the provided configuration.
+ */
+ def reconfigure(newConfig: Config): Unit
+object Module {
+ /**
+ * Represents a module's registration on the module registry. A module can be stopped at any time by cancelling its
+ * registration.
+ */
+ trait Registration {
+ /**
+ * Removes and stops the related module.
+ */
+ def cancel(): Unit
+ }
+ /**
+ * Controls the lifecycle of all available modules.
+ */
+ class Registry(classLoading: ClassLoading, configuration: Configuration, clock: Clock, snapshotGenerator: MetricsSnapshotGenerator, spanBuffer: SpanBuffer) {
+ private val _logger = LoggerFactory.getLogger(classOf[Registry])
+ private val _metricsTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-metrics-ticker", daemon = true))
+ private val _spansTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-spans-ticker", daemon = true))
+ private val _metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
+ private val _spansTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
+ private var _registrySettings = readRegistryConfiguration(configuration.config())
+ private var _registeredModules: Map[String, Entry[Module]] = Map.empty
+ private var _metricReporterModules: Map[String, Entry[MetricReporter]] = Map.empty
+ private var _spanReporterModules: Map[String, Entry[SpanReporter]] = Map.empty
+ // Start ticking as soon as the registry is created.
+ scheduleMetricsTicker()
+ scheduleSpansTicker()
+ /**
+ * Registers a module that has already been instantiated by the user. The start callback will be executed as part
+ * of the registration process. If a module with the specified name already exists the registration will fail. If
+ * the registered module is a MetricReporter and/or SpanReporter it will also be configured to receive the metrics
+ * and spans data upon every tick.
+ *
+ * @param name Desired module name.
+ * @param module Module instance.
+ * @return A registration that can be used to stop the module at any time.
+ */
+ def register(name: String, module: Module): Registration = synchronized {
+ if(_registeredModules.get(name).isEmpty) {
+ val moduleEntry = createEntry(name, true, module)
+ startModule(moduleEntry)
+ registration(moduleEntry)
+ } else {
+ _logger.warn(s"Cannot register module [$name], a module with that name already exists.")
+ noopRegistration(name)
+ }
+ }
+ /**
+ * Reads all available modules from the config and either starts, stops or reconfigures them in order to match the
+ * configured modules state.
+ */
+ def load(config: Config): Unit = synchronized {
+ val configuredModules = readModuleSettings(config)
+ val automaticallyRegisteredModules = _registeredModules.filterNot { case (_, module) => module.programmaticallyAdded }
+ // Start, reconfigure and stop modules that are still present but disabled.
+ configuredModules.foreach { moduleSettings =>
+ automaticallyRegisteredModules.get(moduleSettings.name).fold {
+ // The module does not exist in the registry, the only possible action is starting it, if enabled.
+ if(moduleSettings.enabled) {
+ createModule(moduleSettings).foreach(entry => startModule(entry))
+ }
+ } { existentModuleSettings =>
+ // When a module already exists it can either need to be stopped, or to be reconfigured.
+ if(moduleSettings.enabled) {
+ reconfigureModule(existentModuleSettings, config)
+ } else {
+ stopModule(existentModuleSettings)
+ }
+ }
+ }
+ // Remove all modules that no longer exist in the configuration.
+ val missingModules = automaticallyRegisteredModules.filterKeys(moduleName => configuredModules.find(_.name == moduleName).isEmpty)
+ missingModules.foreach {
+ case (_, entry) => stopModule(entry)
+ }
+ }
+ /**
+ * Schedules the reconfigure hook on all registered modules and applies the latest configuration settings to the
+ * registry.
+ */
+ def reconfigure(newConfig: Config): Unit = synchronized {
+ _registrySettings = readRegistryConfiguration(configuration.config())
+ _registeredModules.values.foreach(entry => reconfigureModule(entry, newConfig))
+ scheduleMetricsTicker()
+ scheduleSpansTicker()
+ }
+ /**
+ * Stops all registered modules. As part of the stop process, all modules get a last chance to report metrics and
+ * spans available until the call to stop.
+ */
+ def stop(): Future[Unit] = synchronized {
+ implicit val cleanupExecutor = ExecutionContext.Implicits.global
+ scheduleMetricsTicker(once = true)
+ scheduleSpansTicker(once = true)
+ val stopSignals = _registeredModules.values.map(stopModule)
+ val latch = new CountDownLatch(stopSignals.size)
+ stopSignals.foreach(f => f.onComplete(_ => latch.countDown()))
+ // There is a global 30 seconds limit to shutdown after which all executors will shut down.
+ val stopCompletionFuture = Future(latch.await(30, TimeUnit.SECONDS))
+ stopCompletionFuture.onComplete(_ => {
+ _metricsTickerExecutor.shutdown()
+ _spansTickerExecutor.shutdown()
+ })
+ stopCompletionFuture.map(_ => ())
+ }
+ /**
+ * (Re)Schedules the metrics ticker that periodically takes snapshots from the metric registry and sends them to
+ * all available metric reporting modules. If a ticker was already scheduled then that scheduled job will be
+ * cancelled and scheduled again.
+ */
+ private def scheduleMetricsTicker(once: Boolean = false): Unit = {
+ val currentMetricsTicker = _metricsTickerSchedule.get()
+ if(currentMetricsTicker != null)
+ currentMetricsTicker.cancel(false)
+ _metricsTickerSchedule.set {
+ val interval = _registrySettings.metricTickInterval.toMillis
+ val initialDelay = if(_registrySettings.optimisticMetricTickAlignment) {
+ val now = clock.instant()
+ val nextTick = Clock.nextTick(now, _registrySettings.metricTickInterval)
+ Duration.between(now, nextTick).toMillis
+ } else _registrySettings.metricTickInterval.toMillis
+ val ticker = new Runnable {
+ var lastInstant = Instant.now(clock)
+ override def run(): Unit = try {
+ val currentInstant = Instant.now(clock)
+ val periodSnapshot = PeriodSnapshot(
+ from = lastInstant,
+ to = currentInstant,
+ metrics = snapshotGenerator.snapshot())
+ metricReporterModules().foreach { entry =>
+ Future {
+ Try(entry.module.reportPeriodSnapshot(periodSnapshot)).failed.foreach { error =>
+ _logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error)
+ }
+ }(entry.executionContext)
+ }
+ lastInstant = currentInstant
+ } catch {
+ case NonFatal(t) => _logger.error("Failed to run a metrics tick", t)
+ }
+ }
+ if(once)
+ _metricsTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS)
+ else
+ _metricsTickerExecutor.scheduleAtFixedRate(ticker, initialDelay, interval, TimeUnit.MILLISECONDS)
+ }
+ }
+ /**
+ * (Re)Schedules the spans ticker that periodically takes the spans accumulated by the tracer and flushes them to
+ * all available span reporting modules. If a ticker was already scheduled then that scheduled job will be
+ * cancelled and scheduled again.
+ */
+ private def scheduleSpansTicker(once: Boolean = false): Unit = {
+ val currentSpansTicker = _spansTickerSchedule.get()
+ if(currentSpansTicker != null)
+ currentSpansTicker.cancel(false)
+ _spansTickerSchedule.set {
+ val interval = _registrySettings.traceTickInterval.toMillis
+ val ticker = new Runnable {
+ override def run(): Unit = try {
+ val spanBatch = spanBuffer.flush()
+ spanReporterModules().foreach { entry =>
+ Future {
+ Try(entry.module.reportSpans(spanBatch)).failed.foreach { error =>
+ _logger.error(s"Reporter [${entry.name}] failed to process a spans tick.", error)
+ }
+ }(entry.executionContext)
+ }
+ } catch {
+ case NonFatal(t) => _logger.error("Failed to run a spans tick", t)
+ }
+ }
+ if(once)
+ _spansTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS)
+ else
+ _spansTickerExecutor.scheduleAtFixedRate(ticker, interval, interval, TimeUnit.MILLISECONDS)
+ }
+ }
+ private def metricReporterModules(): Iterable[Entry[MetricReporter]] = synchronized {
+ _metricReporterModules.values
+ }
+ private def spanReporterModules(): Iterable[Entry[SpanReporter]] = synchronized {
+ _spanReporterModules.values
+ }
+ private def readModuleSettings(config: Config): Seq[Settings] = {
+ val moduleConfigs = config.getConfig("kamon.modules").configurations
+ val moduleSettings = moduleConfigs.map {
+ case (moduleName, moduleConfig) =>
+ val moduleSettings = Try {
+ Settings(
+ moduleName,
+ moduleConfig.getString("class"),
+ moduleConfig.getBoolean("enabled")
+ )
+ }
+ moduleSettings.failed.foreach { t =>
+ _logger.warn(s"Failed to read configuration for module [$moduleName]", t)
+ if(moduleConfig.hasPath("requires-aspectj") || moduleConfig.hasPath("auto-start") || moduleConfig.hasPath("extension-class")) {
+ _logger.warn(s"Module [$moduleName] contains legacy configuration settings, please ensure that no legacy configuration")
+ }
+ }
+ moduleSettings
+ } filter(_.isSuccess) map(_.get) toSeq
+ // Legacy modules from <1.2.0
+ val legacyModules = config.getStringList("kamon.reporters").asScala map { moduleClass =>
+ Settings(moduleClass, moduleClass, true)
+ } toSeq
+ val (repeatedLegacyModules, uniqueLegacyModules) = legacyModules.partition(lm => moduleSettings.find(_.fqcn == lm.fqcn).nonEmpty)
+ repeatedLegacyModules.foreach(m =>
+ _logger.warn(s"Module [${m.name}] is configured twice, please remove it from the deprecated kamon.reporters setting."))
+ uniqueLegacyModules.foreach(m =>
+ _logger.warn(s"Module [${m.name}] is configured in the deprecated kamon.reporters setting, please consider moving it to kamon.modules."))
+ moduleSettings ++ uniqueLegacyModules
+ }
+ /**
+ * Creates a module from the provided settings.
+ */
+ private def createModule(settings: Settings): Option[Entry[Module]] = {
+ val moduleInstance = classLoading.createInstance[Module](settings.fqcn, Nil)
+ val moduleEntry = moduleInstance.map(instance => createEntry(settings.name, false, instance))
+ moduleEntry.failed.foreach(t => _logger.warn(s"Failed to create instance of module [${settings.name}]", t))
+ moduleEntry.toOption
+ }
+ private def createEntry(name: String, programmaticallyAdded: Boolean, instance: Module): Entry[Module] = {
+ val executor = Executors.newSingleThreadExecutor(threadFactory(name))
+ Entry(name, ExecutionContext.fromExecutorService(executor), programmaticallyAdded, instance)
+ }
+ /**
+ * Registers a module and schedules execution of its start procedure.
+ */
+ private def startModule(entry: Entry[Module]): Unit = {
+ registerModule(entry)
+ // Schedule the start hook on the module
+ entry.executionContext.execute(new Runnable {
+ override def run(): Unit =
+ Try(entry.module.start())
+ .failed.foreach(t => _logger.warn(s"Failure occurred while starting module [${entry.name}]", t))
+ })
+ }
+ private def registerModule(entry: Entry[Module]): Unit = {
+ _registeredModules = _registeredModules + (entry.name -> entry)
+ if(entry.module.isInstanceOf[MetricReporter])
+ _metricReporterModules = _metricReporterModules + (entry.name -> entry.asInstanceOf[Entry[MetricReporter]])
+ if(entry.module.isInstanceOf[SpanReporter])
+ _spanReporterModules = _spanReporterModules + (entry.name -> entry.asInstanceOf[Entry[SpanReporter]])
+ }
+ /**
+ * Removes the module from the registry and schedules a call to the stop lifecycle hook on the module's execution
+ * context. The returned future completes when the module finishes its stop procedure.
+ */
+ private def stopModule(entry: Entry[Module]): Future[Unit] = synchronized {
+ val cleanupExecutor = ExecutionContext.Implicits.global
+ // Remove the module from all registries
+ _registeredModules = _registeredModules - entry.name
+ if(entry.module.isInstanceOf[MetricReporter])
+ _metricReporterModules = _metricReporterModules - entry.name
+ if(entry.module.isInstanceOf[SpanReporter])
+ _spanReporterModules = _spanReporterModules - entry.name
+ // Schedule a call to stop on the module
+ val stopPromise = Promise[Unit]()
+ entry.executionContext.execute(new Runnable {
+ override def run(): Unit =
+ stopPromise.complete {
+ val stopResult = Try(entry.module.stop())
+ stopResult.failed.foreach(t => _logger.warn(s"Failure occurred while stopping module [${entry.name}]", t))
+ stopResult
+ }
+ })
+ stopPromise.future.onComplete(_ => entry.executionContext.shutdown())(cleanupExecutor)
+ stopPromise.future
+ }
+ /**
+ * Schedules a call to reconfigure on the module's execution context.
+ */
+ private def reconfigureModule(entry: Entry[Module], config: Config): Unit = {
+ entry.executionContext.execute(new Runnable {
+ override def run(): Unit =
+ Try(entry.module.reconfigure(config))
+ .failed.foreach(t => _logger.warn(s"Failure occurred while reconfiguring module [${entry.name}]", t))
+ })
+ }
+ private def noopRegistration(moduleName: String): Registration = new Registration {
+ override def cancel(): Unit =
+ _logger.warn(s"Cannot cancel registration on module [$moduleName] because the module was not added properly")
+ }
+ private def registration(entry: Entry[Module]): Registration = new Registration {
+ override def cancel(): Unit = stopModule(entry)
+ }
+ }
+ private def readRegistryConfiguration(config: Config): RegistrySettings =
+ RegistrySettings(
+ metricTickInterval = config.getDuration("kamon.metric.tick-interval"),
+ optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment"),
+ traceTickInterval = config.getDuration("kamon.trace.tick-interval"),
+ traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size")
+ )
+ private case class RegistrySettings(
+ metricTickInterval: Duration,
+ optimisticMetricTickAlignment: Boolean,
+ traceTickInterval: Duration,
+ traceReporterQueueSize: Int
+ )
+ private case class Settings(
+ name: String,
+ fqcn: String,
+ enabled: Boolean
+ )
+ private case class Entry[T <: Module](
+ name: String,
+ executionContext: ExecutionContextExecutorService,
+ programmaticallyAdded: Boolean,
+ module: T
+ )
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/module/ReportingModule.scala b/kamon-core/src/main/scala/kamon/module/ReportingModule.scala
new file mode 100644
index 00000000..0e88fc23
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/module/ReportingModule.scala
@@ -0,0 +1,21 @@
+package kamon
+package module
+import kamon.trace.Span
+import kamon.metric.PeriodSnapshot
+ * Modules implementing this trait will get registered for periodically receiving metric period snapshots. The
+ * frequency of the period snapshots is controlled by the kamon.metric.tick-interval setting.
+ */
+trait MetricReporter extends Module {
+ def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit
+ * Modules implementing this trait will get registered for periodically receiving span batches. The frequency of the
+ * span batches is controlled by the kamon.trace.tick-interval setting.
+ */
+trait SpanReporter extends Module {
+ def reportSpans(spans: Seq[Span.FinishedSpan]): Unit
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala
index 6015e350..ee301e8a 100644
--- a/kamon-core/src/main/scala/kamon/trace/Span.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Span.scala
@@ -18,7 +18,6 @@ package trace
import java.time.Instant
-import kamon.ReporterRegistry.SpanSink
import kamon.context.Context
import kamon.metric.MeasurementUnit
import kamon.trace.SpanContext.SamplingDecision
@@ -90,7 +89,7 @@ object Span {
final class Local(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue],
- initialMetricTags: Map[String, String], from: Instant, spanSink: SpanSink, trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock) extends Span {
+ initialMetricTags: Map[String, String], from: Instant, spanBuffer: Tracer.SpanBuffer, trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock) extends Span {
private var collectMetrics: Boolean = trackMetrics
private var open: Boolean = true
@@ -203,7 +202,7 @@ object Span {
- spanSink.reportSpan(toFinishedSpan(to))
+ spanBuffer.append(toFinishedSpan(to))
@@ -229,9 +228,9 @@ object Span {
object Local {
def apply(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue],
- initialMetricTags: Map[String, String], from: Instant, spanSink: SpanSink,
+ initialMetricTags: Map[String, String], from: Instant, spanBuffer: Tracer.SpanBuffer,
trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock): Local =
- new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, from, spanSink, trackMetrics, scopeSpanMetrics, clock)
+ new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, from, spanBuffer, trackMetrics, scopeSpanMetrics, clock)
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala
index 4cad1eb7..542638cf 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala
@@ -38,7 +38,7 @@ object SpanPropagation {
import B3.Headers
override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = {
- val identityProvider = Kamon.tracer.identityProvider
+ val identityProvider = Kamon.identityProvider
val traceID = reader.read(Headers.TraceIdentifier)
.map(id => identityProvider.traceIdGenerator().from(urlDecode(id)))
@@ -122,7 +122,7 @@ object SpanPropagation {
override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = {
reader.read(Header.B3).map { header =>
- val identityProvider = Kamon.tracer.identityProvider
+ val identityProvider = Kamon.identityProvider
val (traceID, spanID, samplingDecision, parentSpanID) = header.splitToTuple("-")
@@ -220,7 +220,7 @@ object SpanPropagation {
if(medium.available() == 0)
else {
- val identityProvider = Kamon.tracer.identityProvider
+ val identityProvider = Kamon.identityProvider
val colferSpan = new ColferSpan()
colferSpan.unmarshal(medium.readAll(), 0)
diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
index ad7ffbed..7a314205 100644
--- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
@@ -18,13 +18,13 @@ package kamon.trace
import java.time.Instant
import com.typesafe.config.Config
-import kamon.ReporterRegistry.SpanSink
import kamon.Kamon
import kamon.metric.MetricLookup
-import kamon.trace.Span.TagValue
+import kamon.trace.Span.{FinishedSpan, TagValue}
import kamon.trace.SpanContext.SamplingDecision
import kamon.trace.Tracer.SpanBuilder
import kamon.util.{Clock, DynamicAccess}
+import org.jctools.queues.{MessagePassingQueue, MpscArrayQueue}
import org.slf4j.LoggerFactory
import scala.collection.immutable
@@ -37,19 +37,26 @@ trait Tracer {
object Tracer {
- final class Default(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config, clock: Clock) extends Tracer {
- private val logger = LoggerFactory.getLogger(classOf[Tracer])
+ private[kamon] trait SpanBuffer {
+ def append(span: FinishedSpan): Unit
+ def flush(): Seq[FinishedSpan]
+ }
+ final class Default(metrics: MetricLookup, initialConfig: Config, clock: Clock) extends Tracer with SpanBuffer {
+ private val _logger = LoggerFactory.getLogger(classOf[Tracer])
private[Tracer] val tracerMetrics = new TracerMetrics(metrics)
- @volatile private[Tracer] var joinRemoteParentsWithSameSpanID: Boolean = true
- @volatile private[Tracer] var scopeSpanMetrics: Boolean = true
+ @volatile private[Tracer] var _traceReporterQueueSize = 1024
+ @volatile private[Tracer] var _spanBuffer = new MpscArrayQueue[Span.FinishedSpan](_traceReporterQueueSize)
+ @volatile private[Tracer] var _joinRemoteParentsWithSameSpanID: Boolean = true
+ @volatile private[Tracer] var _scopeSpanMetrics: Boolean = true
@volatile private[Tracer] var _sampler: Sampler = Sampler.Never
@volatile private[Tracer] var _identityProvider: IdentityProvider = IdentityProvider.Default()
override def buildSpan(operationName: String): SpanBuilder =
- new SpanBuilder(operationName, this, spanSink, clock)
+ new SpanBuilder(operationName, this, this, clock)
override def identityProvider: IdentityProvider =
@@ -69,29 +76,54 @@ object Tracer {
case other => sys.error(s"Unexpected sampler name $other.")
+ val newTraceReporterQueueSize = traceConfig.getInt("reporter-queue-size")
val newJoinRemoteParentsWithSameSpanID = traceConfig.getBoolean("join-remote-parents-with-same-span-id")
val newScopeSpanMetrics = traceConfig.getBoolean("span-metrics.scope-spans-to-parent")
val newIdentityProvider = dynamic.createInstanceFor[IdentityProvider](
traceConfig.getString("identity-provider"), immutable.Seq.empty[(Class[_], AnyRef)]
+ if(_traceReporterQueueSize != newTraceReporterQueueSize) {
+ // By simply changing the buffer we might be dropping Spans that have not been collected yet by the reporters.
+ // Since reconfigures are very unlikely to happen beyond application startup this might be a problem at all.
+ // If we eventually decide to keep those possible Spans around then we will need to change the queue type to
+ // multiple consumer as the reconfiguring thread will need to drain the contents before replacing.
+ _spanBuffer = new MpscArrayQueue[Span.FinishedSpan](newTraceReporterQueueSize)
+ }
_sampler = newSampler
- joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID
- scopeSpanMetrics = newScopeSpanMetrics
+ _joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID
+ _scopeSpanMetrics = newScopeSpanMetrics
_identityProvider = newIdentityProvider
+ _traceReporterQueueSize = newTraceReporterQueueSize
}.failed.foreach {
- ex => logger.error("Unable to reconfigure Kamon Tracer", ex)
+ ex => _logger.error("Unable to reconfigure Kamon Tracer", ex)
+ override def append(span: FinishedSpan): Unit =
+ _spanBuffer.offer(span)
+ override def flush(): Seq[FinishedSpan] = {
+ var spans = Seq.empty[FinishedSpan]
+ _spanBuffer.drain(new MessagePassingQueue.Consumer[Span.FinishedSpan] {
+ override def accept(span: FinishedSpan): Unit =
+ spans = span +: spans
+ })
+ spans
+ }
object Default {
- def apply(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config, clock: Clock): Default =
- new Default(metrics, spanSink, initialConfig, clock)
+ def apply(metrics: MetricLookup, initialConfig: Config, clock: Clock): Default =
+ new Default(metrics, initialConfig, clock)
- final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanSink: SpanSink, clock: Clock) {
+ final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanBuffer: Tracer.SpanBuffer, clock: Clock) {
private var parentSpan: Span = _
private var initialOperationName: String = operationName
private var from: Instant = Instant.EPOCH
@@ -192,15 +224,15 @@ object Tracer {
- spanSink,
+ spanBuffer,
- tracer.scopeSpanMetrics,
+ tracer._scopeSpanMetrics,
private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext =
- if(parent.isRemote() && tracer.joinRemoteParentsWithSameSpanID)
+ if(parent.isRemote() && tracer._joinRemoteParentsWithSameSpanID)
parent.context().copy(samplingDecision = samplingDecision)
parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision)