aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2018-10-29 17:45:57 +0100
committerGitHub <noreply@github.com>2018-10-29 17:45:57 +0100
commitf1c6ceffa22c59a463d6d8cd2ca77e2b440eb450 (patch)
tree5b871456e3d5ec72ed19a85484a28975efafc4a4
parentd0a0dfe886952ba924adacaabf85cb96ce5fe032 (diff)
downloadKamon-f1c6ceffa22c59a463d6d8cd2ca77e2b440eb450.tar.gz
Kamon-f1c6ceffa22c59a463d6d8cd2ca77e2b440eb450.tar.bz2
Kamon-f1c6ceffa22c59a463d6d8cd2ca77e2b440eb450.zip
Implement a module registry that supports loading from configuration (#559)
-rw-r--r--build.sbt5
-rw-r--r--kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala6
-rw-r--r--kamon-core-tests/src/test/scala/kamon/trace/TracerSpec.scala26
-rw-r--r--kamon-core/src/main/resources/reference.conf19
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala109
-rw-r--r--kamon-core/src/main/scala/kamon/Metrics.scala31
-rw-r--r--kamon-core/src/main/scala/kamon/ModuleLoading.scala139
-rw-r--r--kamon-core/src/main/scala/kamon/ReporterRegistry.scala405
-rw-r--r--kamon-core/src/main/scala/kamon/Tracing.scala18
-rw-r--r--kamon-core/src/main/scala/kamon/Utilities.scala58
-rw-r--r--kamon-core/src/main/scala/kamon/module/Module.scala426
-rw-r--r--kamon-core/src/main/scala/kamon/module/ReportingModule.scala21
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Span.scala9
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala6
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Tracer.scala64
15 files changed, 790 insertions, 552 deletions
diff --git a/build.sbt b/build.sbt
index 131626d9..04e20f49 100644
--- a/build.sbt
+++ b/build.sbt
@@ -46,8 +46,9 @@ lazy val core = (project in file("kamon-core"))
.settings(
libraryDependencies ++= Seq(
"com.typesafe" % "config" % "1.3.1",
- "org.slf4j" % "slf4j-api" % "1.7.25",
- "org.hdrhistogram" % "HdrHistogram" % "2.1.9"
+ "org.hdrhistogram" % "HdrHistogram" % "2.1.9",
+ "org.jctools" % "jctools-core" % "2.1.1",
+ "org.slf4j" % "slf4j-api" % "1.7.25"
)
)
diff --git a/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala
index b5d0425b..2a0af8c0 100644
--- a/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala
+++ b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala
@@ -10,17 +10,17 @@ import org.scalatest.{Matchers, WordSpec}
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._
-class KamonLifecycleSpec extends WordSpec with Matchers with Eventually{
+class KamonLifecycleSpec extends WordSpec with Matchers with Eventually {
"the Kamon lifecycle" should {
- "keep the JVM running if reporters are running" in {
+ "keep the JVM running if modules are running" in {
val process = Runtime.getRuntime.exec(createProcessCommand("kamon.KamonWithRunningReporter"))
Thread.sleep(5000)
process.isAlive shouldBe true
process.destroyForcibly().waitFor(5, TimeUnit.SECONDS)
}
- "let the JVM stop after all reporters are stopped" in {
+ "let the JVM stop after all modules are stopped" in {
val process = Runtime.getRuntime.exec(createProcessCommand("kamon.KamonWithTemporaryReporter"))
Thread.sleep(2000)
process.isAlive shouldBe true
diff --git a/kamon-core-tests/src/test/scala/kamon/trace/TracerSpec.scala b/kamon-core-tests/src/test/scala/kamon/trace/TracerSpec.scala
index f325d1d1..b52303f6 100644
--- a/kamon-core-tests/src/test/scala/kamon/trace/TracerSpec.scala
+++ b/kamon-core-tests/src/test/scala/kamon/trace/TracerSpec.scala
@@ -29,7 +29,7 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe
"the Kamon tracer" should {
"construct a minimal Span that only has a operation name" in {
- val span = tracer.buildSpan("myOperation").start()
+ val span = Kamon.buildSpan("myOperation").start()
val spanData = inspect(span)
spanData.operationName() shouldBe "myOperation"
@@ -38,7 +38,7 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe
}
"pass the operation name and tags to started Span" in {
- val span = tracer.buildSpan("myOperation")
+ val span = Kamon.buildSpan("myOperation")
.withMetricTag("metric-tag", "value")
.withMetricTag("metric-tag", "value")
.withTag("hello", "world")
@@ -60,16 +60,16 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe
}
"not have any parent Span if there is no Span in the current context and no parent was explicitly given" in {
- val span = tracer.buildSpan("myOperation").start()
+ val span = Kamon.buildSpan("myOperation").start()
val spanData = inspect(span)
spanData.context().parentID shouldBe IdentityProvider.NoIdentifier
}
"automatically take the Span from the current Context as parent" in {
- val parent = tracer.buildSpan("myOperation").start()
+ val parent = Kamon.buildSpan("myOperation").start()
val child = Kamon.withSpan(parent) {
- tracer.buildSpan("childOperation").asChildOf(parent).start()
+ Kamon.buildSpan("childOperation").asChildOf(parent).start()
}
val parentData = inspect(parent)
@@ -78,9 +78,9 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe
}
"ignore the span from the current context as parent if explicitly requested" in {
- val parent = tracer.buildSpan("myOperation").start()
+ val parent = Kamon.buildSpan("myOperation").start()
val child = Kamon.withSpan(parent) {
- tracer.buildSpan("childOperation").ignoreParentFromContext().start()
+ Kamon.buildSpan("childOperation").ignoreParentFromContext().start()
}
val childData = inspect(child)
@@ -88,7 +88,7 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe
}
"allow overriding the start timestamp for a Span" in {
- val span = tracer.buildSpan("myOperation").withFrom(Instant.EPOCH.plusMillis(321)).start()
+ val span = Kamon.buildSpan("myOperation").withFrom(Instant.EPOCH.plusMillis(321)).start()
val spanData = inspect(span)
spanData.from() shouldBe Instant.EPOCH.plusMillis(321)
}
@@ -101,7 +101,7 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe
}
val remoteParent = Span.Remote(createSpanContext())
- val childData = inspect(tracer.buildSpan("local").asChildOf(remoteParent).start())
+ val childData = inspect(Kamon.buildSpan("local").asChildOf(remoteParent).start())
childData.context().traceID shouldBe remoteParent.context.traceID
childData.context().parentID shouldBe remoteParent.context.parentID
@@ -114,10 +114,10 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe
val sampledRemoteParent = Span.Remote(createSpanContext().copy(samplingDecision = SamplingDecision.Sample))
val notSampledRemoteParent = Span.Remote(createSpanContext().copy(samplingDecision = SamplingDecision.DoNotSample))
- tracer.buildSpan("childOfSampled").asChildOf(sampledRemoteParent).start().context()
+ Kamon.buildSpan("childOfSampled").asChildOf(sampledRemoteParent).start().context()
.samplingDecision shouldBe(SamplingDecision.Sample)
- tracer.buildSpan("childOfNotSampled").asChildOf(notSampledRemoteParent).start().context()
+ Kamon.buildSpan("childOfNotSampled").asChildOf(notSampledRemoteParent).start().context()
.samplingDecision shouldBe(SamplingDecision.DoNotSample)
}
@@ -129,7 +129,7 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe
}
val unknownSamplingRemoteParent = Span.Remote(createSpanContext().copy(samplingDecision = SamplingDecision.Unknown))
- tracer.buildSpan("childOfSampled").asChildOf(unknownSamplingRemoteParent).start().context()
+ Kamon.buildSpan("childOfSampled").asChildOf(unknownSamplingRemoteParent).start().context()
.samplingDecision shouldBe(SamplingDecision.Sample)
Kamon.reconfigure(previousConfig)
@@ -137,6 +137,4 @@ class TracerSpec extends WordSpec with Matchers with SpanBuilding with SpanInspe
}
- val tracer: Tracer = Kamon
-
}
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 2fa3d33b..0fb3ce0a 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -25,11 +25,22 @@ kamon {
}
}
- # FQCN of the reporter instances that should be loaded when calling `Kamon.reporters.loadReportersFromConfig()`. All
- # reporter classes must have a default constructor. No metric filtering is applied to metric reporters started this way.
+ # Modules that can be automatically discovered and started by Kamon. The configuration for each module has the
+ # following schema:
+ #
+ # kamon.modules {
+ # module-name {
+ # enabled = true
+ # class = "com.example.ModuleClass"
+ # }
+ # }
+ #
+ # All available modules in the classpath are started when calling Kamon.loadModules() and stopped when calling
+ # Kamon.stopModules().
+ #
+ modules {
- # Example: `reporters = ["kamon.prometheus.PrometheusReporter", "kamon.zipkin.ZipkinReporter"]`.
- reporters = [ ]
+ }
# Pool size for the executor service that will run sampling on RangeSampler instruments. This scheduler is accesible
# through Kamon.scheduler()
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index a6cc7bf4..ab95d773 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -15,113 +15,22 @@
package kamon
-import java.time.Duration
-import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor}
+object Kamon extends ClassLoading
+ with Configuration
+ with Utilities
+ with Metrics
+ with Tracing
+ with ModuleLoading
+ with ContextPropagation
+ with ContextStorage {
-import com.typesafe.config.{Config, ConfigFactory}
-import kamon.metric._
-import kamon.trace._
-import kamon.util.{Clock, Filters, Matcher, Registration}
-import org.slf4j.LoggerFactory
-import scala.concurrent.Future
-import scala.util.Try
-
-
-object Kamon extends MetricLookup with ClassLoading with Configuration with ReporterRegistry with Tracer with ContextPropagation with ContextStorage {
- private val logger = LoggerFactory.getLogger("kamon.Kamon")
-
- @volatile private var _config = ConfigFactory.load()
- @volatile private var _environment = Environment.fromConfig(_config)
- @volatile private var _filters = Filters.fromConfig(_config)
-
- private val _clock = new Clock.Default()
- private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler", daemon = true))
- private val _metrics = new MetricRegistry(_config, _scheduler)
- private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config, _clock)
- private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config, _clock)
- //private var _onReconfigureHooks = Seq.empty[OnReconfigureHook]
-
- sys.addShutdownHook(() => _scheduler.shutdown())
+ @volatile private var _environment = Environment.fromConfig(config())
def environment: Environment =
_environment
onReconfigure(newConfig => {
- _config = config
_environment = Environment.fromConfig(config)
- _filters = Filters.fromConfig(config)
- _metrics.reconfigure(config)
- _reporterRegistry.reconfigure(config)
- _tracer.reconfigure(config)
-
- _scheduler match {
- case stpe: ScheduledThreadPoolExecutor => stpe.setCorePoolSize(schedulerPoolSize(config))
- case other => logger.error("Unexpected scheduler [{}] found when reconfiguring Kamon.", other)
- }
})
-
- override def histogram(name: String, unit: MeasurementUnit, dynamicRange: Option[DynamicRange]): HistogramMetric =
- _metrics.histogram(name, unit, dynamicRange)
-
- override def counter(name: String, unit: MeasurementUnit): CounterMetric =
- _metrics.counter(name, unit)
-
- override def gauge(name: String, unit: MeasurementUnit): GaugeMetric =
- _metrics.gauge(name, unit)
-
- override def rangeSampler(name: String, unit: MeasurementUnit, sampleInterval: Option[Duration],
- dynamicRange: Option[DynamicRange]): RangeSamplerMetric =
- _metrics.rangeSampler(name, unit, dynamicRange, sampleInterval)
-
- override def timer(name: String, dynamicRange: Option[DynamicRange]): TimerMetric =
- _metrics.timer(name, dynamicRange)
-
-
- def tracer: Tracer =
- _tracer
-
- override def buildSpan(operationName: String): Tracer.SpanBuilder =
- _tracer.buildSpan(operationName)
-
-
- override def identityProvider: IdentityProvider =
- _tracer.identityProvider
-
- override def loadReportersFromConfig(): Unit =
- _reporterRegistry.loadReportersFromConfig()
-
- override def addReporter(reporter: MetricReporter): Registration =
- _reporterRegistry.addReporter(reporter)
-
- override def addReporter(reporter: MetricReporter, name: String): Registration =
- _reporterRegistry.addReporter(reporter, name)
-
- override def addReporter(reporter: MetricReporter, name: String, filter: String): Registration =
- _reporterRegistry.addReporter(reporter, name, filter)
-
- override def addReporter(reporter: SpanReporter): Registration =
- _reporterRegistry.addReporter(reporter)
-
- override def addReporter(reporter: SpanReporter, name: String): Registration =
- _reporterRegistry.addReporter(reporter, name)
-
- override def stopAllReporters(): Future[Unit] =
- _reporterRegistry.stopAllReporters()
-
- def filter(filterName: String, pattern: String): Boolean =
- _filters.accept(filterName, pattern)
-
- def filter(filterName: String): Matcher =
- _filters.get(filterName)
-
- def clock(): Clock =
- _clock
-
- def scheduler(): ScheduledExecutorService =
- _scheduler
-
- private def schedulerPoolSize(config: Config): Int =
- config.getInt("kamon.scheduler-pool-size")
-
}
diff --git a/kamon-core/src/main/scala/kamon/Metrics.scala b/kamon-core/src/main/scala/kamon/Metrics.scala
new file mode 100644
index 00000000..a2b74c20
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/Metrics.scala
@@ -0,0 +1,31 @@
+package kamon
+
+import java.time.Duration
+
+import kamon.metric._
+
+
+trait Metrics extends MetricLookup { self: Configuration with Utilities =>
+ private val _metricsRegistry = new MetricRegistry(self.config(), self.scheduler())
+
+ override def histogram(name: String, unit: MeasurementUnit, dynamicRange: Option[DynamicRange]): HistogramMetric =
+ _metricsRegistry.histogram(name, unit, dynamicRange)
+
+ override def counter(name: String, unit: MeasurementUnit): CounterMetric =
+ _metricsRegistry.counter(name, unit)
+
+ override def gauge(name: String, unit: MeasurementUnit): GaugeMetric =
+ _metricsRegistry.gauge(name, unit)
+
+ override def rangeSampler(name: String, unit: MeasurementUnit, sampleInterval: Option[Duration],
+ dynamicRange: Option[DynamicRange]): RangeSamplerMetric =
+ _metricsRegistry.rangeSampler(name, unit, dynamicRange, sampleInterval)
+
+ override def timer(name: String, dynamicRange: Option[DynamicRange]): TimerMetric =
+ _metricsRegistry.timer(name, dynamicRange)
+
+
+ protected def metricRegistry(): MetricRegistry =
+ _metricsRegistry
+
+}
diff --git a/kamon-core/src/main/scala/kamon/ModuleLoading.scala b/kamon-core/src/main/scala/kamon/ModuleLoading.scala
new file mode 100644
index 00000000..8fe035d6
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/ModuleLoading.scala
@@ -0,0 +1,139 @@
+package kamon
+
+import com.typesafe.config.Config
+import kamon.metric.{MetricsSnapshot, PeriodSnapshot}
+import kamon.module.Module
+import kamon.util.Registration
+import kamon.module.{MetricReporter => NewMetricReporter}
+import kamon.module.{SpanReporter => NewSpanReporter}
+import kamon.module.Module.{Registration => NewRegistration}
+import kamon.trace.Span
+
+import scala.concurrent.Future
+
+@deprecated("Use kamon.module.Module instead", "1.2.0")
+sealed trait Reporter extends Module { }
+
+@deprecated("Use kamon.module.MetricReporter instead", "1.2.0")
+trait MetricReporter extends kamon.module.MetricReporter { }
+
+@deprecated("Use kamon.module.SpanReporter instead", "1.2.0")
+trait SpanReporter extends kamon.module.SpanReporter { }
+
+/**
+ * Handles the lifecycle of all modules known by Kamon. The most common implementations of modules are metrics and
+ * span reporters, but modules can be used to encapsulate any process that should be started automatically by Kamon and
+ * stopped when all modules are stopped (usually during shutdown).
+ *
+ * Modules can be automatically discovered from the kamon.modules configuration key, using the following schema:
+ *
+ * kamon.modules {
+ * module-name {
+ * enabled = true
+ * class = "com.example.MyModule"
+ * }
+ * }
+ *
+ */
+trait ModuleLoading { self: ClassLoading with Configuration with Utilities with Metrics with Tracing =>
+ private val _moduleRegistry = new Module.Registry(self, self, clock(), self.metricRegistry(), self.tracer())
+ self.onReconfigure(newConfig => self._moduleRegistry.reconfigure(newConfig))
+
+
+ /**
+ * Register a module instantiated by the user.
+ *
+ * @param name Module name. Registration will fail if a module with the given name already exists.
+ * @param module The module instance.
+ * @return A Registration that can be used to de-register the module at any time.
+ */
+ def registerModule(name: String, module: Module): NewRegistration =
+ _moduleRegistry.register(name, module)
+
+ /**
+ * Loads modules from Kamon's configuration.
+ */
+ def loadModules(): Unit =
+ _moduleRegistry.load(self.config())
+
+ /**
+ * Stops all registered modules. This includes automatically and programmatically registered modules.
+ *
+ * @return A future that completes when the stop callback on all available modules have been completed.
+ */
+ def stopModules(): Future[Unit] =
+ _moduleRegistry.stop()
+
+
+
+ // Compatibility with Kamon <1.2.0
+
+ @deprecated("Use registerModule instead", "1.2.0")
+ def addReporter(reporter: MetricReporter): Registration =
+ wrapRegistration(_moduleRegistry.register(reporter.getClass.getName(), wrapLegacyMetricReporter(reporter)))
+
+ @deprecated("Use registerModule instead", "1.2.0")
+ def addReporter(reporter: MetricReporter, name: String): Registration =
+ wrapRegistration(_moduleRegistry.register(name, wrapLegacyMetricReporter(reporter)))
+
+ @deprecated("Use registerModule instead", "1.2.0")
+ def addReporter(reporter: MetricReporter, name: String, filter: String): Registration =
+ wrapRegistration(_moduleRegistry.register(name, wrapLegacyMetricReporter(reporter, Some(filter))))
+
+ @deprecated("Use registerModule instead", "1.2.0")
+ def addReporter(reporter: SpanReporter): Registration =
+ wrapRegistration(_moduleRegistry.register(reporter.getClass.getName(), wrapLegacySpanReporter(reporter)))
+
+ @deprecated("Use registerModule instead", "1.2.0")
+ def addReporter(reporter: SpanReporter, name: String): Registration =
+ wrapRegistration(_moduleRegistry.register(name, wrapLegacySpanReporter(reporter)))
+
+ @deprecated("Use stopModules instead", "1.2.0")
+ def stopAllReporters(): Future[Unit] =
+ _moduleRegistry.stop()
+
+ @deprecated("Use loadModules instead", "1.2.0")
+ def loadReportersFromConfig(): Unit =
+ loadModules()
+
+
+ private def wrapRegistration(registration: NewRegistration): Registration = new Registration {
+ override def cancel(): Boolean = {
+ registration.cancel()
+ true
+ }
+ }
+
+ private def wrapLegacyMetricReporter(reporter: MetricReporter, filter: Option[String] = None): NewMetricReporter = new NewMetricReporter {
+ override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {
+ val filteredSnapshot = filter
+ .map(f => filterMetrics(f, snapshot))
+ .getOrElse(snapshot)
+ reporter.reportPeriodSnapshot(filteredSnapshot)
+ }
+
+ private def filterMetrics(filterName: String, periodSnapshot: PeriodSnapshot): PeriodSnapshot = {
+ val metricFilter = Kamon.filter(filterName)
+ val counters = periodSnapshot.metrics.counters.filter(c => metricFilter.accept(c.name))
+ val gauges = periodSnapshot.metrics.gauges.filter(g => metricFilter.accept(g.name))
+ val histograms = periodSnapshot.metrics.histograms.filter(h => metricFilter.accept(h.name))
+ val rangeSamplers = periodSnapshot.metrics.rangeSamplers.filter(rs => metricFilter.accept(rs.name))
+
+ periodSnapshot.copy(metrics = MetricsSnapshot(
+ histograms, rangeSamplers, gauges, counters
+ ))
+ }
+
+ override def start(): Unit = reporter.start()
+ override def stop(): Unit = reporter.stop()
+ override def reconfigure(config: Config): Unit = reporter.reconfigure(config)
+ }
+
+ private def wrapLegacySpanReporter(reporter: SpanReporter): NewSpanReporter = new NewSpanReporter {
+ override def reportSpans(spans: Seq[Span.FinishedSpan]): Unit = reporter.reportSpans(spans)
+ override def start(): Unit = reporter.start()
+ override def stop(): Unit = reporter.stop()
+ override def reconfigure(newConfig: Config): Unit = reporter.reconfigure(newConfig)
+ }
+
+}
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
deleted file mode 100644
index 06b9761a..00000000
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ /dev/null
@@ -1,405 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon
-
-import java.time.{Duration, Instant}
-import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
-import java.util.concurrent._
-
-import com.typesafe.config.Config
-import kamon.metric._
-import kamon.trace.Span
-import kamon.trace.Span.FinishedSpan
-import kamon.util.{Clock, DynamicAccess, Registration}
-import org.slf4j.LoggerFactory
-
-import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
-import scala.util.Try
-import scala.util.control.NonFatal
-import scala.collection.JavaConverters._
-import scala.collection.concurrent.TrieMap
-import scala.concurrent.duration._
-
-sealed trait Reporter {
- def start(): Unit
- def stop(): Unit
- def reconfigure(config: Config): Unit
-}
-
-trait MetricReporter extends Reporter {
- def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit
-}
-
-trait SpanReporter extends Reporter {
- def reportSpans(spans: Seq[Span.FinishedSpan]): Unit
-}
-
-trait ReporterRegistry {
- def loadReportersFromConfig(): Unit
-
- def addReporter(reporter: MetricReporter): Registration
- def addReporter(reporter: MetricReporter, name: String): Registration
- def addReporter(reporter: MetricReporter, name: String, filter: String): Registration
- def addReporter(reporter: SpanReporter): Registration
- def addReporter(reporter: SpanReporter, name: String): Registration
-
- def stopAllReporters(): Future[Unit]
-}
-
-object ReporterRegistry {
-
- private[kamon] trait SpanSink {
- def reportSpan(finishedSpan: FinishedSpan): Unit
- }
-
- private[kamon] class Default(metrics: MetricsSnapshotGenerator, initialConfig: Config, clock: Clock) extends ReporterRegistry with SpanSink {
- private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry])
- private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry", daemon = true))
- private val reporterCounter = new AtomicLong(0L)
- private var registryConfiguration = readRegistryConfiguration(initialConfig)
-
- private val metricReporters = TrieMap[Long, MetricReporterEntry]()
- private val metricReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
- private val spanReporters = TrieMap[Long, SpanReporterEntry]()
- private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
-
-
- reconfigure(initialConfig)
-
- override def loadReportersFromConfig(): Unit = {
- if(registryConfiguration.configuredReporters.isEmpty)
- logger.info("The kamon.reporters setting is empty, no reporters have been started.")
- else {
- registryConfiguration.configuredReporters.foreach { reporterFQCN =>
- val dynamicAccess = new DynamicAccess(getClass.getClassLoader)
- dynamicAccess.createInstanceFor[Reporter](reporterFQCN, Nil).map({
- case mr: MetricReporter =>
- addMetricReporter(mr, "loaded-from-config: " + reporterFQCN)
- logger.info("Loaded metric reporter [{}]", reporterFQCN)
-
- case sr: SpanReporter =>
- addSpanReporter(sr, "loaded-from-config: " + reporterFQCN)
- logger.info("Loaded span reporter [{}]", reporterFQCN)
-
- }).failed.foreach {
- t => logger.error(s"Failed to load configured reporter [$reporterFQCN]", t)
- }
- }
- }
- }
-
- override def addReporter(reporter: MetricReporter): Registration =
- addMetricReporter(reporter, reporter.getClass.getName())
-
- override def addReporter(reporter: MetricReporter, name: String): Registration =
- addMetricReporter(reporter, name)
-
- override def addReporter(reporter: MetricReporter, name: String, filter: String): Registration =
- addMetricReporter(reporter, name, Some(filter))
-
- override def addReporter(reporter: SpanReporter): Registration =
- addSpanReporter(reporter, reporter.getClass.getName())
-
- override def addReporter(reporter: SpanReporter, name: String): Registration =
- addSpanReporter(reporter, name)
-
-
- private def addMetricReporter(reporter: MetricReporter, name: String, filter: Option[String] = None): Registration = synchronized {
- val executor = Executors.newSingleThreadExecutor(threadFactory(name))
- val reporterEntry = new MetricReporterEntry(
- id = reporterCounter.getAndIncrement(),
- name = name,
- reporter = reporter,
- filter = filter,
- executionContext = ExecutionContext.fromExecutorService(executor)
- )
-
- Future {
- Try {
- reporterEntry.reporter.start()
- }.failed.foreach { error =>
- logger.error(s"Metric reporter [$name] failed to start.", error)
- }
- }(reporterEntry.executionContext)
-
- if(metricReporters.isEmpty)
- reStartMetricTicker()
-
- metricReporters.put(reporterEntry.id, reporterEntry)
- createRegistration(reporterEntry.id, metricReporters)
-
- }
-
- private def addSpanReporter(reporter: SpanReporter, name: String): Registration = synchronized {
- val executor = Executors.newSingleThreadExecutor(threadFactory(name))
- val reporterEntry = new SpanReporterEntry(
- id = reporterCounter.incrementAndGet(),
- name = name,
- reporter = reporter,
- bufferCapacity = registryConfiguration.traceReporterQueueSize,
- executionContext = ExecutionContext.fromExecutorService(executor)
- )
-
- Future {
- Try {
- reporterEntry.reporter.start()
- }.failed.foreach { error =>
- logger.error(s"Span reporter [$name] failed to start.", error)
- }
- }(reporterEntry.executionContext)
-
- if(spanReporters.isEmpty)
- reStartTraceTicker()
-
- spanReporters.put(reporterEntry.id, reporterEntry)
- createRegistration(reporterEntry.id, spanReporters)
- }
-
- private def createRegistration(id: Long, target: TrieMap[Long, _]): Registration = new Registration {
- override def cancel(): Boolean =
- target.remove(id).nonEmpty
- }
-
- override def stopAllReporters(): Future[Unit] = {
- implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext)
- val reporterStopFutures = Vector.newBuilder[Future[Unit]]
-
- while(metricReporters.nonEmpty) {
- val (idToRemove, _) = metricReporters.head
- metricReporters.remove(idToRemove).foreach { entry =>
- reporterStopFutures += stopMetricReporter(entry)
- }
- }
-
- while(spanReporters.nonEmpty) {
- val (idToRemove, _) = spanReporters.head
- spanReporters.remove(idToRemove).foreach { entry =>
- reporterStopFutures += stopSpanReporter(entry)
- }
- }
-
- Future.sequence(reporterStopFutures.result()).map(_ => ())
- }
-
- private[kamon] def reconfigure(config: Config): Unit = synchronized {
- val oldConfig = registryConfiguration
- registryConfiguration = readRegistryConfiguration(config)
-
- if(oldConfig.metricTickInterval != registryConfiguration.metricTickInterval && metricReporters.nonEmpty)
- reStartMetricTicker()
-
- if(oldConfig.traceTickInterval != registryConfiguration.traceTickInterval && spanReporters.nonEmpty)
- reStartTraceTicker()
-
- // Reconfigure all registered reporters
- metricReporters.foreach {
- case (_, entry) =>
- Future {
- Try {
- entry.reporter.reconfigure(config)
- }.failed.foreach { error =>
- logger.error(s"Metric reporter [${entry.name}] failed to reconfigure.", error)
- }
- }(entry.executionContext)
- }
- spanReporters.foreach {
- case (_, entry) =>
- Future {
- Try {
- entry.reporter.reconfigure(config)
- }.failed.foreach { error =>
- logger.error(s"Span reporter [${entry.name}] failed to reconfigure.", error)
- }
- }(entry.executionContext)
- }
- }
-
-
- private def reStartMetricTicker(): Unit = {
- val tickIntervalMillis = registryConfiguration.metricTickInterval.toMillis
- val currentMetricTicker = metricReporterTickerSchedule.get()
-
- if(currentMetricTicker != null)
- currentMetricTicker.cancel(false)
-
- metricReporterTickerSchedule.set {
- val initialDelay =
- if(registryConfiguration.optimisticMetricTickAlignment) {
- val now = clock.instant()
- val nextTick = Clock.nextTick(now, registryConfiguration.metricTickInterval)
- Duration.between(now, nextTick).toMillis
-
- } else tickIntervalMillis
-
- registryExecutionContext.scheduleAtFixedRate(
- new MetricReporterTicker(metrics, metricReporters, clock), initialDelay, tickIntervalMillis, TimeUnit.MILLISECONDS
- )
- }
- }
-
- private def reStartTraceTicker(): Unit = {
- val tickIntervalMillis = registryConfiguration.traceTickInterval.toMillis
- val currentSpanTicker = spanReporterTickerSchedule.get()
- if(currentSpanTicker != null)
- currentSpanTicker.cancel(false)
-
- spanReporterTickerSchedule.set {
- registryExecutionContext.scheduleAtFixedRate(
- new SpanReporterTicker(spanReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS
- )
- }
- }
-
- def reportSpan(span: Span.FinishedSpan): Unit = {
- spanReporters.foreach { case (_, reporterEntry) =>
- if(reporterEntry.isActive)
- reporterEntry.buffer.offer(span)
- }
- }
-
- private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = {
- entry.isActive = false
-
- Future {
- Try {
- entry.reporter.stop()
- }.failed.foreach { error =>
- logger.error(s"Metric reporter [${entry.name}] failed to stop.", error)
- }
- }(entry.executionContext).andThen {
- case _ => entry.executionContext.shutdown()
- }(ExecutionContext.fromExecutor(registryExecutionContext))
- }
-
- private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = {
- entry.isActive = false
-
- Future {
- Try {
- entry.reporter.stop()
- }.failed.foreach { error =>
- logger.error(s"Span reporter [${entry.name}] failed to stop.", error)
- }
- }(entry.executionContext).andThen {
- case _ => entry.executionContext.shutdown()
- }(ExecutionContext.fromExecutor(registryExecutionContext))
- }
-
- private class MetricReporterEntry(
- @volatile var isActive: Boolean = true,
- val id: Long,
- val name: String,
- val reporter: MetricReporter,
- val filter: Option[String],
- val executionContext: ExecutionContextExecutorService
- )
-
- private class SpanReporterEntry(
- @volatile var isActive: Boolean = true,
- val id: Long,
- val name: String,
- val reporter: SpanReporter,
- val bufferCapacity: Int,
- val executionContext: ExecutionContextExecutorService
- ) {
- val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity)
- }
-
- private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry],
- clock: Clock) extends Runnable {
-
- val logger = LoggerFactory.getLogger(classOf[MetricReporterTicker])
- var lastInstant = Instant.now(clock)
-
- def run(): Unit = try {
- val currentInstant = Instant.now(clock)
- val periodSnapshot = PeriodSnapshot(
- from = lastInstant,
- to = currentInstant,
- metrics = snapshotGenerator.snapshot()
- )
-
- reporterEntries.foreach { case (_, entry) =>
- Future {
- Try {
- if (entry.isActive) {
- val filteredSnapshot = entry.filter
- .map(f => filterMetrics(f, periodSnapshot))
- .getOrElse(periodSnapshot)
-
- entry.reporter.reportPeriodSnapshot(filteredSnapshot)
- }
-
- }.failed.foreach { error =>
- logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error)
- }
-
- }(entry.executionContext)
- }
-
- lastInstant = currentInstant
-
- } catch {
- case NonFatal(t) => logger.error("Error while running a tick", t)
- }
-
- private def filterMetrics(filterName: String, periodSnapshot: PeriodSnapshot): PeriodSnapshot = {
- val metricFilter = Kamon.filter(filterName)
- val counters = periodSnapshot.metrics.counters.filter(c => metricFilter.accept(c.name))
- val gauges = periodSnapshot.metrics.gauges.filter(g => metricFilter.accept(g.name))
- val histograms = periodSnapshot.metrics.histograms.filter(h => metricFilter.accept(h.name))
- val rangeSamplers = periodSnapshot.metrics.rangeSamplers.filter(rs => metricFilter.accept(rs.name))
-
- periodSnapshot.copy(metrics = MetricsSnapshot(
- histograms, rangeSamplers, gauges, counters
- ))
- }
- }
-
-
- private class SpanReporterTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable {
- override def run(): Unit = {
- spanReporters.foreach {
- case (_, entry) =>
-
- val spanBatch = new java.util.ArrayList[Span.FinishedSpan](entry.bufferCapacity)
- entry.buffer.drainTo(spanBatch, entry.bufferCapacity)
-
- Future {
- Try {
- entry.reporter.reportSpans(spanBatch.asScala)
- }.failed.foreach { error =>
- logger.error(s"Reporter [${entry.name}] failed to report spans.", error)
- }
- }(entry.executionContext)
- }
- }
- }
-
- private def readRegistryConfiguration(config: Config): Configuration =
- Configuration(
- metricTickInterval = config.getDuration("kamon.metric.tick-interval"),
- optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment"),
- traceTickInterval = config.getDuration("kamon.trace.tick-interval"),
- traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size"),
- configuredReporters = config.getStringList("kamon.reporters").asScala
- )
-
- private case class Configuration(metricTickInterval: Duration, optimisticMetricTickAlignment: Boolean,
- traceTickInterval: Duration, traceReporterQueueSize: Int, configuredReporters: Seq[String])
- }
-}
-
diff --git a/kamon-core/src/main/scala/kamon/Tracing.scala b/kamon-core/src/main/scala/kamon/Tracing.scala
new file mode 100644
index 00000000..30506209
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/Tracing.scala
@@ -0,0 +1,18 @@
+package kamon
+
+import kamon.trace.{IdentityProvider, Tracer}
+
+trait Tracing { self: Configuration with Utilities =>
+ private val _tracer = Tracer.Default(Kamon, config(), clock())
+ onReconfigure(newConfig => _tracer.reconfigure(newConfig))
+
+ def buildSpan(operationName: String): Tracer.SpanBuilder =
+ _tracer.buildSpan(operationName)
+
+ def identityProvider: IdentityProvider =
+ _tracer.identityProvider
+
+ protected def tracer(): Tracer.Default =
+ _tracer
+
+}
diff --git a/kamon-core/src/main/scala/kamon/Utilities.scala b/kamon-core/src/main/scala/kamon/Utilities.scala
new file mode 100644
index 00000000..b5c7d2a6
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/Utilities.scala
@@ -0,0 +1,58 @@
+package kamon
+
+import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor}
+
+import com.typesafe.config.Config
+import kamon.util.{Clock, Filters, Matcher}
+
+/**
+ * Base utilities used by other Kamon components.
+ */
+trait Utilities { self: Configuration =>
+ private val _clock = new Clock.Default()
+ private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(self.config()), numberedThreadFactory("kamon-scheduler", daemon = false))
+ @volatile private var _filters = Filters.fromConfig(self.config())
+
+ self.onReconfigure(newConfig => {
+ self._filters = Filters.fromConfig(newConfig)
+ self._scheduler match {
+ case stpe: ScheduledThreadPoolExecutor => stpe.setCorePoolSize(schedulerPoolSize(config))
+ case _ => // cannot change the pool size on other unknown types.
+ }
+ })
+
+ sys.addShutdownHook(() => _scheduler.shutdown())
+
+
+ /**
+ * Applies a filter to the given pattern. All filters are configured on the kamon.util.filters configuration section.
+ *
+ * @return true if the pattern matches at least one includes pattern and none of the excludes patterns in the filter.
+ */
+ def filter(filterName: String, pattern: String): Boolean =
+ _filters.accept(filterName, pattern)
+
+ /**
+ * Retrieves a matcher for the given filter name. All filters are configured on the kamon.util.filters configuration
+ * section.
+ */
+ def filter(filterName: String): Matcher =
+ _filters.get(filterName)
+
+ /**
+ * Kamon's clock implementation.
+ */
+ def clock(): Clock =
+ _clock
+
+ /**
+ * Scheduler to be used for Kamon-related tasks like updating gauges.
+ */
+ def scheduler(): ScheduledExecutorService =
+ _scheduler
+
+
+
+ private def schedulerPoolSize(config: Config): Int =
+ config.getInt("kamon.scheduler-pool-size")
+}
diff --git a/kamon-core/src/main/scala/kamon/module/Module.scala b/kamon-core/src/main/scala/kamon/module/Module.scala
new file mode 100644
index 00000000..41649629
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/module/Module.scala
@@ -0,0 +1,426 @@
+package kamon
+package module
+
+import java.time.{Duration, Instant}
+import java.util.concurrent.{CountDownLatch, Executors, ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicReference
+
+import com.typesafe.config.Config
+import kamon.metric.{MetricsSnapshotGenerator, PeriodSnapshot}
+import kamon.trace.Tracer.SpanBuffer
+import kamon.util.Clock
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, Promise}
+import scala.util.Try
+import scala.util.control.NonFatal
+
+/**
+ * Modules provide additional capabilities to Kamon, like collecting JVM metrics or exporting the metrics and trace
+ * data to external services. Additionally, modules can be automatically registered in Kamon by simply being present
+ * in the classpath and having the appropriate entry in the configuration file. All modules get a dedicated execution
+ * context which will be used to call the start, stop and reconfigure hooks.
+ *
+ * Besides the basic lifecycle hooks, when registering a [[MetricReporter]] and/or [[SpanReporter]] module, Kamon will
+ * also schedule calls to [[MetricReporter.reportPeriodSnapshot()]] and [[SpanReporter.reportSpans()]] in the module's
+ * execution context.
+ */
+trait Module {
+
+ /**
+ * Signals that a the module has been registered in Kamon's module registry.
+ */
+ def start(): Unit
+
+ /**
+ * Signals that the module should be stopped and all acquired resources, if any, should be released.
+ */
+ def stop(): Unit
+
+ /**
+ * Signals that a new configuration object has been provided to Kamon. Modules should ensure that their internal
+ * settings are in sync with the provided configuration.
+ */
+ def reconfigure(newConfig: Config): Unit
+}
+
+
+object Module {
+
+ /**
+ * Represents a module's registration on the module registry. A module can be stopped at any time by cancelling its
+ * registration.
+ */
+ trait Registration {
+
+ /**
+ * Removes and stops the related module.
+ */
+ def cancel(): Unit
+ }
+
+ /**
+ * Controls the lifecycle of all available modules.
+ */
+ class Registry(classLoading: ClassLoading, configuration: Configuration, clock: Clock, snapshotGenerator: MetricsSnapshotGenerator, spanBuffer: SpanBuffer) {
+ private val _logger = LoggerFactory.getLogger(classOf[Registry])
+ private val _metricsTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-metrics-ticker", daemon = true))
+ private val _spansTickerExecutor = Executors.newScheduledThreadPool(1, threadFactory("kamon-spans-ticker", daemon = true))
+
+ private val _metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
+ private val _spansTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
+
+ private var _registrySettings = readRegistryConfiguration(configuration.config())
+ private var _registeredModules: Map[String, Entry[Module]] = Map.empty
+ private var _metricReporterModules: Map[String, Entry[MetricReporter]] = Map.empty
+ private var _spanReporterModules: Map[String, Entry[SpanReporter]] = Map.empty
+
+ // Start ticking as soon as the registry is created.
+ scheduleMetricsTicker()
+ scheduleSpansTicker()
+
+
+ /**
+ * Registers a module that has already been instantiated by the user. The start callback will be executed as part
+ * of the registration process. If a module with the specified name already exists the registration will fail. If
+ * the registered module is a MetricReporter and/or SpanReporter it will also be configured to receive the metrics
+ * and spans data upon every tick.
+ *
+ * @param name Desired module name.
+ * @param module Module instance.
+ * @return A registration that can be used to stop the module at any time.
+ */
+ def register(name: String, module: Module): Registration = synchronized {
+ if(_registeredModules.get(name).isEmpty) {
+ val moduleEntry = createEntry(name, true, module)
+ startModule(moduleEntry)
+ registration(moduleEntry)
+
+ } else {
+ _logger.warn(s"Cannot register module [$name], a module with that name already exists.")
+ noopRegistration(name)
+ }
+ }
+
+ /**
+ * Reads all available modules from the config and either starts, stops or reconfigures them in order to match the
+ * configured modules state.
+ */
+ def load(config: Config): Unit = synchronized {
+ val configuredModules = readModuleSettings(config)
+ val automaticallyRegisteredModules = _registeredModules.filterNot { case (_, module) => module.programmaticallyAdded }
+
+ // Start, reconfigure and stop modules that are still present but disabled.
+ configuredModules.foreach { moduleSettings =>
+ automaticallyRegisteredModules.get(moduleSettings.name).fold {
+ // The module does not exist in the registry, the only possible action is starting it, if enabled.
+ if(moduleSettings.enabled) {
+ createModule(moduleSettings).foreach(entry => startModule(entry))
+ }
+
+ } { existentModuleSettings =>
+ // When a module already exists it can either need to be stopped, or to be reconfigured.
+ if(moduleSettings.enabled) {
+ reconfigureModule(existentModuleSettings, config)
+ } else {
+ stopModule(existentModuleSettings)
+ }
+ }
+ }
+
+ // Remove all modules that no longer exist in the configuration.
+ val missingModules = automaticallyRegisteredModules.filterKeys(moduleName => configuredModules.find(_.name == moduleName).isEmpty)
+ missingModules.foreach {
+ case (_, entry) => stopModule(entry)
+ }
+ }
+
+ /**
+ * Schedules the reconfigure hook on all registered modules and applies the latest configuration settings to the
+ * registry.
+ */
+ def reconfigure(newConfig: Config): Unit = synchronized {
+ _registrySettings = readRegistryConfiguration(configuration.config())
+ _registeredModules.values.foreach(entry => reconfigureModule(entry, newConfig))
+ scheduleMetricsTicker()
+ scheduleSpansTicker()
+ }
+
+ /**
+ * Stops all registered modules. As part of the stop process, all modules get a last chance to report metrics and
+ * spans available until the call to stop.
+ */
+ def stop(): Future[Unit] = synchronized {
+ implicit val cleanupExecutor = ExecutionContext.Implicits.global
+ scheduleMetricsTicker(once = true)
+ scheduleSpansTicker(once = true)
+
+ val stopSignals = _registeredModules.values.map(stopModule)
+ val latch = new CountDownLatch(stopSignals.size)
+ stopSignals.foreach(f => f.onComplete(_ => latch.countDown()))
+
+ // There is a global 30 seconds limit to shutdown after which all executors will shut down.
+ val stopCompletionFuture = Future(latch.await(30, TimeUnit.SECONDS))
+ stopCompletionFuture.onComplete(_ => {
+ _metricsTickerExecutor.shutdown()
+ _spansTickerExecutor.shutdown()
+ })
+
+ stopCompletionFuture.map(_ => ())
+ }
+
+
+ /**
+ * (Re)Schedules the metrics ticker that periodically takes snapshots from the metric registry and sends them to
+ * all available metric reporting modules. If a ticker was already scheduled then that scheduled job will be
+ * cancelled and scheduled again.
+ */
+ private def scheduleMetricsTicker(once: Boolean = false): Unit = {
+ val currentMetricsTicker = _metricsTickerSchedule.get()
+ if(currentMetricsTicker != null)
+ currentMetricsTicker.cancel(false)
+
+ _metricsTickerSchedule.set {
+ val interval = _registrySettings.metricTickInterval.toMillis
+ val initialDelay = if(_registrySettings.optimisticMetricTickAlignment) {
+ val now = clock.instant()
+ val nextTick = Clock.nextTick(now, _registrySettings.metricTickInterval)
+ Duration.between(now, nextTick).toMillis
+ } else _registrySettings.metricTickInterval.toMillis
+
+ val ticker = new Runnable {
+ var lastInstant = Instant.now(clock)
+
+ override def run(): Unit = try {
+ val currentInstant = Instant.now(clock)
+ val periodSnapshot = PeriodSnapshot(
+ from = lastInstant,
+ to = currentInstant,
+ metrics = snapshotGenerator.snapshot())
+
+ metricReporterModules().foreach { entry =>
+ Future {
+ Try(entry.module.reportPeriodSnapshot(periodSnapshot)).failed.foreach { error =>
+ _logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error)
+ }
+ }(entry.executionContext)
+ }
+
+ lastInstant = currentInstant
+ } catch {
+ case NonFatal(t) => _logger.error("Failed to run a metrics tick", t)
+ }
+ }
+
+ if(once)
+ _metricsTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS)
+ else
+ _metricsTickerExecutor.scheduleAtFixedRate(ticker, initialDelay, interval, TimeUnit.MILLISECONDS)
+ }
+ }
+
+ /**
+ * (Re)Schedules the spans ticker that periodically takes the spans accumulated by the tracer and flushes them to
+ * all available span reporting modules. If a ticker was already scheduled then that scheduled job will be
+ * cancelled and scheduled again.
+ */
+ private def scheduleSpansTicker(once: Boolean = false): Unit = {
+ val currentSpansTicker = _spansTickerSchedule.get()
+ if(currentSpansTicker != null)
+ currentSpansTicker.cancel(false)
+
+ _spansTickerSchedule.set {
+ val interval = _registrySettings.traceTickInterval.toMillis
+
+ val ticker = new Runnable {
+ override def run(): Unit = try {
+ val spanBatch = spanBuffer.flush()
+
+ spanReporterModules().foreach { entry =>
+ Future {
+ Try(entry.module.reportSpans(spanBatch)).failed.foreach { error =>
+ _logger.error(s"Reporter [${entry.name}] failed to process a spans tick.", error)
+ }
+ }(entry.executionContext)
+ }
+
+ } catch {
+ case NonFatal(t) => _logger.error("Failed to run a spans tick", t)
+ }
+ }
+
+ if(once)
+ _spansTickerExecutor.schedule(ticker, 0L, TimeUnit.MILLISECONDS)
+ else
+ _spansTickerExecutor.scheduleAtFixedRate(ticker, interval, interval, TimeUnit.MILLISECONDS)
+ }
+ }
+
+ private def metricReporterModules(): Iterable[Entry[MetricReporter]] = synchronized {
+ _metricReporterModules.values
+ }
+
+ private def spanReporterModules(): Iterable[Entry[SpanReporter]] = synchronized {
+ _spanReporterModules.values
+ }
+
+ private def readModuleSettings(config: Config): Seq[Settings] = {
+ val moduleConfigs = config.getConfig("kamon.modules").configurations
+ val moduleSettings = moduleConfigs.map {
+ case (moduleName, moduleConfig) =>
+ val moduleSettings = Try {
+ Settings(
+ moduleName,
+ moduleConfig.getString("class"),
+ moduleConfig.getBoolean("enabled")
+ )
+ }
+
+ moduleSettings.failed.foreach { t =>
+ _logger.warn(s"Failed to read configuration for module [$moduleName]", t)
+
+ if(moduleConfig.hasPath("requires-aspectj") || moduleConfig.hasPath("auto-start") || moduleConfig.hasPath("extension-class")) {
+ _logger.warn(s"Module [$moduleName] contains legacy configuration settings, please ensure that no legacy configuration")
+ }
+ }
+
+ moduleSettings
+
+ } filter(_.isSuccess) map(_.get) toSeq
+
+ // Legacy modules from <1.2.0
+ val legacyModules = config.getStringList("kamon.reporters").asScala map { moduleClass =>
+ Settings(moduleClass, moduleClass, true)
+ } toSeq
+
+ val (repeatedLegacyModules, uniqueLegacyModules) = legacyModules.partition(lm => moduleSettings.find(_.fqcn == lm.fqcn).nonEmpty)
+ repeatedLegacyModules.foreach(m =>
+ _logger.warn(s"Module [${m.name}] is configured twice, please remove it from the deprecated kamon.reporters setting."))
+
+ uniqueLegacyModules.foreach(m =>
+ _logger.warn(s"Module [${m.name}] is configured in the deprecated kamon.reporters setting, please consider moving it to kamon.modules."))
+
+ moduleSettings ++ uniqueLegacyModules
+ }
+
+ /**
+ * Creates a module from the provided settings.
+ */
+ private def createModule(settings: Settings): Option[Entry[Module]] = {
+ val moduleInstance = classLoading.createInstance[Module](settings.fqcn, Nil)
+ val moduleEntry = moduleInstance.map(instance => createEntry(settings.name, false, instance))
+
+ moduleEntry.failed.foreach(t => _logger.warn(s"Failed to create instance of module [${settings.name}]", t))
+ moduleEntry.toOption
+ }
+
+ private def createEntry(name: String, programmaticallyAdded: Boolean, instance: Module): Entry[Module] = {
+ val executor = Executors.newSingleThreadExecutor(threadFactory(name))
+ Entry(name, ExecutionContext.fromExecutorService(executor), programmaticallyAdded, instance)
+ }
+
+
+ /**
+ * Registers a module and schedules execution of its start procedure.
+ */
+ private def startModule(entry: Entry[Module]): Unit = {
+ registerModule(entry)
+
+ // Schedule the start hook on the module
+ entry.executionContext.execute(new Runnable {
+ override def run(): Unit =
+ Try(entry.module.start())
+ .failed.foreach(t => _logger.warn(s"Failure occurred while starting module [${entry.name}]", t))
+ })
+ }
+
+ private def registerModule(entry: Entry[Module]): Unit = {
+ _registeredModules = _registeredModules + (entry.name -> entry)
+ if(entry.module.isInstanceOf[MetricReporter])
+ _metricReporterModules = _metricReporterModules + (entry.name -> entry.asInstanceOf[Entry[MetricReporter]])
+ if(entry.module.isInstanceOf[SpanReporter])
+ _spanReporterModules = _spanReporterModules + (entry.name -> entry.asInstanceOf[Entry[SpanReporter]])
+
+ }
+
+ /**
+ * Removes the module from the registry and schedules a call to the stop lifecycle hook on the module's execution
+ * context. The returned future completes when the module finishes its stop procedure.
+ */
+ private def stopModule(entry: Entry[Module]): Future[Unit] = synchronized {
+ val cleanupExecutor = ExecutionContext.Implicits.global
+
+ // Remove the module from all registries
+ _registeredModules = _registeredModules - entry.name
+ if(entry.module.isInstanceOf[MetricReporter])
+ _metricReporterModules = _metricReporterModules - entry.name
+ if(entry.module.isInstanceOf[SpanReporter])
+ _spanReporterModules = _spanReporterModules - entry.name
+
+
+ // Schedule a call to stop on the module
+ val stopPromise = Promise[Unit]()
+ entry.executionContext.execute(new Runnable {
+ override def run(): Unit =
+ stopPromise.complete {
+ val stopResult = Try(entry.module.stop())
+ stopResult.failed.foreach(t => _logger.warn(s"Failure occurred while stopping module [${entry.name}]", t))
+ stopResult
+ }
+
+ })
+
+ stopPromise.future.onComplete(_ => entry.executionContext.shutdown())(cleanupExecutor)
+ stopPromise.future
+ }
+
+ /**
+ * Schedules a call to reconfigure on the module's execution context.
+ */
+ private def reconfigureModule(entry: Entry[Module], config: Config): Unit = {
+ entry.executionContext.execute(new Runnable {
+ override def run(): Unit =
+ Try(entry.module.reconfigure(config))
+ .failed.foreach(t => _logger.warn(s"Failure occurred while reconfiguring module [${entry.name}]", t))
+ })
+ }
+
+ private def noopRegistration(moduleName: String): Registration = new Registration {
+ override def cancel(): Unit =
+ _logger.warn(s"Cannot cancel registration on module [$moduleName] because the module was not added properly")
+ }
+
+ private def registration(entry: Entry[Module]): Registration = new Registration {
+ override def cancel(): Unit = stopModule(entry)
+ }
+ }
+
+ private def readRegistryConfiguration(config: Config): RegistrySettings =
+ RegistrySettings(
+ metricTickInterval = config.getDuration("kamon.metric.tick-interval"),
+ optimisticMetricTickAlignment = config.getBoolean("kamon.metric.optimistic-tick-alignment"),
+ traceTickInterval = config.getDuration("kamon.trace.tick-interval"),
+ traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size")
+ )
+
+ private case class RegistrySettings(
+ metricTickInterval: Duration,
+ optimisticMetricTickAlignment: Boolean,
+ traceTickInterval: Duration,
+ traceReporterQueueSize: Int
+ )
+
+ private case class Settings(
+ name: String,
+ fqcn: String,
+ enabled: Boolean
+ )
+
+ private case class Entry[T <: Module](
+ name: String,
+ executionContext: ExecutionContextExecutorService,
+ programmaticallyAdded: Boolean,
+ module: T
+ )
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/module/ReportingModule.scala b/kamon-core/src/main/scala/kamon/module/ReportingModule.scala
new file mode 100644
index 00000000..0e88fc23
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/module/ReportingModule.scala
@@ -0,0 +1,21 @@
+package kamon
+package module
+
+import kamon.trace.Span
+import kamon.metric.PeriodSnapshot
+
+/**
+ * Modules implementing this trait will get registered for periodically receiving metric period snapshots. The
+ * frequency of the period snapshots is controlled by the kamon.metric.tick-interval setting.
+ */
+trait MetricReporter extends Module {
+ def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit
+}
+
+/**
+ * Modules implementing this trait will get registered for periodically receiving span batches. The frequency of the
+ * span batches is controlled by the kamon.trace.tick-interval setting.
+ */
+trait SpanReporter extends Module {
+ def reportSpans(spans: Seq[Span.FinishedSpan]): Unit
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala
index 6015e350..ee301e8a 100644
--- a/kamon-core/src/main/scala/kamon/trace/Span.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Span.scala
@@ -18,7 +18,6 @@ package trace
import java.time.Instant
-import kamon.ReporterRegistry.SpanSink
import kamon.context.Context
import kamon.metric.MeasurementUnit
import kamon.trace.SpanContext.SamplingDecision
@@ -90,7 +89,7 @@ object Span {
final class Local(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue],
- initialMetricTags: Map[String, String], from: Instant, spanSink: SpanSink, trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock) extends Span {
+ initialMetricTags: Map[String, String], from: Instant, spanBuffer: Tracer.SpanBuffer, trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock) extends Span {
private var collectMetrics: Boolean = trackMetrics
private var open: Boolean = true
@@ -203,7 +202,7 @@ object Span {
recordSpanMetrics(to)
if(sampled)
- spanSink.reportSpan(toFinishedSpan(to))
+ spanBuffer.append(toFinishedSpan(to))
}
}
@@ -229,9 +228,9 @@ object Span {
object Local {
def apply(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue],
- initialMetricTags: Map[String, String], from: Instant, spanSink: SpanSink,
+ initialMetricTags: Map[String, String], from: Instant, spanBuffer: Tracer.SpanBuffer,
trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock): Local =
- new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, from, spanSink, trackMetrics, scopeSpanMetrics, clock)
+ new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, from, spanBuffer, trackMetrics, scopeSpanMetrics, clock)
}
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala
index 4cad1eb7..542638cf 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala
@@ -38,7 +38,7 @@ object SpanPropagation {
import B3.Headers
override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = {
- val identityProvider = Kamon.tracer.identityProvider
+ val identityProvider = Kamon.identityProvider
val traceID = reader.read(Headers.TraceIdentifier)
.map(id => identityProvider.traceIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
@@ -122,7 +122,7 @@ object SpanPropagation {
override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = {
reader.read(Header.B3).map { header =>
- val identityProvider = Kamon.tracer.identityProvider
+ val identityProvider = Kamon.identityProvider
val (traceID, spanID, samplingDecision, parentSpanID) = header.splitToTuple("-")
@@ -220,7 +220,7 @@ object SpanPropagation {
if(medium.available() == 0)
context
else {
- val identityProvider = Kamon.tracer.identityProvider
+ val identityProvider = Kamon.identityProvider
val colferSpan = new ColferSpan()
colferSpan.unmarshal(medium.readAll(), 0)
diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
index ad7ffbed..7a314205 100644
--- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
@@ -18,13 +18,13 @@ package kamon.trace
import java.time.Instant
import com.typesafe.config.Config
-import kamon.ReporterRegistry.SpanSink
import kamon.Kamon
import kamon.metric.MetricLookup
-import kamon.trace.Span.TagValue
+import kamon.trace.Span.{FinishedSpan, TagValue}
import kamon.trace.SpanContext.SamplingDecision
import kamon.trace.Tracer.SpanBuilder
import kamon.util.{Clock, DynamicAccess}
+import org.jctools.queues.{MessagePassingQueue, MpscArrayQueue}
import org.slf4j.LoggerFactory
import scala.collection.immutable
@@ -37,19 +37,26 @@ trait Tracer {
object Tracer {
- final class Default(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config, clock: Clock) extends Tracer {
- private val logger = LoggerFactory.getLogger(classOf[Tracer])
+ private[kamon] trait SpanBuffer {
+ def append(span: FinishedSpan): Unit
+ def flush(): Seq[FinishedSpan]
+ }
+
+ final class Default(metrics: MetricLookup, initialConfig: Config, clock: Clock) extends Tracer with SpanBuffer {
+ private val _logger = LoggerFactory.getLogger(classOf[Tracer])
private[Tracer] val tracerMetrics = new TracerMetrics(metrics)
- @volatile private[Tracer] var joinRemoteParentsWithSameSpanID: Boolean = true
- @volatile private[Tracer] var scopeSpanMetrics: Boolean = true
+ @volatile private[Tracer] var _traceReporterQueueSize = 1024
+ @volatile private[Tracer] var _spanBuffer = new MpscArrayQueue[Span.FinishedSpan](_traceReporterQueueSize)
+ @volatile private[Tracer] var _joinRemoteParentsWithSameSpanID: Boolean = true
+ @volatile private[Tracer] var _scopeSpanMetrics: Boolean = true
@volatile private[Tracer] var _sampler: Sampler = Sampler.Never
@volatile private[Tracer] var _identityProvider: IdentityProvider = IdentityProvider.Default()
reconfigure(initialConfig)
override def buildSpan(operationName: String): SpanBuilder =
- new SpanBuilder(operationName, this, spanSink, clock)
+ new SpanBuilder(operationName, this, this, clock)
override def identityProvider: IdentityProvider =
this._identityProvider
@@ -69,29 +76,54 @@ object Tracer {
case other => sys.error(s"Unexpected sampler name $other.")
}
+ val newTraceReporterQueueSize = traceConfig.getInt("reporter-queue-size")
val newJoinRemoteParentsWithSameSpanID = traceConfig.getBoolean("join-remote-parents-with-same-span-id")
val newScopeSpanMetrics = traceConfig.getBoolean("span-metrics.scope-spans-to-parent")
val newIdentityProvider = dynamic.createInstanceFor[IdentityProvider](
traceConfig.getString("identity-provider"), immutable.Seq.empty[(Class[_], AnyRef)]
).get
+ if(_traceReporterQueueSize != newTraceReporterQueueSize) {
+ // By simply changing the buffer we might be dropping Spans that have not been collected yet by the reporters.
+ // Since reconfigures are very unlikely to happen beyond application startup this might be a problem at all.
+ // If we eventually decide to keep those possible Spans around then we will need to change the queue type to
+ // multiple consumer as the reconfiguring thread will need to drain the contents before replacing.
+ _spanBuffer = new MpscArrayQueue[Span.FinishedSpan](newTraceReporterQueueSize)
+ }
+
_sampler = newSampler
- joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID
- scopeSpanMetrics = newScopeSpanMetrics
+ _joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID
+ _scopeSpanMetrics = newScopeSpanMetrics
_identityProvider = newIdentityProvider
+ _traceReporterQueueSize = newTraceReporterQueueSize
}.failed.foreach {
- ex => logger.error("Unable to reconfigure Kamon Tracer", ex)
+ ex => _logger.error("Unable to reconfigure Kamon Tracer", ex)
}
}
+
+
+ override def append(span: FinishedSpan): Unit =
+ _spanBuffer.offer(span)
+
+ override def flush(): Seq[FinishedSpan] = {
+ var spans = Seq.empty[FinishedSpan]
+ _spanBuffer.drain(new MessagePassingQueue.Consumer[Span.FinishedSpan] {
+ override def accept(span: FinishedSpan): Unit =
+ spans = span +: spans
+ })
+
+ spans
+ }
+
}
object Default {
- def apply(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config, clock: Clock): Default =
- new Default(metrics, spanSink, initialConfig, clock)
+ def apply(metrics: MetricLookup, initialConfig: Config, clock: Clock): Default =
+ new Default(metrics, initialConfig, clock)
}
- final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanSink: SpanSink, clock: Clock) {
+ final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanBuffer: Tracer.SpanBuffer, clock: Clock) {
private var parentSpan: Span = _
private var initialOperationName: String = operationName
private var from: Instant = Instant.EPOCH
@@ -192,15 +224,15 @@ object Tracer {
initialSpanTags,
initialMetricTags,
spanFrom,
- spanSink,
+ spanBuffer,
trackMetrics,
- tracer.scopeSpanMetrics,
+ tracer._scopeSpanMetrics,
clock
)
}
private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext =
- if(parent.isRemote() && tracer.joinRemoteParentsWithSameSpanID)
+ if(parent.isRemote() && tracer._joinRemoteParentsWithSameSpanID)
parent.context().copy(samplingDecision = samplingDecision)
else
parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision)