diff options
authorIvan Topolnjak <ivantopo@gmail.com>2018-01-17 02:14:27 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2018-01-17 02:14:27 +0100
commita02215f4327fc11ed8e5066ada22c1d5938d849e (patch)
parented6504f67549184c86e3b6fe752985179912f35c (diff)
allow metric filtering when adding a metrics reporter
6 files changed, 166 insertions, 7 deletions
diff --git a/kamon-core-tests/src/test/scala/kamon/ReporterRegistrySpec.scala b/kamon-core-tests/src/test/scala/kamon/ReporterRegistrySpec.scala
new file mode 100644
index 00000000..d17c0f09
--- /dev/null
+++ b/kamon-core-tests/src/test/scala/kamon/ReporterRegistrySpec.scala
@@ -0,0 +1,114 @@
+package kamon
+import com.typesafe.config.Config
+import kamon.metric.PeriodSnapshot
+import kamon.testkit.Reconfigure
+import org.scalatest.concurrent.Eventually
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
+class ReporterRegistrySpec extends WordSpec with Matchers with Reconfigure with Eventually with BeforeAndAfterAll {
+ "The ReporterRegistry" when {
+ "working with metrics reporters" should {
+ "report all metrics if no filters are applied" in {
+ Kamon.counter("test.hello").increment()
+ Kamon.counter("test.world").increment()
+ Kamon.counter("other.hello").increment()
+ val reporter = new SeenMetricsReporter()
+ val subscription = Kamon.addReporter(reporter, "reporter-registry-spec")
+ eventually {
+ reporter.snapshotCount() should be >= 1
+ reporter.metrics() should contain allOf(
+ "test.hello",
+ "test.world",
+ "other.hello"
+ )
+ }
+ subscription.cancel()
+ }
+ "default to deny all metrics if a provided filter name doesn't exist" in {
+ Kamon.counter("test.hello").increment()
+ Kamon.counter("test.world").increment()
+ Kamon.counter("other.hello").increment()
+ val reporter = new SeenMetricsReporter()
+ val subscription = Kamon.addReporter(reporter, "reporter-registry-spec", "does-not-exist")
+ eventually {
+ reporter.snapshotCount() should be >= 1
+ reporter.metrics() shouldBe empty
+ }
+ subscription.cancel()
+ }
+ "apply existent filters" in {
+ Kamon.counter("test.hello").increment()
+ Kamon.counter("test.world").increment()
+ Kamon.counter("other.hello").increment()
+ val reporter = new SeenMetricsReporter()
+ val subscription = Kamon.addReporter(reporter, "reporter-registry-spec", "real-filter")
+ eventually {
+ reporter.snapshotCount() should be >= 1
+ reporter.metrics() should contain allOf(
+ "test.hello",
+ "test.world"
+ )
+ }
+ subscription.cancel()
+ }
+ }
+ }
+ override protected def beforeAll(): Unit = {
+ applyConfig(
+ """
+ |kamon {
+ | metric.tick-interval = 10 millis
+ |
+ | util.filters {
+ | real-filter {
+ | includes = [ "test**" ]
+ | }
+ | }
+ |}
+ |
+ """.stripMargin
+ )
+ }
+ override protected def afterAll(): Unit = {
+ resetConfig()
+ }
+ abstract class DummyReporter extends MetricReporter {
+ override def start(): Unit = {}
+ override def stop(): Unit = {}
+ override def reconfigure(config: Config): Unit = {}
+ }
+ class SeenMetricsReporter extends DummyReporter {
+ @volatile private var count = 0
+ @volatile private var seenMetrics = Seq.empty[String]
+ override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {
+ import snapshot.metrics._
+ count += 1
+ seenMetrics = counters.map(_.name) ++ histograms.map(_.name) ++ gauges.map(_.name) ++ rangeSamplers.map(_.name)
+ }
+ def metrics(): Seq[String] =
+ seenMetrics
+ def snapshotCount(): Int =
+ count
+ }
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index af767f60..b9254018 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -14,8 +14,9 @@ kamon {
# FQCN of the reporter instances that should be loaded when calling `Kamon.reporters.loadFromConfig()`. All reporter
- # classes must
- # Example: `reporters = ["kamon.statsd.StatsD", "kamon.zipkin.Zipkin"]`.
+ # classes must have a default constructor. No metric filtering is applied to metric reporters started this way.
+ # Example: `reporters = ["kamon.prometheus.Prometheus", "kamon.zipkin.Zipkin"]`.
reporters = [ ]
# Pool size for the executor service that will run sampling on RangeSampler instruments. This scheduler is accesible
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 7ea9fa64..562ef615 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -22,7 +22,7 @@ import com.typesafe.config.{Config, ConfigFactory}
import kamon.context.{Codecs, Context, Key, Storage}
import kamon.metric._
import kamon.trace._
-import kamon.util.{Filters, Registration, Clock}
+import kamon.util.{Clock, Filters, Matcher, Registration}
import org.slf4j.LoggerFactory
import scala.concurrent.Future
@@ -152,6 +152,9 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
override def addReporter(reporter: MetricReporter, name: String): Registration =
_reporterRegistry.addReporter(reporter, name)
+ override def addReporter(reporter: MetricReporter, name: String, filter: String): Registration =
+ _reporterRegistry.addReporter(reporter, name, filter)
override def addReporter(reporter: SpanReporter): Registration =
@@ -164,6 +167,9 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
def filter(filterName: String, pattern: String): Boolean =
_filters.accept(filterName, pattern)
+ def filter(filterName: String): Matcher =
+ _filters.get(filterName)
def clock(): Clock =
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
index 67cfbe4e..bd28e871 100644
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -52,6 +52,7 @@ trait ReporterRegistry {
def addReporter(reporter: MetricReporter): Registration
def addReporter(reporter: MetricReporter, name: String): Registration
+ def addReporter(reporter: MetricReporter, name: String, filter: String): Registration
def addReporter(reporter: SpanReporter): Registration
def addReporter(reporter: SpanReporter, name: String): Registration
@@ -106,6 +107,9 @@ object ReporterRegistry {
override def addReporter(reporter: MetricReporter, name: String): Registration =
addMetricReporter(reporter, name)
+ override def addReporter(reporter: MetricReporter, name: String, filter: String): Registration =
+ addMetricReporter(reporter, name, Some(filter))
override def addReporter(reporter: SpanReporter): Registration =
addSpanReporter(reporter, reporter.getClass.getName())
@@ -113,12 +117,13 @@ object ReporterRegistry {
addSpanReporter(reporter, name)
- private def addMetricReporter(reporter: MetricReporter, name: String): Registration = synchronized {
+ private def addMetricReporter(reporter: MetricReporter, name: String, filter: Option[String] = None): Registration = synchronized {
val executor = Executors.newSingleThreadExecutor(threadFactory(name))
val reporterEntry = new MetricReporterEntry(
id = reporterCounter.getAndIncrement(),
name = name,
reporter = reporter,
+ filter = filter,
executionContext = ExecutionContext.fromExecutorService(executor)
@@ -299,6 +304,7 @@ object ReporterRegistry {
val id: Long,
val name: String,
val reporter: MetricReporter,
+ val filter: Option[String],
val executionContext: ExecutionContextExecutorService
@@ -321,7 +327,7 @@ object ReporterRegistry {
def run(): Unit = try {
val currentInstant = Instant.now(clock)
- val tickSnapshot = PeriodSnapshot(
+ val periodSnapshot = PeriodSnapshot(
from = lastInstant,
to = currentInstant,
metrics = snapshotGenerator.snapshot()
@@ -330,8 +336,13 @@ object ReporterRegistry {
reporterEntries.foreach { case (_, entry) =>
Future {
Try {
- if (entry.isActive)
- entry.reporter.reportPeriodSnapshot(tickSnapshot)
+ if (entry.isActive) {
+ val filteredSnapshot = entry.filter
+ .map(f => filterMetrics(f, periodSnapshot))
+ .getOrElse(periodSnapshot)
+ entry.reporter.reportPeriodSnapshot(filteredSnapshot)
+ }
}.failed.foreach { error =>
logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error)
@@ -345,8 +356,21 @@ object ReporterRegistry {
} catch {
case NonFatal(t) => logger.error("Error while running a tick", t)
+ private def filterMetrics(filterName: String, periodSnapshot: PeriodSnapshot): PeriodSnapshot = {
+ val metricFilter = Kamon.filter(filterName)
+ val counters = periodSnapshot.metrics.counters.filter(c => metricFilter.accept(c.name))
+ val gauges = periodSnapshot.metrics.gauges.filter(g => metricFilter.accept(g.name))
+ val histograms = periodSnapshot.metrics.histograms.filter(h => metricFilter.accept(h.name))
+ val rangeSamplers = periodSnapshot.metrics.rangeSamplers.filter(rs => metricFilter.accept(rs.name))
+ periodSnapshot.copy(metrics = MetricsSnapshot(
+ histograms, rangeSamplers, gauges, counters
+ ))
+ }
private class SpanReporterTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable {
override def run(): Unit = {
spanReporters.foreach {
diff --git a/kamon-core/src/main/scala/kamon/util/Filters.scala b/kamon-core/src/main/scala/kamon/util/Filters.scala
index 9f015e99..a6c4de88 100644
--- a/kamon-core/src/main/scala/kamon/util/Filters.scala
+++ b/kamon-core/src/main/scala/kamon/util/Filters.scala
@@ -60,6 +60,12 @@ class Filters(filters: Map[String, Matcher]) {
+ def get(filterName: String): Matcher = {
+ filters.getOrElse(filterName, new Matcher {
+ override def accept(name: String): Boolean = false
+ })
+ }
trait Matcher {
diff --git a/kamon-testkit/src/main/scala/kamon/testkit/Reconfigure.scala b/kamon-testkit/src/main/scala/kamon/testkit/Reconfigure.scala
index 98dd47ae..c70fadb8 100644
--- a/kamon-testkit/src/main/scala/kamon/testkit/Reconfigure.scala
+++ b/kamon-testkit/src/main/scala/kamon/testkit/Reconfigure.scala
@@ -20,6 +20,10 @@ import kamon.Kamon
trait Reconfigure {
+ def enableFastMetricFlushing(): Unit = {
+ applyConfig("kamon.metric.tick-interval = 1 millisecond")
+ }
def enableFastSpanFlushing(): Unit = {
applyConfig("kamon.trace.tick-interval = 1 millisecond")
@@ -44,6 +48,10 @@ trait Reconfigure {
+ def resetConfig(): Unit = {
+ Kamon.reconfigure(ConfigFactory.load())
+ }