aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kamon-core/src/main/resources/reference.conf9
-rw-r--r--kamon-core/src/main/scala/kamon/ReporterRegistry.scala40
2 files changed, 41 insertions, 8 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 80da12a5..51a89a9e 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -12,9 +12,10 @@ kamon {
instance = "auto"
}
- # FQCN of the reporter instances that should be loaded when calling `Kamon.reporters.loadFromConfig()`.
+ # FQCN of the reporter instances that should be loaded when calling `Kamon.reporters.loadFromConfig()`. All reporter
+ # classes must
# Example: `reporters = ["kamon.statsd.StatsD", "kamon.zipkin.Zipkin"]`.
- reporters = []
+ reporters = [ ]
metric {
tick-interval = 60 seconds
@@ -84,8 +85,12 @@ kamon {
trace {
+ # Interval at which sampled finished spans will be flushed to SpanReporters.
tick-interval = 10 seconds
+ # Size of the internal queue where sampled spans will stay until they get flushed. If the queue becomes full then
+ # sampled finished spans will be dropped in order to avoid consuming excessive amounts of memory. Each configured
+ # reporter has a separate queue.
reporter-queue-size = 1024
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
index 66193619..49c67288 100644
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -23,6 +23,7 @@ import com.typesafe.config.Config
import com.typesafe.scalalogging.Logger
import kamon.metric._
import kamon.trace.Span
+import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
import scala.util.Try
@@ -63,20 +64,45 @@ trait SpanReporter {
}
class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry {
+ private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry])
private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry"))
private val reporterCounter = new AtomicLong(0L)
private var registryConfiguration = readRegistryConfiguration(initialConfig)
private val metricReporters = TrieMap[Long, MetricReporterEntry]()
private val metricReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
-
private val spanReporters = TrieMap[Long, SpanReporterEntry]()
private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
reconfigure(initialConfig)
- override def loadReportersFromConfig(): Unit = ???
+ override def loadReportersFromConfig(): Unit = {
+ if(registryConfiguration.configuredReporters.isEmpty)
+ logger.info("The kamon.reporters setting is empty, no reporters have been started.")
+ else {
+ registryConfiguration.configuredReporters.foreach { reporterFQCN =>
+ Try {
+ val reporterClass = Class.forName(reporterFQCN)
+ val instance = reporterClass.newInstance()
+ instance match {
+ case mr: MetricReporter =>
+ addMetricReporter(mr, "loaded-from-config: " + reporterFQCN)
+ logger.info("Loaded metric reporter [{}]", reporterFQCN)
+
+ case sr: SpanReporter =>
+ addSpanReporter(sr, "loaded-from-config: " + reporterFQCN)
+ logger.info("Loaded span reporter [{}]", reporterFQCN)
+
+ case anyOther =>
+ logger.error("Cannot add [{}] as a reporter, it doesn't implement the MetricReporter or SpanReporter interfaces", anyOther)
+ }
+ }.failed.foreach {
+ t => logger.error(s"Failed to load configured reporter [$reporterFQCN]", t)
+ }
+ }
+ }
+ }
override def addReporter(reporter: MetricReporter): Registration =
addMetricReporter(reporter, reporter.getClass.getName())
@@ -111,7 +137,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con
val reporterEntry = new SpanReporterEntry(
id = reporterCounter.incrementAndGet(),
reporter = reporter,
- bufferCapacity = 1024,
+ bufferCapacity = registryConfiguration.traceReporterQueueSize,
executionContext = ExecutionContext.fromExecutorService(executor)
)
@@ -157,7 +183,6 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con
if(newConfig.traceTickInterval != registryConfiguration.metricTickInterval && spanReporters.nonEmpty)
reStartTraceTicker()
-
// Reconfigure all registered reporters
metricReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) }
spanReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) }
@@ -276,8 +301,11 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con
private def readRegistryConfiguration(config: Config): Configuration =
Configuration(
metricTickInterval = config.getDuration("kamon.metric.tick-interval"),
- traceTickInterval = config.getDuration("kamon.trace.tick-interval")
+ traceTickInterval = config.getDuration("kamon.trace.tick-interval"),
+ traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size"),
+ configuredReporters = config.getStringList("kamon.reporters").asScala
)
- private case class Configuration(metricTickInterval: Duration, traceTickInterval: Duration)
+ private case class Configuration(metricTickInterval: Duration, traceTickInterval: Duration,
+ traceReporterQueueSize: Int, configuredReporters: Seq[String])
} \ No newline at end of file