From c1553381e74b5ffaee1598fce8b3b5458d039b2b Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sun, 11 Jun 2017 18:11:38 +0200 Subject: implement loading reporters from configuration --- kamon-core/src/main/resources/reference.conf | 9 +++-- .../src/main/scala/kamon/ReporterRegistry.scala | 40 ++++++++++++++++++---- 2 files changed, 41 insertions(+), 8 deletions(-) (limited to 'kamon-core/src') diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 80da12a5..51a89a9e 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -12,9 +12,10 @@ kamon { instance = "auto" } - # FQCN of the reporter instances that should be loaded when calling `Kamon.reporters.loadFromConfig()`. + # FQCN of the reporter instances that should be loaded when calling `Kamon.reporters.loadFromConfig()`. All reporter + # classes must # Example: `reporters = ["kamon.statsd.StatsD", "kamon.zipkin.Zipkin"]`. - reporters = [] + reporters = [ ] metric { tick-interval = 60 seconds @@ -84,8 +85,12 @@ kamon { trace { + # Interval at which sampled finished spans will be flushed to SpanReporters. tick-interval = 10 seconds + # Size of the internal queue where sampled spans will stay until they get flushed. If the queue becomes full then + # sampled finished spans will be dropped in order to avoid consuming excessive amounts of memory. Each configured + # reporter has a separate queue. reporter-queue-size = 1024 diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index 66193619..49c67288 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -23,6 +23,7 @@ import com.typesafe.config.Config import com.typesafe.scalalogging.Logger import kamon.metric._ import kamon.trace.Span +import org.slf4j.LoggerFactory import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} import scala.util.Try @@ -63,20 +64,45 @@ trait SpanReporter { } class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry { + private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry]) private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry")) private val reporterCounter = new AtomicLong(0L) private var registryConfiguration = readRegistryConfiguration(initialConfig) private val metricReporters = TrieMap[Long, MetricReporterEntry]() private val metricReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() - private val spanReporters = TrieMap[Long, SpanReporterEntry]() private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() reconfigure(initialConfig) - override def loadReportersFromConfig(): Unit = ??? + override def loadReportersFromConfig(): Unit = { + if(registryConfiguration.configuredReporters.isEmpty) + logger.info("The kamon.reporters setting is empty, no reporters have been started.") + else { + registryConfiguration.configuredReporters.foreach { reporterFQCN => + Try { + val reporterClass = Class.forName(reporterFQCN) + val instance = reporterClass.newInstance() + instance match { + case mr: MetricReporter => + addMetricReporter(mr, "loaded-from-config: " + reporterFQCN) + logger.info("Loaded metric reporter [{}]", reporterFQCN) + + case sr: SpanReporter => + addSpanReporter(sr, "loaded-from-config: " + reporterFQCN) + logger.info("Loaded span reporter [{}]", reporterFQCN) + + case anyOther => + logger.error("Cannot add [{}] as a reporter, it doesn't implement the MetricReporter or SpanReporter interfaces", anyOther) + } + }.failed.foreach { + t => logger.error(s"Failed to load configured reporter [$reporterFQCN]", t) + } + } + } + } override def addReporter(reporter: MetricReporter): Registration = addMetricReporter(reporter, reporter.getClass.getName()) @@ -111,7 +137,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con val reporterEntry = new SpanReporterEntry( id = reporterCounter.incrementAndGet(), reporter = reporter, - bufferCapacity = 1024, + bufferCapacity = registryConfiguration.traceReporterQueueSize, executionContext = ExecutionContext.fromExecutorService(executor) ) @@ -157,7 +183,6 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con if(newConfig.traceTickInterval != registryConfiguration.metricTickInterval && spanReporters.nonEmpty) reStartTraceTicker() - // Reconfigure all registered reporters metricReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } spanReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } @@ -276,8 +301,11 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con private def readRegistryConfiguration(config: Config): Configuration = Configuration( metricTickInterval = config.getDuration("kamon.metric.tick-interval"), - traceTickInterval = config.getDuration("kamon.trace.tick-interval") + traceTickInterval = config.getDuration("kamon.trace.tick-interval"), + traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size"), + configuredReporters = config.getStringList("kamon.reporters").asScala ) - private case class Configuration(metricTickInterval: Duration, traceTickInterval: Duration) + private case class Configuration(metricTickInterval: Duration, traceTickInterval: Duration, + traceReporterQueueSize: Int, configuredReporters: Seq[String]) } \ No newline at end of file -- cgit v1.2.3