From a02215f4327fc11ed8e5066ada22c1d5938d849e Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Wed, 17 Jan 2018 02:14:27 +0100 Subject: allow metric filtering when adding a metrics reporter --- .../test/scala/kamon/ReporterRegistrySpec.scala | 114 +++++++++++++++++++++ kamon-core/src/main/resources/reference.conf | 5 +- kamon-core/src/main/scala/kamon/Kamon.scala | 8 +- .../src/main/scala/kamon/ReporterRegistry.scala | 32 +++++- kamon-core/src/main/scala/kamon/util/Filters.scala | 6 ++ .../src/main/scala/kamon/testkit/Reconfigure.scala | 8 ++ 6 files changed, 166 insertions(+), 7 deletions(-) create mode 100644 kamon-core-tests/src/test/scala/kamon/ReporterRegistrySpec.scala diff --git a/kamon-core-tests/src/test/scala/kamon/ReporterRegistrySpec.scala b/kamon-core-tests/src/test/scala/kamon/ReporterRegistrySpec.scala new file mode 100644 index 00000000..d17c0f09 --- /dev/null +++ b/kamon-core-tests/src/test/scala/kamon/ReporterRegistrySpec.scala @@ -0,0 +1,114 @@ +package kamon + +import com.typesafe.config.Config +import kamon.metric.PeriodSnapshot +import kamon.testkit.Reconfigure +import org.scalatest.concurrent.Eventually +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} + +class ReporterRegistrySpec extends WordSpec with Matchers with Reconfigure with Eventually with BeforeAndAfterAll { + "The ReporterRegistry" when { + "working with metrics reporters" should { + "report all metrics if no filters are applied" in { + Kamon.counter("test.hello").increment() + Kamon.counter("test.world").increment() + Kamon.counter("other.hello").increment() + + val reporter = new SeenMetricsReporter() + val subscription = Kamon.addReporter(reporter, "reporter-registry-spec") + + eventually { + reporter.snapshotCount() should be >= 1 + reporter.metrics() should contain allOf( + "test.hello", + "test.world", + "other.hello" + ) + } + + subscription.cancel() + } + + "default to deny all metrics if a provided filter name doesn't exist" in { + Kamon.counter("test.hello").increment() + Kamon.counter("test.world").increment() + Kamon.counter("other.hello").increment() + + val reporter = new SeenMetricsReporter() + val subscription = Kamon.addReporter(reporter, "reporter-registry-spec", "does-not-exist") + + eventually { + reporter.snapshotCount() should be >= 1 + reporter.metrics() shouldBe empty + } + + subscription.cancel() + } + + "apply existent filters" in { + Kamon.counter("test.hello").increment() + Kamon.counter("test.world").increment() + Kamon.counter("other.hello").increment() + + val reporter = new SeenMetricsReporter() + val subscription = Kamon.addReporter(reporter, "reporter-registry-spec", "real-filter") + + eventually { + reporter.snapshotCount() should be >= 1 + reporter.metrics() should contain allOf( + "test.hello", + "test.world" + ) + } + + subscription.cancel() + } + } + } + + + override protected def beforeAll(): Unit = { + applyConfig( + """ + |kamon { + | metric.tick-interval = 10 millis + | + | util.filters { + | real-filter { + | includes = [ "test**" ] + | } + | } + |} + | + """.stripMargin + ) + } + + + override protected def afterAll(): Unit = { + resetConfig() + } + + abstract class DummyReporter extends MetricReporter { + override def start(): Unit = {} + override def stop(): Unit = {} + override def reconfigure(config: Config): Unit = {} + } + + class SeenMetricsReporter extends DummyReporter { + @volatile private var count = 0 + @volatile private var seenMetrics = Seq.empty[String] + + override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = { + import snapshot.metrics._ + count += 1 + seenMetrics = counters.map(_.name) ++ histograms.map(_.name) ++ gauges.map(_.name) ++ rangeSamplers.map(_.name) + } + + def metrics(): Seq[String] = + seenMetrics + + def snapshotCount(): Int = + count + } +} diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index af767f60..b9254018 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -14,8 +14,9 @@ kamon { } # FQCN of the reporter instances that should be loaded when calling `Kamon.reporters.loadFromConfig()`. All reporter - # classes must - # Example: `reporters = ["kamon.statsd.StatsD", "kamon.zipkin.Zipkin"]`. + # classes must have a default constructor. No metric filtering is applied to metric reporters started this way. + + # Example: `reporters = ["kamon.prometheus.Prometheus", "kamon.zipkin.Zipkin"]`. reporters = [ ] # Pool size for the executor service that will run sampling on RangeSampler instruments. This scheduler is accesible diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 7ea9fa64..562ef615 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -22,7 +22,7 @@ import com.typesafe.config.{Config, ConfigFactory} import kamon.context.{Codecs, Context, Key, Storage} import kamon.metric._ import kamon.trace._ -import kamon.util.{Filters, Registration, Clock} +import kamon.util.{Clock, Filters, Matcher, Registration} import org.slf4j.LoggerFactory import scala.concurrent.Future @@ -152,6 +152,9 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { override def addReporter(reporter: MetricReporter, name: String): Registration = _reporterRegistry.addReporter(reporter, name) + override def addReporter(reporter: MetricReporter, name: String, filter: String): Registration = + _reporterRegistry.addReporter(reporter, name, filter) + override def addReporter(reporter: SpanReporter): Registration = _reporterRegistry.addReporter(reporter) @@ -164,6 +167,9 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { def filter(filterName: String, pattern: String): Boolean = _filters.accept(filterName, pattern) + def filter(filterName: String): Matcher = + _filters.get(filterName) + def clock(): Clock = _clock diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index 67cfbe4e..bd28e871 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -52,6 +52,7 @@ trait ReporterRegistry { def addReporter(reporter: MetricReporter): Registration def addReporter(reporter: MetricReporter, name: String): Registration + def addReporter(reporter: MetricReporter, name: String, filter: String): Registration def addReporter(reporter: SpanReporter): Registration def addReporter(reporter: SpanReporter, name: String): Registration @@ -106,6 +107,9 @@ object ReporterRegistry { override def addReporter(reporter: MetricReporter, name: String): Registration = addMetricReporter(reporter, name) + override def addReporter(reporter: MetricReporter, name: String, filter: String): Registration = + addMetricReporter(reporter, name, Some(filter)) + override def addReporter(reporter: SpanReporter): Registration = addSpanReporter(reporter, reporter.getClass.getName()) @@ -113,12 +117,13 @@ object ReporterRegistry { addSpanReporter(reporter, name) - private def addMetricReporter(reporter: MetricReporter, name: String): Registration = synchronized { + private def addMetricReporter(reporter: MetricReporter, name: String, filter: Option[String] = None): Registration = synchronized { val executor = Executors.newSingleThreadExecutor(threadFactory(name)) val reporterEntry = new MetricReporterEntry( id = reporterCounter.getAndIncrement(), name = name, reporter = reporter, + filter = filter, executionContext = ExecutionContext.fromExecutorService(executor) ) @@ -299,6 +304,7 @@ object ReporterRegistry { val id: Long, val name: String, val reporter: MetricReporter, + val filter: Option[String], val executionContext: ExecutionContextExecutorService ) @@ -321,7 +327,7 @@ object ReporterRegistry { def run(): Unit = try { val currentInstant = Instant.now(clock) - val tickSnapshot = PeriodSnapshot( + val periodSnapshot = PeriodSnapshot( from = lastInstant, to = currentInstant, metrics = snapshotGenerator.snapshot() @@ -330,8 +336,13 @@ object ReporterRegistry { reporterEntries.foreach { case (_, entry) => Future { Try { - if (entry.isActive) - entry.reporter.reportPeriodSnapshot(tickSnapshot) + if (entry.isActive) { + val filteredSnapshot = entry.filter + .map(f => filterMetrics(f, periodSnapshot)) + .getOrElse(periodSnapshot) + + entry.reporter.reportPeriodSnapshot(filteredSnapshot) + } }.failed.foreach { error => logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error) @@ -345,8 +356,21 @@ object ReporterRegistry { } catch { case NonFatal(t) => logger.error("Error while running a tick", t) } + + private def filterMetrics(filterName: String, periodSnapshot: PeriodSnapshot): PeriodSnapshot = { + val metricFilter = Kamon.filter(filterName) + val counters = periodSnapshot.metrics.counters.filter(c => metricFilter.accept(c.name)) + val gauges = periodSnapshot.metrics.gauges.filter(g => metricFilter.accept(g.name)) + val histograms = periodSnapshot.metrics.histograms.filter(h => metricFilter.accept(h.name)) + val rangeSamplers = periodSnapshot.metrics.rangeSamplers.filter(rs => metricFilter.accept(rs.name)) + + periodSnapshot.copy(metrics = MetricsSnapshot( + histograms, rangeSamplers, gauges, counters + )) + } } + private class SpanReporterTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable { override def run(): Unit = { spanReporters.foreach { diff --git a/kamon-core/src/main/scala/kamon/util/Filters.scala b/kamon-core/src/main/scala/kamon/util/Filters.scala index 9f015e99..a6c4de88 100644 --- a/kamon-core/src/main/scala/kamon/util/Filters.scala +++ b/kamon-core/src/main/scala/kamon/util/Filters.scala @@ -60,6 +60,12 @@ class Filters(filters: Map[String, Matcher]) { .get(filterName) .map(_.accept(pattern)) .getOrElse(false) + + def get(filterName: String): Matcher = { + filters.getOrElse(filterName, new Matcher { + override def accept(name: String): Boolean = false + }) + } } trait Matcher { diff --git a/kamon-testkit/src/main/scala/kamon/testkit/Reconfigure.scala b/kamon-testkit/src/main/scala/kamon/testkit/Reconfigure.scala index 98dd47ae..c70fadb8 100644 --- a/kamon-testkit/src/main/scala/kamon/testkit/Reconfigure.scala +++ b/kamon-testkit/src/main/scala/kamon/testkit/Reconfigure.scala @@ -20,6 +20,10 @@ import kamon.Kamon trait Reconfigure { + def enableFastMetricFlushing(): Unit = { + applyConfig("kamon.metric.tick-interval = 1 millisecond") + } + def enableFastSpanFlushing(): Unit = { applyConfig("kamon.trace.tick-interval = 1 millisecond") } @@ -44,6 +48,10 @@ trait Reconfigure { Kamon.reconfigure(ConfigFactory.parseString(configString).withFallback(Kamon.config())) } + def resetConfig(): Unit = { + Kamon.reconfigure(ConfigFactory.load()) + } + } -- cgit v1.2.3