diff options
Diffstat (limited to 'kamon-core/src')
-rw-r--r-- | kamon-core/src/main/scala/kamon/ReporterRegistry.scala | 63 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/util/DifferentialSource.scala | 24 |
2 files changed, 79 insertions, 8 deletions
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index eedec830..7d05ab92 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -122,7 +122,13 @@ object ReporterRegistry { executionContext = ExecutionContext.fromExecutorService(executor) ) - Future(reporterEntry.reporter.start())(reporterEntry.executionContext) + Future { + Try { + reporterEntry.reporter.start() + }.failed.foreach { error => + logger.error(s"Metric reporter [$name] failed to start.", error) + } + }(reporterEntry.executionContext) if(metricReporters.isEmpty) reStartMetricTicker() @@ -142,7 +148,13 @@ object ReporterRegistry { executionContext = ExecutionContext.fromExecutorService(executor) ) - Future(reporterEntry.reporter.start())(reporterEntry.executionContext) + Future { + Try { + reporterEntry.reporter.start() + }.failed.foreach { error => + logger.error(s"Span reporter [$name] failed to start.", error) + } + }(reporterEntry.executionContext) if(spanReporters.isEmpty) reStartTraceTicker() @@ -174,7 +186,7 @@ object ReporterRegistry { } } - Future.sequence(reporterStopFutures.result()).map(_ => Try((): Unit)) + Future.sequence(reporterStopFutures.result()).map(_ => ()) } private[kamon] def reconfigure(config: Config): Unit = synchronized { @@ -187,8 +199,27 @@ object ReporterRegistry { reStartTraceTicker() // Reconfigure all registered reporters - metricReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } - spanReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } + metricReporters.foreach { + case (_, entry) => + Future { + Try { + entry.reporter.reconfigure(config) + }.failed.foreach { error => + logger.error(s"Metric reporter [${entry.name}] failed to reconfigure.", error) + } + }(entry.executionContext) + } + spanReporters.foreach { + case (_, entry) => + Future { + Try { + entry.reporter.reconfigure(config) + }.failed.foreach { error => + logger.error(s"Span reporter [${entry.name}] failed to reconfigure.", error) + } + }(entry.executionContext) + } + registryConfiguration = newConfig } @@ -244,7 +275,13 @@ object ReporterRegistry { private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = { entry.isActive = false - Future(entry.reporter.stop())(entry.executionContext).andThen { + Future { + Try { + entry.reporter.stop() + }.failed.foreach { error => + logger.error(s"Metric reporter [${entry.name}] failed to stop.", error) + } + }(entry.executionContext).andThen { case _ => entry.executionContext.shutdown() }(ExecutionContext.fromExecutor(registryExecutionContext)) } @@ -252,7 +289,13 @@ object ReporterRegistry { private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = { entry.isActive = false - Future(entry.reporter.stop())(entry.executionContext).andThen { + Future { + Try { + entry.reporter.stop() + }.failed.foreach { error => + logger.error(s"Span reporter [${entry.name}] failed to stop.", error) + } + }(entry.executionContext).andThen { case _ => entry.executionContext.shutdown() }(ExecutionContext.fromExecutor(registryExecutionContext)) } @@ -316,7 +359,11 @@ object ReporterRegistry { entry.buffer.drainTo(spanBatch, entry.bufferCapacity) Future { - entry.reporter.reportSpans(spanBatch.asScala) + Try { + entry.reporter.reportSpans(spanBatch.asScala) + }.failed.foreach { error => + logger.error(s"Reporter [${entry.name}] failed to report spans.", error) + } }(entry.executionContext) } } diff --git a/kamon-core/src/main/scala/kamon/util/DifferentialSource.scala b/kamon-core/src/main/scala/kamon/util/DifferentialSource.scala new file mode 100644 index 00000000..f2c621b0 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/DifferentialSource.scala @@ -0,0 +1,24 @@ +package kamon.util + +/** + * Keeps track of the values produced by the source and produce the difference between the last two observed values + * when calling get. This class assumes the source increases monotonically and any produced value that violates this + * assumption will be dropped. + * + */ +class DifferentialSource(source: () => Long) { + private var previousValue = source() + + def get(): Long = synchronized { + val currentValue = source() + val diff = currentValue - previousValue + previousValue = currentValue + + if(diff < 0) 0 else diff + } +} + +object DifferentialSource { + def apply(source: () => Long): DifferentialSource = + new DifferentialSource(source) +} |