aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2018-01-17 02:14:27 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2018-01-17 02:14:27 +0100
commita02215f4327fc11ed8e5066ada22c1d5938d849e (patch)
tree90f1a61535b9cad97091217cb04b9c621c854fd3 /kamon-core/src/main/scala/kamon/ReporterRegistry.scala
parented6504f67549184c86e3b6fe752985179912f35c (diff)
downloadKamon-a02215f4327fc11ed8e5066ada22c1d5938d849e.tar.gz
Kamon-a02215f4327fc11ed8e5066ada22c1d5938d849e.tar.bz2
Kamon-a02215f4327fc11ed8e5066ada22c1d5938d849e.zip
allow metric filtering when adding a metrics reporter
Diffstat (limited to 'kamon-core/src/main/scala/kamon/ReporterRegistry.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/ReporterRegistry.scala32
1 files changed, 28 insertions, 4 deletions
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
index 67cfbe4e..bd28e871 100644
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -52,6 +52,7 @@ trait ReporterRegistry {
def addReporter(reporter: MetricReporter): Registration
def addReporter(reporter: MetricReporter, name: String): Registration
+ def addReporter(reporter: MetricReporter, name: String, filter: String): Registration
def addReporter(reporter: SpanReporter): Registration
def addReporter(reporter: SpanReporter, name: String): Registration
@@ -106,6 +107,9 @@ object ReporterRegistry {
override def addReporter(reporter: MetricReporter, name: String): Registration =
addMetricReporter(reporter, name)
+ override def addReporter(reporter: MetricReporter, name: String, filter: String): Registration =
+ addMetricReporter(reporter, name, Some(filter))
+
override def addReporter(reporter: SpanReporter): Registration =
addSpanReporter(reporter, reporter.getClass.getName())
@@ -113,12 +117,13 @@ object ReporterRegistry {
addSpanReporter(reporter, name)
- private def addMetricReporter(reporter: MetricReporter, name: String): Registration = synchronized {
+ private def addMetricReporter(reporter: MetricReporter, name: String, filter: Option[String] = None): Registration = synchronized {
val executor = Executors.newSingleThreadExecutor(threadFactory(name))
val reporterEntry = new MetricReporterEntry(
id = reporterCounter.getAndIncrement(),
name = name,
reporter = reporter,
+ filter = filter,
executionContext = ExecutionContext.fromExecutorService(executor)
)
@@ -299,6 +304,7 @@ object ReporterRegistry {
val id: Long,
val name: String,
val reporter: MetricReporter,
+ val filter: Option[String],
val executionContext: ExecutionContextExecutorService
)
@@ -321,7 +327,7 @@ object ReporterRegistry {
def run(): Unit = try {
val currentInstant = Instant.now(clock)
- val tickSnapshot = PeriodSnapshot(
+ val periodSnapshot = PeriodSnapshot(
from = lastInstant,
to = currentInstant,
metrics = snapshotGenerator.snapshot()
@@ -330,8 +336,13 @@ object ReporterRegistry {
reporterEntries.foreach { case (_, entry) =>
Future {
Try {
- if (entry.isActive)
- entry.reporter.reportPeriodSnapshot(tickSnapshot)
+ if (entry.isActive) {
+ val filteredSnapshot = entry.filter
+ .map(f => filterMetrics(f, periodSnapshot))
+ .getOrElse(periodSnapshot)
+
+ entry.reporter.reportPeriodSnapshot(filteredSnapshot)
+ }
}.failed.foreach { error =>
logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error)
@@ -345,8 +356,21 @@ object ReporterRegistry {
} catch {
case NonFatal(t) => logger.error("Error while running a tick", t)
}
+
+ private def filterMetrics(filterName: String, periodSnapshot: PeriodSnapshot): PeriodSnapshot = {
+ val metricFilter = Kamon.filter(filterName)
+ val counters = periodSnapshot.metrics.counters.filter(c => metricFilter.accept(c.name))
+ val gauges = periodSnapshot.metrics.gauges.filter(g => metricFilter.accept(g.name))
+ val histograms = periodSnapshot.metrics.histograms.filter(h => metricFilter.accept(h.name))
+ val rangeSamplers = periodSnapshot.metrics.rangeSamplers.filter(rs => metricFilter.accept(rs.name))
+
+ periodSnapshot.copy(metrics = MetricsSnapshot(
+ histograms, rangeSamplers, gauges, counters
+ ))
+ }
}
+
private class SpanReporterTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable {
override def run(): Unit = {
spanReporters.foreach {