From 105ed9cb264eb3569b5ae0d65ac2fd8cb636f8e8 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sun, 21 May 2017 14:05:05 +0200 Subject: wip, trying to get something that could be tested --- kamon-core/src/main/resources/reference.conf | 21 +++++- kamon-core/src/main/scala/kamon/Diagnostic.scala | 13 ---- kamon-core/src/main/scala/kamon/Environment.scala | 13 ---- kamon-core/src/main/scala/kamon/Kamon.scala | 80 ++++++++-------------- .../src/main/scala/kamon/ReporterRegistry.scala | 15 ---- kamon-core/src/main/scala/kamon/Util.scala | 15 ---- .../src/main/scala/kamon/metric/EntityFilter.scala | 10 +-- .../main/scala/kamon/metric/EntityRecorder.scala | 35 ++++++++-- .../main/scala/kamon/metric/RecorderRegistry.scala | 11 ++- kamon-core/src/main/scala/kamon/package.scala | 30 ++++++++ .../src/main/scala/kamon/util/Scheduler.scala | 5 -- 11 files changed, 122 insertions(+), 126 deletions(-) delete mode 100644 kamon-core/src/main/scala/kamon/Diagnostic.scala delete mode 100644 kamon-core/src/main/scala/kamon/Environment.scala delete mode 100644 kamon-core/src/main/scala/kamon/Util.scala delete mode 100644 kamon-core/src/main/scala/kamon/util/Scheduler.scala (limited to 'kamon-core/src/main') diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index aac718ca..b514b1b7 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -1,11 +1,28 @@ kamon { metric { - tick-interval = 1 second + tick-interval = 60 seconds filters { - accept-unmatched = true + + # Determines whether entities from a category that doesn't have any filtering configuration should be tracked or + # not. E.g. If there are no filter sections for the "jdbc-datasource" category and `accept-unmatched-categories` + # is set to true, all entities for that category will be accepted, otherwise all will be rejected. + # + # NOTE: Using entity filters is a commodity for modules that might potentially track thousands of unnecessary + # entities, but not all modules are required to use filters, check the your module's documentation to + # determine whether setting up filters make sense or not. + accept-unmatched-categories = true + } + # FQCN of the reporter instances that should be loaded when calling `Kamon.reporters.loadFromConfig()`. + # Example: `reporters = ["kamon.statsd.StatsD", "kamon.zipkin.Zipkin"]`. + reporters = [] + + # Thread pool size used by the metrics refresh scheduler. This pool is only used to periodically sampling + # min-max-counter values. + refresh-scheduler-pool-size = 2 + instrument-factory { # Default instrument settings for histograms and min max counters. The actual settings to be used when creating diff --git a/kamon-core/src/main/scala/kamon/Diagnostic.scala b/kamon-core/src/main/scala/kamon/Diagnostic.scala deleted file mode 100644 index 87784a72..00000000 --- a/kamon-core/src/main/scala/kamon/Diagnostic.scala +++ /dev/null @@ -1,13 +0,0 @@ -package kamon - -// The types are just an idea, they will need further refinement. -trait Diagnostic { - def isAspectJWorking: Boolean - def detectedModules: Seq[String] - def entityFilterPatterns: Seq[String] - def metricSubscribers: Seq[String] - def traceSubscribers: Seq[String] - - // Category Name => Count - def entityCount: Map[String, Long] -} diff --git a/kamon-core/src/main/scala/kamon/Environment.scala b/kamon-core/src/main/scala/kamon/Environment.scala deleted file mode 100644 index 5f728cbd..00000000 --- a/kamon-core/src/main/scala/kamon/Environment.scala +++ /dev/null @@ -1,13 +0,0 @@ -package kamon - -import java.util.concurrent.ScheduledExecutorService - -import com.typesafe.config.Config - -trait Environment { - def instance: String - def host: String - def application: String - def config: Config - def scheduler: ScheduledExecutorService -} diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 317920f4..b318d59d 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -1,68 +1,48 @@ package kamon +import java.util.concurrent.atomic.AtomicReference + import com.typesafe.config.{Config, ConfigFactory} import kamon.metric.{RecorderRegistry, RecorderRegistryImpl} import kamon.trace.Tracer -/** - * The main entry point to all Kamon functionality. - * - * - * - * - */ -object Kamon { - private val recorderRegistry = new RecorderRegistryImpl(ConfigFactory.load()) - private val reporterRegistry = new ReporterRegistryImpl(recorderRegistry, ConfigFactory.load()) - private val kamonTracer = new Tracer(recorderRegistry, reporterRegistry) - - def tracer: io.opentracing.Tracer = kamonTracer - def metrics: RecorderRegistry = recorderRegistry - def reporters: ReporterRegistry = reporterRegistry - - def reconfigure(config: Config): Unit = synchronized { - recorderRegistry.reconfigure(config) - reporterRegistry.reconfigure(config) - } - - def environment: Environment = ??? - def diagnose: Diagnostic = ??? - def util: Util = ??? -} - - -/* - -Kamon.metrics.getRecorder("app-metrics") -Kamon.metrics.getRecorder("akka-actor", "test") +object Kamon { + private val _initialConfig = ConfigFactory.load() + private val _recorderRegistry = new RecorderRegistryImpl(_initialConfig) + private val _reporterRegistry = new ReporterRegistryImpl(_recorderRegistry, _initialConfig) + private val _tracer = new Tracer(_recorderRegistry, _reporterRegistry) + private val _environment = new AtomicReference[Environment](environmentFromConfig(ConfigFactory.load())) -Kamon.entities.get("akka-actor", "test") -Kamon.entities.remove(entity) + def tracer: io.opentracing.Tracer = + _tracer -Kamon.util.entityFilters.accept(entity) -Kamon.util.clock. + def metrics: RecorderRegistry = + _recorderRegistry -Kamon.entities.new(). + def reporters: ReporterRegistry = + _reporterRegistry -Kamon.subscriptions.loadFromConfig() -Kamon.subscriptions.subscribe(StatsD, Filters.IncludeAll) -Kamon.subscriptions.subscribe(NewRelic, Filters.Empty().includeCategory("span").withTag("span.kind", "server")) + def environment: Environment = + _environment.get() + def reconfigure(config: Config): Unit = synchronized { + _recorderRegistry.reconfigure(config) + _reporterRegistry.reconfigure(config) + _environment.set(environmentFromConfig(config)) + } -Things that you need to do with Kamon: -Global: - - Reconfigure - - Get Diagnostic Data -Metrics: - - create entities - - subscribe to metrics data -Tracer: - - Build Spans / Use ActiveSpanSource - - subscribe to tracing data - */ + case class Environment(config: Config, application: String, host: String, instance: String) + private def environmentFromConfig(config: Config): Environment = { + val environmentConfig = config.getConfig("kamon.environment") + val application = environmentConfig.getString("application") + val host = environmentConfig.getString("host") + val instance = environmentConfig.getString("instance") + Environment(config, application, host, instance) + } +} diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index b42c5abe..98bde946 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -134,21 +134,6 @@ class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config) }(ExecutionContext.fromExecutor(registryExecutionContext)) } - /** - * Creates a thread factory that assigns the specified name to all created Threads. - */ - private def threadFactory(name: String): ThreadFactory = - new ThreadFactory { - val defaultFactory = Executors.defaultThreadFactory() - - override def newThread(r: Runnable): Thread = { - val thread = defaultFactory.newThread(r) - thread.setName(name) - thread - } - } - - private case class ReporterEntry( @volatile var isActive: Boolean = true, id: Long, diff --git a/kamon-core/src/main/scala/kamon/Util.scala b/kamon-core/src/main/scala/kamon/Util.scala deleted file mode 100644 index c8efbdc0..00000000 --- a/kamon-core/src/main/scala/kamon/Util.scala +++ /dev/null @@ -1,15 +0,0 @@ -package kamon - -import kamon.metric.EntityFilter - -/** - * Useful classes for Kamon and submodules. - * - */ -trait Util { - - /** - * @return Currently configured entity filters. - */ - def entityFilter: EntityFilter -} diff --git a/kamon-core/src/main/scala/kamon/metric/EntityFilter.scala b/kamon-core/src/main/scala/kamon/metric/EntityFilter.scala index cf203609..77fbfc4b 100644 --- a/kamon-core/src/main/scala/kamon/metric/EntityFilter.scala +++ b/kamon-core/src/main/scala/kamon/metric/EntityFilter.scala @@ -7,13 +7,13 @@ import com.typesafe.config.Config object EntityFilter { def fromConfig(config: Config): EntityFilter = { val filtersConfig = config.getConfig("kamon.metric.filters") - val acceptUnmatched = filtersConfig.getBoolean("accept-unmatched") + val acceptUnmatched = filtersConfig.getBoolean("accept-unmatched-categories") - val perCategoryFilters = filtersConfig.firstLevelKeys.filter(_ != "accept-unmatched") map { category: String ⇒ + val perCategoryFilters = filtersConfig.firstLevelKeys.filter(_ != "accept-unmatched-categories") map { category: String ⇒ val includes = readFilters(filtersConfig, s"$category.includes") val excludes = readFilters(filtersConfig, s"$category.excludes") - (category, new IncludeExcludeNameFilter(includes, excludes, acceptUnmatched)) + (category, new IncludeExcludeNameFilter(includes, excludes)) } toMap new EntityFilter(perCategoryFilters, acceptUnmatched) @@ -49,9 +49,9 @@ trait NameFilter { def accept(name: String): Boolean } -class IncludeExcludeNameFilter(includes: Seq[NameFilter], excludes: Seq[NameFilter], acceptUnmatched: Boolean) extends NameFilter { +class IncludeExcludeNameFilter(includes: Seq[NameFilter], excludes: Seq[NameFilter]) extends NameFilter { override def accept(name: String): Boolean = - (includes.exists(_.accept(name)) || acceptUnmatched) && !excludes.exists(_.accept(name)) + includes.exists(_.accept(name)) && !excludes.exists(_.accept(name)) } class RegexNameFilter(path: String) extends NameFilter { diff --git a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala index 8ce37082..ccdb463e 100644 --- a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala +++ b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala @@ -1,11 +1,13 @@ package kamon.metric import java.time.Duration +import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit} import kamon.metric.instrument._ import kamon.util.MeasurementUnit import scala.collection.concurrent.TrieMap +import scala.util.Try trait EntityRecorder { def histogram(name: String): Histogram @@ -25,12 +27,11 @@ trait EntitySnapshotProducer { def snapshot(): EntitySnapshot } +class DefaultEntityRecorder(entity: Entity, instrumentFactory: InstrumentFactory, scheduler: ScheduledExecutorService) + extends EntityRecorder with EntitySnapshotProducer { - - -class DefaultEntityRecorder(entity: Entity, instrumentFactory: InstrumentFactory) extends EntityRecorder with EntitySnapshotProducer { private val histograms = TrieMap.empty[String, Histogram with DistributionSnapshotInstrument] - private val minMaxCounters = TrieMap.empty[String, MinMaxCounter with DistributionSnapshotInstrument] + private val minMaxCounters = TrieMap.empty[String, MinMaxCounterEntry] private val counters = TrieMap.empty[String, Counter with SingleValueSnapshotInstrument] private val gauges = TrieMap.empty[String, Gauge with SingleValueSnapshotInstrument] @@ -41,10 +42,14 @@ class DefaultEntityRecorder(entity: Entity, instrumentFactory: InstrumentFactory histograms.atomicGetOrElseUpdate(name, instrumentFactory.buildHistogram(entity, name, dynamicRange, measurementUnit)) def minMaxCounter(name: String): MinMaxCounter = - minMaxCounters.atomicGetOrElseUpdate(name, instrumentFactory.buildMinMaxCounter(entity, name)) + minMaxCounters.atomicGetOrElseUpdate(name, + createMMCounterEntry(instrumentFactory.buildMinMaxCounter(entity, name)) + ).mmCounter def minMaxCounter(name: String, measurementUnit: MeasurementUnit, dynamicRange: DynamicRange, sampleFrequency: Duration): MinMaxCounter = - minMaxCounters.atomicGetOrElseUpdate(name, instrumentFactory.buildMinMaxCounter(entity, name, dynamicRange, sampleFrequency, measurementUnit)) + minMaxCounters.atomicGetOrElseUpdate(name, + createMMCounterEntry(instrumentFactory.buildMinMaxCounter(entity, name, dynamicRange, sampleFrequency, measurementUnit)) + ).mmCounter def gauge(name: String): Gauge = gauges.atomicGetOrElseUpdate(name, instrumentFactory.buildGauge(entity, name)) @@ -62,8 +67,24 @@ class DefaultEntityRecorder(entity: Entity, instrumentFactory: InstrumentFactory new EntitySnapshot( entity, histograms = histograms.values.map(_.snapshot()).toSeq, - minMaxCounters = minMaxCounters.values.map(_.snapshot()).toSeq, + minMaxCounters = minMaxCounters.values.map(_.mmCounter.snapshot()).toSeq, gauges = gauges.values.map(_.snapshot()).toSeq, counters = counters.values.map(_.snapshot()).toSeq ) + + def cleanup(): Unit = { + minMaxCounters.values.foreach { mmCounter => + Try(mmCounter.refreshFuture.cancel(true)) + } + } + + private case class MinMaxCounterEntry(mmCounter: MinMaxCounter with DistributionSnapshotInstrument, refreshFuture: ScheduledFuture[_]) + + private def createMMCounterEntry(mmCounter: MinMaxCounter with DistributionSnapshotInstrument): MinMaxCounterEntry = { + val refreshFuture = scheduler.schedule(new Runnable { + override def run(): Unit = mmCounter.sample() + }, mmCounter.sampleInterval.toMillis, TimeUnit.MILLISECONDS) + + MinMaxCounterEntry(mmCounter, refreshFuture) + } } \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala b/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala index 53081760..fd728b1d 100644 --- a/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala +++ b/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala @@ -1,6 +1,7 @@ package kamon package metric +import java.util.concurrent.ScheduledThreadPoolExecutor import java.util.concurrent.atomic.AtomicReference import com.typesafe.config.Config @@ -16,6 +17,7 @@ trait RecorderRegistry { } class RecorderRegistryImpl(initialConfig: Config) extends RecorderRegistry { + private val scheduler = new ScheduledThreadPoolExecutor(1, numberedThreadFactory("kamon.metric.refresh-scheduler")) private val instrumentFactory = new AtomicReference[InstrumentFactory]() private val entityFilter = new AtomicReference[EntityFilter]() private val entities = TrieMap.empty[Entity, EntityRecorder with EntitySnapshotProducer] @@ -27,7 +29,7 @@ class RecorderRegistryImpl(initialConfig: Config) extends RecorderRegistry { entityFilter.get().accept(entity) override def getRecorder(entity: Entity): EntityRecorder = - entities.atomicGetOrElseUpdate(entity, new DefaultEntityRecorder(entity, instrumentFactory.get())) + entities.atomicGetOrElseUpdate(entity, new DefaultEntityRecorder(entity, instrumentFactory.get(), scheduler)) override def removeRecorder(entity: Entity): Boolean = entities.remove(entity).nonEmpty @@ -35,13 +37,20 @@ class RecorderRegistryImpl(initialConfig: Config) extends RecorderRegistry { private[kamon] def reconfigure(config: Config): Unit = synchronized { instrumentFactory.set(InstrumentFactory.fromConfig(config)) entityFilter.set(EntityFilter.fromConfig(config)) + + val refreshSchedulerPoolSize = config.getInt("kamon.metric.refresh-scheduler-pool-size") + scheduler.setCorePoolSize(refreshSchedulerPoolSize) } private[kamon] def snapshot(): Seq[EntitySnapshot] = { entities.values.map(_.snapshot()).toSeq } + + //private[kamon] def diagnosticData } +case class RecorderRegistryDiagnostic(entities: Seq[Entity]) + diff --git a/kamon-core/src/main/scala/kamon/package.scala b/kamon-core/src/main/scala/kamon/package.scala index 92c48017..b65022bc 100644 --- a/kamon-core/src/main/scala/kamon/package.scala +++ b/kamon-core/src/main/scala/kamon/package.scala @@ -1,3 +1,6 @@ +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.{Executors, ThreadFactory} + import com.typesafe.config.Config import scala.collection.concurrent.TrieMap @@ -6,6 +9,33 @@ package object kamon { + /** + * Creates a thread factory that assigns the specified name to all created Threads. + */ + def threadFactory(name: String): ThreadFactory = + new ThreadFactory { + val defaultFactory = Executors.defaultThreadFactory() + + override def newThread(r: Runnable): Thread = { + val thread = defaultFactory.newThread(r) + thread.setName(name) + thread + } + } + + def numberedThreadFactory(name: String): ThreadFactory = + new ThreadFactory { + val count = new AtomicLong() + val defaultFactory = Executors.defaultThreadFactory() + + override def newThread(r: Runnable): Thread = { + val thread = defaultFactory.newThread(r) + thread.setName(name + "-" + count.incrementAndGet().toString) + thread + } + } + + /** * Workaround to the non thread-safe [[scala.collection.concurrent.TrieMap#getOrElseUpdate()]] method. More details on * why this is necessary can be found at [[https://issues.scala-lang.org/browse/SI-7943]]. diff --git a/kamon-core/src/main/scala/kamon/util/Scheduler.scala b/kamon-core/src/main/scala/kamon/util/Scheduler.scala deleted file mode 100644 index 0bc86f9a..00000000 --- a/kamon-core/src/main/scala/kamon/util/Scheduler.scala +++ /dev/null @@ -1,5 +0,0 @@ -package kamon.util - -trait Scheduler { - -} -- cgit v1.2.3