aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kamon-core-tests/src/test/scala/kamon/util/DifferentialSourceSpec.scala40
-rw-r--r--kamon-core/src/main/scala/kamon/ReporterRegistry.scala63
-rw-r--r--kamon-core/src/main/scala/kamon/util/DifferentialSource.scala24
-rw-r--r--kamon-testkit/src/main/scala/kamon/testkit/MetricInspection.scala23
4 files changed, 141 insertions, 9 deletions
diff --git a/kamon-core-tests/src/test/scala/kamon/util/DifferentialSourceSpec.scala b/kamon-core-tests/src/test/scala/kamon/util/DifferentialSourceSpec.scala
new file mode 100644
index 00000000..0a2f83e5
--- /dev/null
+++ b/kamon-core-tests/src/test/scala/kamon/util/DifferentialSourceSpec.scala
@@ -0,0 +1,40 @@
+package kamon.util
+
+import org.scalatest.{Matchers, WordSpec}
+
+class DifferentialSourceSpec extends WordSpec with Matchers {
+
+ "a Differential Source" should {
+ "get the difference between the last two observations" in {
+ val source = sourceOf(0, 0, 1, 1, 2, 3, 4, 6, 8, 10, 12, 16, 18)
+ val expectedDiffs = Seq(0, 1, 0, 1, 1, 1, 2, 2, 2, 2, 4, 2)
+
+ values(expectedDiffs.length, source) should contain theSameElementsInOrderAs(expectedDiffs)
+ }
+
+ "ignore decrements in observations" in {
+ val source = sourceOf(10, 10, 5, 5, 10, 10)
+ val expectedDiffs = Seq(0, 0, 0, 5, 0)
+
+ values(expectedDiffs.length, source) should contain theSameElementsInOrderAs(expectedDiffs)
+ }
+ }
+
+ def sourceOf(numbers: Long*): DifferentialSource = DifferentialSource(new (() => Long) {
+ var remaining = numbers.toList
+
+ override def apply(): Long = {
+ if(remaining.isEmpty) 0 else {
+ val head = remaining.head
+ remaining = remaining.tail
+ head
+ }
+ }
+ })
+
+ def values(count: Int, source: DifferentialSource): Seq[Long] = {
+ for(_ <- 1 to count) yield source.get()
+ }
+}
+
+
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
index eedec830..7d05ab92 100644
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -122,7 +122,13 @@ object ReporterRegistry {
executionContext = ExecutionContext.fromExecutorService(executor)
)
- Future(reporterEntry.reporter.start())(reporterEntry.executionContext)
+ Future {
+ Try {
+ reporterEntry.reporter.start()
+ }.failed.foreach { error =>
+ logger.error(s"Metric reporter [$name] failed to start.", error)
+ }
+ }(reporterEntry.executionContext)
if(metricReporters.isEmpty)
reStartMetricTicker()
@@ -142,7 +148,13 @@ object ReporterRegistry {
executionContext = ExecutionContext.fromExecutorService(executor)
)
- Future(reporterEntry.reporter.start())(reporterEntry.executionContext)
+ Future {
+ Try {
+ reporterEntry.reporter.start()
+ }.failed.foreach { error =>
+ logger.error(s"Span reporter [$name] failed to start.", error)
+ }
+ }(reporterEntry.executionContext)
if(spanReporters.isEmpty)
reStartTraceTicker()
@@ -174,7 +186,7 @@ object ReporterRegistry {
}
}
- Future.sequence(reporterStopFutures.result()).map(_ => Try((): Unit))
+ Future.sequence(reporterStopFutures.result()).map(_ => ())
}
private[kamon] def reconfigure(config: Config): Unit = synchronized {
@@ -187,8 +199,27 @@ object ReporterRegistry {
reStartTraceTicker()
// Reconfigure all registered reporters
- metricReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) }
- spanReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) }
+ metricReporters.foreach {
+ case (_, entry) =>
+ Future {
+ Try {
+ entry.reporter.reconfigure(config)
+ }.failed.foreach { error =>
+ logger.error(s"Metric reporter [${entry.name}] failed to reconfigure.", error)
+ }
+ }(entry.executionContext)
+ }
+ spanReporters.foreach {
+ case (_, entry) =>
+ Future {
+ Try {
+ entry.reporter.reconfigure(config)
+ }.failed.foreach { error =>
+ logger.error(s"Span reporter [${entry.name}] failed to reconfigure.", error)
+ }
+ }(entry.executionContext)
+ }
+
registryConfiguration = newConfig
}
@@ -244,7 +275,13 @@ object ReporterRegistry {
private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = {
entry.isActive = false
- Future(entry.reporter.stop())(entry.executionContext).andThen {
+ Future {
+ Try {
+ entry.reporter.stop()
+ }.failed.foreach { error =>
+ logger.error(s"Metric reporter [${entry.name}] failed to stop.", error)
+ }
+ }(entry.executionContext).andThen {
case _ => entry.executionContext.shutdown()
}(ExecutionContext.fromExecutor(registryExecutionContext))
}
@@ -252,7 +289,13 @@ object ReporterRegistry {
private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = {
entry.isActive = false
- Future(entry.reporter.stop())(entry.executionContext).andThen {
+ Future {
+ Try {
+ entry.reporter.stop()
+ }.failed.foreach { error =>
+ logger.error(s"Span reporter [${entry.name}] failed to stop.", error)
+ }
+ }(entry.executionContext).andThen {
case _ => entry.executionContext.shutdown()
}(ExecutionContext.fromExecutor(registryExecutionContext))
}
@@ -316,7 +359,11 @@ object ReporterRegistry {
entry.buffer.drainTo(spanBatch, entry.bufferCapacity)
Future {
- entry.reporter.reportSpans(spanBatch.asScala)
+ Try {
+ entry.reporter.reportSpans(spanBatch.asScala)
+ }.failed.foreach { error =>
+ logger.error(s"Reporter [${entry.name}] failed to report spans.", error)
+ }
}(entry.executionContext)
}
}
diff --git a/kamon-core/src/main/scala/kamon/util/DifferentialSource.scala b/kamon-core/src/main/scala/kamon/util/DifferentialSource.scala
new file mode 100644
index 00000000..f2c621b0
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/util/DifferentialSource.scala
@@ -0,0 +1,24 @@
+package kamon.util
+
+/**
+ * Keeps track of the values produced by the source and produce the difference between the last two observed values
+ * when calling get. This class assumes the source increases monotonically and any produced value that violates this
+ * assumption will be dropped.
+ *
+ */
+class DifferentialSource(source: () => Long) {
+ private var previousValue = source()
+
+ def get(): Long = synchronized {
+ val currentValue = source()
+ val diff = currentValue - previousValue
+ previousValue = currentValue
+
+ if(diff < 0) 0 else diff
+ }
+}
+
+object DifferentialSource {
+ def apply(source: () => Long): DifferentialSource =
+ new DifferentialSource(source)
+}
diff --git a/kamon-testkit/src/main/scala/kamon/testkit/MetricInspection.scala b/kamon-testkit/src/main/scala/kamon/testkit/MetricInspection.scala
index bb6d27f9..be340df9 100644
--- a/kamon-testkit/src/main/scala/kamon/testkit/MetricInspection.scala
+++ b/kamon-testkit/src/main/scala/kamon/testkit/MetricInspection.scala
@@ -15,7 +15,7 @@
package kamon.testkit
-import kamon.metric._
+import kamon.metric.{BaseMetric, _}
import _root_.scala.collection.concurrent.TrieMap
@@ -30,6 +30,27 @@ trait MetricInspection {
val instrumentsWithTheTag = instruments.keys.filter(_.keys.exists(_ == tag))
instrumentsWithTheTag.map(t => t(tag)).toSeq
}
+
+ def partialRefine(tags: Map[String, String]): Seq[Map[String, String]] = {
+ val instrumentsField = classOf[BaseMetric[_, _]].getDeclaredField("instruments")
+ instrumentsField.setAccessible(true)
+
+ val instruments = instrumentsField.get(metric).asInstanceOf[TrieMap[Map[String, String], _]]
+
+ instruments.keys.filter { metricKey =>
+ tags.toSeq.forall { case (k, v) =>
+ metricKey.contains(k) && metricKey(k) == v
+ }
+ }.toSeq
+ }
+
+ def partialRefineKeys(tags: Set[String]): Seq[Map[String, String]] = {
+ val instrumentsField = classOf[BaseMetric[_, _]].getDeclaredField("instruments")
+ instrumentsField.setAccessible(true)
+
+ val instruments = instrumentsField.get(metric).asInstanceOf[TrieMap[Map[String, String], _]]
+ instruments.keys.filter(key => tags.subsetOf(key.keySet)).toSeq
+ }
}
implicit class HistogramMetricSyntax(histogram: Histogram) {