aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-05-21 14:05:05 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-05-21 14:05:05 +0200
commit105ed9cb264eb3569b5ae0d65ac2fd8cb636f8e8 (patch)
treec09b6644a14032b3c636cc7dbc6e225ca3256e85 /kamon-core/src
parente1e7853255131f26702229735e37e160c38f2d08 (diff)
downloadKamon-105ed9cb264eb3569b5ae0d65ac2fd8cb636f8e8.tar.gz
Kamon-105ed9cb264eb3569b5ae0d65ac2fd8cb636f8e8.tar.bz2
Kamon-105ed9cb264eb3569b5ae0d65ac2fd8cb636f8e8.zip
wip, trying to get something that could be tested
Diffstat (limited to 'kamon-core/src')
-rw-r--r--kamon-core/src/main/resources/reference.conf21
-rw-r--r--kamon-core/src/main/scala/kamon/Diagnostic.scala13
-rw-r--r--kamon-core/src/main/scala/kamon/Environment.scala13
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala80
-rw-r--r--kamon-core/src/main/scala/kamon/ReporterRegistry.scala15
-rw-r--r--kamon-core/src/main/scala/kamon/Util.scala15
-rw-r--r--kamon-core/src/main/scala/kamon/metric/EntityFilter.scala10
-rw-r--r--kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala35
-rw-r--r--kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala11
-rw-r--r--kamon-core/src/main/scala/kamon/package.scala30
-rw-r--r--kamon-core/src/main/scala/kamon/util/Scheduler.scala5
-rw-r--r--kamon-core/src/test/scala/kamon/metric/EntityFilterSpec.scala8
12 files changed, 126 insertions, 130 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index aac718ca..b514b1b7 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -1,11 +1,28 @@
kamon {
metric {
- tick-interval = 1 second
+ tick-interval = 60 seconds
filters {
- accept-unmatched = true
+
+ # Determines whether entities from a category that doesn't have any filtering configuration should be tracked or
+ # not. E.g. If there are no filter sections for the "jdbc-datasource" category and `accept-unmatched-categories`
+ # is set to true, all entities for that category will be accepted, otherwise all will be rejected.
+ #
+ # NOTE: Using entity filters is a commodity for modules that might potentially track thousands of unnecessary
+ # entities, but not all modules are required to use filters, check the your module's documentation to
+ # determine whether setting up filters make sense or not.
+ accept-unmatched-categories = true
+
}
+ # FQCN of the reporter instances that should be loaded when calling `Kamon.reporters.loadFromConfig()`.
+ # Example: `reporters = ["kamon.statsd.StatsD", "kamon.zipkin.Zipkin"]`.
+ reporters = []
+
+ # Thread pool size used by the metrics refresh scheduler. This pool is only used to periodically sampling
+ # min-max-counter values.
+ refresh-scheduler-pool-size = 2
+
instrument-factory {
# Default instrument settings for histograms and min max counters. The actual settings to be used when creating
diff --git a/kamon-core/src/main/scala/kamon/Diagnostic.scala b/kamon-core/src/main/scala/kamon/Diagnostic.scala
deleted file mode 100644
index 87784a72..00000000
--- a/kamon-core/src/main/scala/kamon/Diagnostic.scala
+++ /dev/null
@@ -1,13 +0,0 @@
-package kamon
-
-// The types are just an idea, they will need further refinement.
-trait Diagnostic {
- def isAspectJWorking: Boolean
- def detectedModules: Seq[String]
- def entityFilterPatterns: Seq[String]
- def metricSubscribers: Seq[String]
- def traceSubscribers: Seq[String]
-
- // Category Name => Count
- def entityCount: Map[String, Long]
-}
diff --git a/kamon-core/src/main/scala/kamon/Environment.scala b/kamon-core/src/main/scala/kamon/Environment.scala
deleted file mode 100644
index 5f728cbd..00000000
--- a/kamon-core/src/main/scala/kamon/Environment.scala
+++ /dev/null
@@ -1,13 +0,0 @@
-package kamon
-
-import java.util.concurrent.ScheduledExecutorService
-
-import com.typesafe.config.Config
-
-trait Environment {
- def instance: String
- def host: String
- def application: String
- def config: Config
- def scheduler: ScheduledExecutorService
-}
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 317920f4..b318d59d 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -1,68 +1,48 @@
package kamon
+import java.util.concurrent.atomic.AtomicReference
+
import com.typesafe.config.{Config, ConfigFactory}
import kamon.metric.{RecorderRegistry, RecorderRegistryImpl}
import kamon.trace.Tracer
-/**
- * The main entry point to all Kamon functionality.
- *
- *
- *
- *
- */
-object Kamon {
- private val recorderRegistry = new RecorderRegistryImpl(ConfigFactory.load())
- private val reporterRegistry = new ReporterRegistryImpl(recorderRegistry, ConfigFactory.load())
- private val kamonTracer = new Tracer(recorderRegistry, reporterRegistry)
-
- def tracer: io.opentracing.Tracer = kamonTracer
- def metrics: RecorderRegistry = recorderRegistry
- def reporters: ReporterRegistry = reporterRegistry
-
- def reconfigure(config: Config): Unit = synchronized {
- recorderRegistry.reconfigure(config)
- reporterRegistry.reconfigure(config)
- }
-
- def environment: Environment = ???
- def diagnose: Diagnostic = ???
- def util: Util = ???
-}
-
-
-/*
-
-Kamon.metrics.getRecorder("app-metrics")
-Kamon.metrics.getRecorder("akka-actor", "test")
+object Kamon {
+ private val _initialConfig = ConfigFactory.load()
+ private val _recorderRegistry = new RecorderRegistryImpl(_initialConfig)
+ private val _reporterRegistry = new ReporterRegistryImpl(_recorderRegistry, _initialConfig)
+ private val _tracer = new Tracer(_recorderRegistry, _reporterRegistry)
+ private val _environment = new AtomicReference[Environment](environmentFromConfig(ConfigFactory.load()))
-Kamon.entities.get("akka-actor", "test")
-Kamon.entities.remove(entity)
+ def tracer: io.opentracing.Tracer =
+ _tracer
-Kamon.util.entityFilters.accept(entity)
-Kamon.util.clock.
+ def metrics: RecorderRegistry =
+ _recorderRegistry
-Kamon.entities.new().
+ def reporters: ReporterRegistry =
+ _reporterRegistry
-Kamon.subscriptions.loadFromConfig()
-Kamon.subscriptions.subscribe(StatsD, Filters.IncludeAll)
-Kamon.subscriptions.subscribe(NewRelic, Filters.Empty().includeCategory("span").withTag("span.kind", "server"))
+ def environment: Environment =
+ _environment.get()
+ def reconfigure(config: Config): Unit = synchronized {
+ _recorderRegistry.reconfigure(config)
+ _reporterRegistry.reconfigure(config)
+ _environment.set(environmentFromConfig(config))
+ }
-Things that you need to do with Kamon:
-Global:
- - Reconfigure
- - Get Diagnostic Data
-Metrics:
- - create entities
- - subscribe to metrics data
-Tracer:
- - Build Spans / Use ActiveSpanSource
- - subscribe to tracing data
- */
+ case class Environment(config: Config, application: String, host: String, instance: String)
+ private def environmentFromConfig(config: Config): Environment = {
+ val environmentConfig = config.getConfig("kamon.environment")
+ val application = environmentConfig.getString("application")
+ val host = environmentConfig.getString("host")
+ val instance = environmentConfig.getString("instance")
+ Environment(config, application, host, instance)
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
index b42c5abe..98bde946 100644
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -134,21 +134,6 @@ class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config)
}(ExecutionContext.fromExecutor(registryExecutionContext))
}
- /**
- * Creates a thread factory that assigns the specified name to all created Threads.
- */
- private def threadFactory(name: String): ThreadFactory =
- new ThreadFactory {
- val defaultFactory = Executors.defaultThreadFactory()
-
- override def newThread(r: Runnable): Thread = {
- val thread = defaultFactory.newThread(r)
- thread.setName(name)
- thread
- }
- }
-
-
private case class ReporterEntry(
@volatile var isActive: Boolean = true,
id: Long,
diff --git a/kamon-core/src/main/scala/kamon/Util.scala b/kamon-core/src/main/scala/kamon/Util.scala
deleted file mode 100644
index c8efbdc0..00000000
--- a/kamon-core/src/main/scala/kamon/Util.scala
+++ /dev/null
@@ -1,15 +0,0 @@
-package kamon
-
-import kamon.metric.EntityFilter
-
-/**
- * Useful classes for Kamon and submodules.
- *
- */
-trait Util {
-
- /**
- * @return Currently configured entity filters.
- */
- def entityFilter: EntityFilter
-}
diff --git a/kamon-core/src/main/scala/kamon/metric/EntityFilter.scala b/kamon-core/src/main/scala/kamon/metric/EntityFilter.scala
index cf203609..77fbfc4b 100644
--- a/kamon-core/src/main/scala/kamon/metric/EntityFilter.scala
+++ b/kamon-core/src/main/scala/kamon/metric/EntityFilter.scala
@@ -7,13 +7,13 @@ import com.typesafe.config.Config
object EntityFilter {
def fromConfig(config: Config): EntityFilter = {
val filtersConfig = config.getConfig("kamon.metric.filters")
- val acceptUnmatched = filtersConfig.getBoolean("accept-unmatched")
+ val acceptUnmatched = filtersConfig.getBoolean("accept-unmatched-categories")
- val perCategoryFilters = filtersConfig.firstLevelKeys.filter(_ != "accept-unmatched") map { category: String ⇒
+ val perCategoryFilters = filtersConfig.firstLevelKeys.filter(_ != "accept-unmatched-categories") map { category: String ⇒
val includes = readFilters(filtersConfig, s"$category.includes")
val excludes = readFilters(filtersConfig, s"$category.excludes")
- (category, new IncludeExcludeNameFilter(includes, excludes, acceptUnmatched))
+ (category, new IncludeExcludeNameFilter(includes, excludes))
} toMap
new EntityFilter(perCategoryFilters, acceptUnmatched)
@@ -49,9 +49,9 @@ trait NameFilter {
def accept(name: String): Boolean
}
-class IncludeExcludeNameFilter(includes: Seq[NameFilter], excludes: Seq[NameFilter], acceptUnmatched: Boolean) extends NameFilter {
+class IncludeExcludeNameFilter(includes: Seq[NameFilter], excludes: Seq[NameFilter]) extends NameFilter {
override def accept(name: String): Boolean =
- (includes.exists(_.accept(name)) || acceptUnmatched) && !excludes.exists(_.accept(name))
+ includes.exists(_.accept(name)) && !excludes.exists(_.accept(name))
}
class RegexNameFilter(path: String) extends NameFilter {
diff --git a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala
index 8ce37082..ccdb463e 100644
--- a/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala
+++ b/kamon-core/src/main/scala/kamon/metric/EntityRecorder.scala
@@ -1,11 +1,13 @@
package kamon.metric
import java.time.Duration
+import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
import kamon.metric.instrument._
import kamon.util.MeasurementUnit
import scala.collection.concurrent.TrieMap
+import scala.util.Try
trait EntityRecorder {
def histogram(name: String): Histogram
@@ -25,12 +27,11 @@ trait EntitySnapshotProducer {
def snapshot(): EntitySnapshot
}
+class DefaultEntityRecorder(entity: Entity, instrumentFactory: InstrumentFactory, scheduler: ScheduledExecutorService)
+ extends EntityRecorder with EntitySnapshotProducer {
-
-
-class DefaultEntityRecorder(entity: Entity, instrumentFactory: InstrumentFactory) extends EntityRecorder with EntitySnapshotProducer {
private val histograms = TrieMap.empty[String, Histogram with DistributionSnapshotInstrument]
- private val minMaxCounters = TrieMap.empty[String, MinMaxCounter with DistributionSnapshotInstrument]
+ private val minMaxCounters = TrieMap.empty[String, MinMaxCounterEntry]
private val counters = TrieMap.empty[String, Counter with SingleValueSnapshotInstrument]
private val gauges = TrieMap.empty[String, Gauge with SingleValueSnapshotInstrument]
@@ -41,10 +42,14 @@ class DefaultEntityRecorder(entity: Entity, instrumentFactory: InstrumentFactory
histograms.atomicGetOrElseUpdate(name, instrumentFactory.buildHistogram(entity, name, dynamicRange, measurementUnit))
def minMaxCounter(name: String): MinMaxCounter =
- minMaxCounters.atomicGetOrElseUpdate(name, instrumentFactory.buildMinMaxCounter(entity, name))
+ minMaxCounters.atomicGetOrElseUpdate(name,
+ createMMCounterEntry(instrumentFactory.buildMinMaxCounter(entity, name))
+ ).mmCounter
def minMaxCounter(name: String, measurementUnit: MeasurementUnit, dynamicRange: DynamicRange, sampleFrequency: Duration): MinMaxCounter =
- minMaxCounters.atomicGetOrElseUpdate(name, instrumentFactory.buildMinMaxCounter(entity, name, dynamicRange, sampleFrequency, measurementUnit))
+ minMaxCounters.atomicGetOrElseUpdate(name,
+ createMMCounterEntry(instrumentFactory.buildMinMaxCounter(entity, name, dynamicRange, sampleFrequency, measurementUnit))
+ ).mmCounter
def gauge(name: String): Gauge =
gauges.atomicGetOrElseUpdate(name, instrumentFactory.buildGauge(entity, name))
@@ -62,8 +67,24 @@ class DefaultEntityRecorder(entity: Entity, instrumentFactory: InstrumentFactory
new EntitySnapshot(
entity,
histograms = histograms.values.map(_.snapshot()).toSeq,
- minMaxCounters = minMaxCounters.values.map(_.snapshot()).toSeq,
+ minMaxCounters = minMaxCounters.values.map(_.mmCounter.snapshot()).toSeq,
gauges = gauges.values.map(_.snapshot()).toSeq,
counters = counters.values.map(_.snapshot()).toSeq
)
+
+ def cleanup(): Unit = {
+ minMaxCounters.values.foreach { mmCounter =>
+ Try(mmCounter.refreshFuture.cancel(true))
+ }
+ }
+
+ private case class MinMaxCounterEntry(mmCounter: MinMaxCounter with DistributionSnapshotInstrument, refreshFuture: ScheduledFuture[_])
+
+ private def createMMCounterEntry(mmCounter: MinMaxCounter with DistributionSnapshotInstrument): MinMaxCounterEntry = {
+ val refreshFuture = scheduler.schedule(new Runnable {
+ override def run(): Unit = mmCounter.sample()
+ }, mmCounter.sampleInterval.toMillis, TimeUnit.MILLISECONDS)
+
+ MinMaxCounterEntry(mmCounter, refreshFuture)
+ }
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala b/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala
index 53081760..fd728b1d 100644
--- a/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/metric/RecorderRegistry.scala
@@ -1,6 +1,7 @@
package kamon
package metric
+import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.atomic.AtomicReference
import com.typesafe.config.Config
@@ -16,6 +17,7 @@ trait RecorderRegistry {
}
class RecorderRegistryImpl(initialConfig: Config) extends RecorderRegistry {
+ private val scheduler = new ScheduledThreadPoolExecutor(1, numberedThreadFactory("kamon.metric.refresh-scheduler"))
private val instrumentFactory = new AtomicReference[InstrumentFactory]()
private val entityFilter = new AtomicReference[EntityFilter]()
private val entities = TrieMap.empty[Entity, EntityRecorder with EntitySnapshotProducer]
@@ -27,7 +29,7 @@ class RecorderRegistryImpl(initialConfig: Config) extends RecorderRegistry {
entityFilter.get().accept(entity)
override def getRecorder(entity: Entity): EntityRecorder =
- entities.atomicGetOrElseUpdate(entity, new DefaultEntityRecorder(entity, instrumentFactory.get()))
+ entities.atomicGetOrElseUpdate(entity, new DefaultEntityRecorder(entity, instrumentFactory.get(), scheduler))
override def removeRecorder(entity: Entity): Boolean =
entities.remove(entity).nonEmpty
@@ -35,13 +37,20 @@ class RecorderRegistryImpl(initialConfig: Config) extends RecorderRegistry {
private[kamon] def reconfigure(config: Config): Unit = synchronized {
instrumentFactory.set(InstrumentFactory.fromConfig(config))
entityFilter.set(EntityFilter.fromConfig(config))
+
+ val refreshSchedulerPoolSize = config.getInt("kamon.metric.refresh-scheduler-pool-size")
+ scheduler.setCorePoolSize(refreshSchedulerPoolSize)
}
private[kamon] def snapshot(): Seq[EntitySnapshot] = {
entities.values.map(_.snapshot()).toSeq
}
+
+ //private[kamon] def diagnosticData
}
+case class RecorderRegistryDiagnostic(entities: Seq[Entity])
+
diff --git a/kamon-core/src/main/scala/kamon/package.scala b/kamon-core/src/main/scala/kamon/package.scala
index 92c48017..b65022bc 100644
--- a/kamon-core/src/main/scala/kamon/package.scala
+++ b/kamon-core/src/main/scala/kamon/package.scala
@@ -1,3 +1,6 @@
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.{Executors, ThreadFactory}
+
import com.typesafe.config.Config
import scala.collection.concurrent.TrieMap
@@ -7,6 +10,33 @@ package object kamon {
/**
+ * Creates a thread factory that assigns the specified name to all created Threads.
+ */
+ def threadFactory(name: String): ThreadFactory =
+ new ThreadFactory {
+ val defaultFactory = Executors.defaultThreadFactory()
+
+ override def newThread(r: Runnable): Thread = {
+ val thread = defaultFactory.newThread(r)
+ thread.setName(name)
+ thread
+ }
+ }
+
+ def numberedThreadFactory(name: String): ThreadFactory =
+ new ThreadFactory {
+ val count = new AtomicLong()
+ val defaultFactory = Executors.defaultThreadFactory()
+
+ override def newThread(r: Runnable): Thread = {
+ val thread = defaultFactory.newThread(r)
+ thread.setName(name + "-" + count.incrementAndGet().toString)
+ thread
+ }
+ }
+
+
+ /**
* Workaround to the non thread-safe [[scala.collection.concurrent.TrieMap#getOrElseUpdate()]] method. More details on
* why this is necessary can be found at [[https://issues.scala-lang.org/browse/SI-7943]].
*/
diff --git a/kamon-core/src/main/scala/kamon/util/Scheduler.scala b/kamon-core/src/main/scala/kamon/util/Scheduler.scala
deleted file mode 100644
index 0bc86f9a..00000000
--- a/kamon-core/src/main/scala/kamon/util/Scheduler.scala
+++ /dev/null
@@ -1,5 +0,0 @@
-package kamon.util
-
-trait Scheduler {
-
-}
diff --git a/kamon-core/src/test/scala/kamon/metric/EntityFilterSpec.scala b/kamon-core/src/test/scala/kamon/metric/EntityFilterSpec.scala
index 15dfc5ff..16481ccd 100644
--- a/kamon-core/src/test/scala/kamon/metric/EntityFilterSpec.scala
+++ b/kamon-core/src/test/scala/kamon/metric/EntityFilterSpec.scala
@@ -9,7 +9,7 @@ class EntityFilterSpec extends WordSpec with Matchers {
val testConfig = ConfigFactory.parseString(
"""
|kamon.metric.filters {
- | accept-unmatched = false
+ | accept-unmatched-categories = false
|
| some-category {
| includes = ["**"]
@@ -32,9 +32,9 @@ class EntityFilterSpec extends WordSpec with Matchers {
)
"the entity filters" should {
- "use the accept-unmatched setting when there is no configuration for a given category" in {
- val acceptUnmatched = EntityFilter.fromConfig(ConfigFactory.parseString("kamon.metric.filters.accept-unmatched=true"))
- val rejectUnmatched = EntityFilter.fromConfig(ConfigFactory.parseString("kamon.metric.filters.accept-unmatched=false"))
+ "use the accept-unmatched-categories setting when there is no configuration for a given category" in {
+ val acceptUnmatched = EntityFilter.fromConfig(ConfigFactory.parseString("kamon.metric.filters.accept-unmatched-categories=true"))
+ val rejectUnmatched = EntityFilter.fromConfig(ConfigFactory.parseString("kamon.metric.filters.accept-unmatched-categories=false"))
acceptUnmatched.accept(Entity("a", "b", Map.empty)) shouldBe true
rejectUnmatched.accept(Entity("a", "b", Map.empty)) shouldBe false