diff options
4 files changed, 141 insertions, 9 deletions
diff --git a/kamon-core-tests/src/test/scala/kamon/util/DifferentialSourceSpec.scala b/kamon-core-tests/src/test/scala/kamon/util/DifferentialSourceSpec.scala new file mode 100644 index 00000000..0a2f83e5 --- /dev/null +++ b/kamon-core-tests/src/test/scala/kamon/util/DifferentialSourceSpec.scala @@ -0,0 +1,40 @@ +package kamon.util + +import org.scalatest.{Matchers, WordSpec} + +class DifferentialSourceSpec extends WordSpec with Matchers { + + "a Differential Source" should { + "get the difference between the last two observations" in { + val source = sourceOf(0, 0, 1, 1, 2, 3, 4, 6, 8, 10, 12, 16, 18) + val expectedDiffs = Seq(0, 1, 0, 1, 1, 1, 2, 2, 2, 2, 4, 2) + + values(expectedDiffs.length, source) should contain theSameElementsInOrderAs(expectedDiffs) + } + + "ignore decrements in observations" in { + val source = sourceOf(10, 10, 5, 5, 10, 10) + val expectedDiffs = Seq(0, 0, 0, 5, 0) + + values(expectedDiffs.length, source) should contain theSameElementsInOrderAs(expectedDiffs) + } + } + + def sourceOf(numbers: Long*): DifferentialSource = DifferentialSource(new (() => Long) { + var remaining = numbers.toList + + override def apply(): Long = { + if(remaining.isEmpty) 0 else { + val head = remaining.head + remaining = remaining.tail + head + } + } + }) + + def values(count: Int, source: DifferentialSource): Seq[Long] = { + for(_ <- 1 to count) yield source.get() + } +} + + diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index eedec830..7d05ab92 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -122,7 +122,13 @@ object ReporterRegistry { executionContext = ExecutionContext.fromExecutorService(executor) ) - Future(reporterEntry.reporter.start())(reporterEntry.executionContext) + Future { + Try { + reporterEntry.reporter.start() + }.failed.foreach { error => + logger.error(s"Metric reporter [$name] failed to start.", error) + } + }(reporterEntry.executionContext) if(metricReporters.isEmpty) reStartMetricTicker() @@ -142,7 +148,13 @@ object ReporterRegistry { executionContext = ExecutionContext.fromExecutorService(executor) ) - Future(reporterEntry.reporter.start())(reporterEntry.executionContext) + Future { + Try { + reporterEntry.reporter.start() + }.failed.foreach { error => + logger.error(s"Span reporter [$name] failed to start.", error) + } + }(reporterEntry.executionContext) if(spanReporters.isEmpty) reStartTraceTicker() @@ -174,7 +186,7 @@ object ReporterRegistry { } } - Future.sequence(reporterStopFutures.result()).map(_ => Try((): Unit)) + Future.sequence(reporterStopFutures.result()).map(_ => ()) } private[kamon] def reconfigure(config: Config): Unit = synchronized { @@ -187,8 +199,27 @@ object ReporterRegistry { reStartTraceTicker() // Reconfigure all registered reporters - metricReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } - spanReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } + metricReporters.foreach { + case (_, entry) => + Future { + Try { + entry.reporter.reconfigure(config) + }.failed.foreach { error => + logger.error(s"Metric reporter [${entry.name}] failed to reconfigure.", error) + } + }(entry.executionContext) + } + spanReporters.foreach { + case (_, entry) => + Future { + Try { + entry.reporter.reconfigure(config) + }.failed.foreach { error => + logger.error(s"Span reporter [${entry.name}] failed to reconfigure.", error) + } + }(entry.executionContext) + } + registryConfiguration = newConfig } @@ -244,7 +275,13 @@ object ReporterRegistry { private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = { entry.isActive = false - Future(entry.reporter.stop())(entry.executionContext).andThen { + Future { + Try { + entry.reporter.stop() + }.failed.foreach { error => + logger.error(s"Metric reporter [${entry.name}] failed to stop.", error) + } + }(entry.executionContext).andThen { case _ => entry.executionContext.shutdown() }(ExecutionContext.fromExecutor(registryExecutionContext)) } @@ -252,7 +289,13 @@ object ReporterRegistry { private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = { entry.isActive = false - Future(entry.reporter.stop())(entry.executionContext).andThen { + Future { + Try { + entry.reporter.stop() + }.failed.foreach { error => + logger.error(s"Span reporter [${entry.name}] failed to stop.", error) + } + }(entry.executionContext).andThen { case _ => entry.executionContext.shutdown() }(ExecutionContext.fromExecutor(registryExecutionContext)) } @@ -316,7 +359,11 @@ object ReporterRegistry { entry.buffer.drainTo(spanBatch, entry.bufferCapacity) Future { - entry.reporter.reportSpans(spanBatch.asScala) + Try { + entry.reporter.reportSpans(spanBatch.asScala) + }.failed.foreach { error => + logger.error(s"Reporter [${entry.name}] failed to report spans.", error) + } }(entry.executionContext) } } diff --git a/kamon-core/src/main/scala/kamon/util/DifferentialSource.scala b/kamon-core/src/main/scala/kamon/util/DifferentialSource.scala new file mode 100644 index 00000000..f2c621b0 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/DifferentialSource.scala @@ -0,0 +1,24 @@ +package kamon.util + +/** + * Keeps track of the values produced by the source and produce the difference between the last two observed values + * when calling get. This class assumes the source increases monotonically and any produced value that violates this + * assumption will be dropped. + * + */ +class DifferentialSource(source: () => Long) { + private var previousValue = source() + + def get(): Long = synchronized { + val currentValue = source() + val diff = currentValue - previousValue + previousValue = currentValue + + if(diff < 0) 0 else diff + } +} + +object DifferentialSource { + def apply(source: () => Long): DifferentialSource = + new DifferentialSource(source) +} diff --git a/kamon-testkit/src/main/scala/kamon/testkit/MetricInspection.scala b/kamon-testkit/src/main/scala/kamon/testkit/MetricInspection.scala index bb6d27f9..be340df9 100644 --- a/kamon-testkit/src/main/scala/kamon/testkit/MetricInspection.scala +++ b/kamon-testkit/src/main/scala/kamon/testkit/MetricInspection.scala @@ -15,7 +15,7 @@ package kamon.testkit -import kamon.metric._ +import kamon.metric.{BaseMetric, _} import _root_.scala.collection.concurrent.TrieMap @@ -30,6 +30,27 @@ trait MetricInspection { val instrumentsWithTheTag = instruments.keys.filter(_.keys.exists(_ == tag)) instrumentsWithTheTag.map(t => t(tag)).toSeq } + + def partialRefine(tags: Map[String, String]): Seq[Map[String, String]] = { + val instrumentsField = classOf[BaseMetric[_, _]].getDeclaredField("instruments") + instrumentsField.setAccessible(true) + + val instruments = instrumentsField.get(metric).asInstanceOf[TrieMap[Map[String, String], _]] + + instruments.keys.filter { metricKey => + tags.toSeq.forall { case (k, v) => + metricKey.contains(k) && metricKey(k) == v + } + }.toSeq + } + + def partialRefineKeys(tags: Set[String]): Seq[Map[String, String]] = { + val instrumentsField = classOf[BaseMetric[_, _]].getDeclaredField("instruments") + instrumentsField.setAccessible(true) + + val instruments = instrumentsField.get(metric).asInstanceOf[TrieMap[Map[String, String], _]] + instruments.keys.filter(key => tags.subsetOf(key.keySet)).toSeq + } } implicit class HistogramMetricSyntax(histogram: Histogram) { |