diff options
Diffstat (limited to 'kamon-core/src')
-rw-r--r-- | kamon-core/src/main/resources/reference.conf | 5 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/Kamon.scala | 8 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/ReporterRegistry.scala | 32 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/util/Filters.scala | 6 |
4 files changed, 44 insertions, 7 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index af767f60..b9254018 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -14,8 +14,9 @@ kamon { } # FQCN of the reporter instances that should be loaded when calling `Kamon.reporters.loadFromConfig()`. All reporter - # classes must - # Example: `reporters = ["kamon.statsd.StatsD", "kamon.zipkin.Zipkin"]`. + # classes must have a default constructor. No metric filtering is applied to metric reporters started this way. + + # Example: `reporters = ["kamon.prometheus.Prometheus", "kamon.zipkin.Zipkin"]`. reporters = [ ] # Pool size for the executor service that will run sampling on RangeSampler instruments. This scheduler is accesible diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 7ea9fa64..562ef615 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -22,7 +22,7 @@ import com.typesafe.config.{Config, ConfigFactory} import kamon.context.{Codecs, Context, Key, Storage} import kamon.metric._ import kamon.trace._ -import kamon.util.{Filters, Registration, Clock} +import kamon.util.{Clock, Filters, Matcher, Registration} import org.slf4j.LoggerFactory import scala.concurrent.Future @@ -152,6 +152,9 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { override def addReporter(reporter: MetricReporter, name: String): Registration = _reporterRegistry.addReporter(reporter, name) + override def addReporter(reporter: MetricReporter, name: String, filter: String): Registration = + _reporterRegistry.addReporter(reporter, name, filter) + override def addReporter(reporter: SpanReporter): Registration = _reporterRegistry.addReporter(reporter) @@ -164,6 +167,9 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { def filter(filterName: String, pattern: String): Boolean = _filters.accept(filterName, pattern) + def filter(filterName: String): Matcher = + _filters.get(filterName) + def clock(): Clock = _clock diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index 67cfbe4e..bd28e871 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -52,6 +52,7 @@ trait ReporterRegistry { def addReporter(reporter: MetricReporter): Registration def addReporter(reporter: MetricReporter, name: String): Registration + def addReporter(reporter: MetricReporter, name: String, filter: String): Registration def addReporter(reporter: SpanReporter): Registration def addReporter(reporter: SpanReporter, name: String): Registration @@ -106,6 +107,9 @@ object ReporterRegistry { override def addReporter(reporter: MetricReporter, name: String): Registration = addMetricReporter(reporter, name) + override def addReporter(reporter: MetricReporter, name: String, filter: String): Registration = + addMetricReporter(reporter, name, Some(filter)) + override def addReporter(reporter: SpanReporter): Registration = addSpanReporter(reporter, reporter.getClass.getName()) @@ -113,12 +117,13 @@ object ReporterRegistry { addSpanReporter(reporter, name) - private def addMetricReporter(reporter: MetricReporter, name: String): Registration = synchronized { + private def addMetricReporter(reporter: MetricReporter, name: String, filter: Option[String] = None): Registration = synchronized { val executor = Executors.newSingleThreadExecutor(threadFactory(name)) val reporterEntry = new MetricReporterEntry( id = reporterCounter.getAndIncrement(), name = name, reporter = reporter, + filter = filter, executionContext = ExecutionContext.fromExecutorService(executor) ) @@ -299,6 +304,7 @@ object ReporterRegistry { val id: Long, val name: String, val reporter: MetricReporter, + val filter: Option[String], val executionContext: ExecutionContextExecutorService ) @@ -321,7 +327,7 @@ object ReporterRegistry { def run(): Unit = try { val currentInstant = Instant.now(clock) - val tickSnapshot = PeriodSnapshot( + val periodSnapshot = PeriodSnapshot( from = lastInstant, to = currentInstant, metrics = snapshotGenerator.snapshot() @@ -330,8 +336,13 @@ object ReporterRegistry { reporterEntries.foreach { case (_, entry) => Future { Try { - if (entry.isActive) - entry.reporter.reportPeriodSnapshot(tickSnapshot) + if (entry.isActive) { + val filteredSnapshot = entry.filter + .map(f => filterMetrics(f, periodSnapshot)) + .getOrElse(periodSnapshot) + + entry.reporter.reportPeriodSnapshot(filteredSnapshot) + } }.failed.foreach { error => logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error) @@ -345,8 +356,21 @@ object ReporterRegistry { } catch { case NonFatal(t) => logger.error("Error while running a tick", t) } + + private def filterMetrics(filterName: String, periodSnapshot: PeriodSnapshot): PeriodSnapshot = { + val metricFilter = Kamon.filter(filterName) + val counters = periodSnapshot.metrics.counters.filter(c => metricFilter.accept(c.name)) + val gauges = periodSnapshot.metrics.gauges.filter(g => metricFilter.accept(g.name)) + val histograms = periodSnapshot.metrics.histograms.filter(h => metricFilter.accept(h.name)) + val rangeSamplers = periodSnapshot.metrics.rangeSamplers.filter(rs => metricFilter.accept(rs.name)) + + periodSnapshot.copy(metrics = MetricsSnapshot( + histograms, rangeSamplers, gauges, counters + )) + } } + private class SpanReporterTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable { override def run(): Unit = { spanReporters.foreach { diff --git a/kamon-core/src/main/scala/kamon/util/Filters.scala b/kamon-core/src/main/scala/kamon/util/Filters.scala index 9f015e99..a6c4de88 100644 --- a/kamon-core/src/main/scala/kamon/util/Filters.scala +++ b/kamon-core/src/main/scala/kamon/util/Filters.scala @@ -60,6 +60,12 @@ class Filters(filters: Map[String, Matcher]) { .get(filterName) .map(_.accept(pattern)) .getOrElse(false) + + def get(filterName: String): Matcher = { + filters.getOrElse(filterName, new Matcher { + override def accept(name: String): Boolean = false + }) + } } trait Matcher { |