diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/ReporterRegistry.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/ReporterRegistry.scala | 20 |
1 files changed, 14 insertions, 6 deletions
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index 5f46edf6..f0d744e5 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -20,9 +20,11 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference} import java.util.concurrent._ import com.typesafe.config.Config +import kamon.ReporterRegistry.SpanSink import kamon.metric._ import kamon.trace.Span -import kamon.util.{DynamicAccess, Registration} +import kamon.trace.Span.FinishedSpan +import kamon.util.{CallingThreadExecutionContext, DynamicAccess, Registration} import org.slf4j.LoggerFactory import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} @@ -42,6 +44,12 @@ trait ReporterRegistry { def stopAllReporters(): Future[Unit] } +object ReporterRegistry { + private[kamon] trait SpanSink { + def reportSpan(finishedSpan: FinishedSpan): Unit + } +} + sealed trait Reporter { def start(): Unit def stop(): Unit @@ -53,10 +61,10 @@ trait MetricReporter extends Reporter { } trait SpanReporter extends Reporter { - def reportSpans(spans: Seq[Span.CompletedSpan]): Unit + def reportSpans(spans: Seq[Span.FinishedSpan]): Unit } -class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry { +class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry with SpanSink { private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry]) private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry")) private val reporterCounter = new AtomicLong(0L) @@ -212,7 +220,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con } } - private[kamon] def reportSpan(span: Span.CompletedSpan): Unit = { + def reportSpan(span: Span.FinishedSpan): Unit = { spanReporters.foreach { case (_, reporterEntry) => if(reporterEntry.isActive) reporterEntry.buffer.offer(span) @@ -251,7 +259,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con val bufferCapacity: Int, val executionContext: ExecutionContextExecutorService ) { - val buffer = new ArrayBlockingQueue[Span.CompletedSpan](bufferCapacity) + val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity) } private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable { @@ -290,7 +298,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con spanReporters.foreach { case (_, entry) => - val spanBatch = new java.util.ArrayList[Span.CompletedSpan](entry.bufferCapacity) + val spanBatch = new java.util.ArrayList[Span.FinishedSpan](entry.bufferCapacity) entry.buffer.drainTo(spanBatch, entry.bufferCapacity) Future { |