diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/ReporterRegistry.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/ReporterRegistry.scala | 105 |
1 files changed, 40 insertions, 65 deletions
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index 09c980f6..b42c5abe 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -6,6 +6,7 @@ import java.util.concurrent._ import com.typesafe.config.Config import kamon.metric._ +import kamon.trace.Span import org.slf4j.LoggerFactory import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} @@ -14,15 +15,40 @@ import scala.util.control.NonFatal trait ReporterRegistry { def loadFromConfig(): Unit + def add(reporter: MetricsReporter): Registration def add(reporter: MetricsReporter, name: String): Registration + def add(reporter: SpansReporter): Registration + def stopAll(): Future[Unit] } + +trait Registration { + def cancel(): Boolean +} + +trait MetricsReporter { + def start(config: Config): Unit + def reconfigure(config: Config): Unit + def stop(): Unit + + def reportTickSnapshot(snapshot: TickSnapshot) +} + +trait SpansReporter { + def start(config: Config): Unit + def reconfigure(config: Config): Unit + def stop(): Unit + + def reportSpan(span: Span.CompletedSpan): Unit +} + class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config) extends ReporterRegistry { private val registryExecutionContext = Executors.newSingleThreadScheduledExecutor(threadFactory("kamon-reporter-registry")) private val metricsTickerSchedule = new AtomicReference[ScheduledFuture[_]]() private val metricReporters = new ConcurrentLinkedQueue[ReporterEntry]() + private val spanReporters = new ConcurrentLinkedQueue[SpansReporter]() private val reporterCounter = new AtomicLong(0L) reconfigure(initialConfig) @@ -55,6 +81,14 @@ class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config) } } + override def add(reporter: SpansReporter): Registration = { + spanReporters.add(reporter) + + new Registration { + override def cancel(): Boolean = true + } + } + override def stopAll(): Future[Unit] = { implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext) val reporterStopFutures = Vector.newBuilder[Future[Unit]] @@ -87,6 +121,11 @@ class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config) } } + + private[kamon] def reportSpan(span: Span.CompletedSpan): Unit = { + spanReporters.forEach(_.reportSpan(span)) + } + private def stopReporter(entry: ReporterEntry): Future[Unit] = { entry.isActive = false @@ -131,7 +170,7 @@ class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config) reporterEntries.forEach { entry => Future { if(entry.isActive) - entry.reporter.processTick(tickSnapshot) + entry.reporter.reportTickSnapshot(tickSnapshot) }(executor = entry.executionContext) } @@ -142,68 +181,4 @@ class ReporterRegistryImpl(metrics: RecorderRegistryImpl, initialConfig: Config) case NonFatal(t) => logger.error("Error while running a tick", t) } } -} - - - -trait Registration { - def cancel(): Boolean -} - -trait MetricsReporter { - def reconfigure(config: Config): Unit - - def start(config: Config): Unit - def stop(): Unit - - def processTick(snapshot: TickSnapshot) -} - - - -object TestingAllExample extends App { - val recorder = Kamon.metrics.getRecorder(Entity("topo", "human-being", Map.empty)) - - val registration = Kamon.reporters.add(new DummyReporter("test")) - - var x = 0 - while(true) { - recorder.counter("test-other").increment() - Thread.sleep(100) - x += 1 - - if(x == 50) { - registration.cancel() - } - - if(x == 100) { - println("Stopping all reporters") - Kamon.reporters.stopAll() - } - } - -} - - -class DummyReporter(name: String) extends MetricsReporter { - override def reconfigure(config: Config): Unit = { - println("NAME: " + name + "===> Reconfiguring Dummy") - } - - override def start(config: Config): Unit = { - - println("NAME: " + name + "===> Starting DUMMY") - } - - override def stop(): Unit = { - println("NAME: " + name + "===> Stopping Dummy") - } - - override def processTick(snapshot: TickSnapshot): Unit = { - println("NAME: " + name + s"===> [${Thread.currentThread().getName()}] Processing a tick in dummy." + snapshot) - println(s"From: ${snapshot.interval.from}, to: ${snapshot.interval.to}") - snapshot.entities.foreach { e => - println(e.counters.map(c => s"Counter [${c.name}] => " + c.value).mkString(", ")) - } - } }
\ No newline at end of file |