diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace/Tracer.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/Tracer.scala | 64 |
1 files changed, 48 insertions, 16 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index ad7ffbed..7a314205 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -18,13 +18,13 @@ package kamon.trace import java.time.Instant import com.typesafe.config.Config -import kamon.ReporterRegistry.SpanSink import kamon.Kamon import kamon.metric.MetricLookup -import kamon.trace.Span.TagValue +import kamon.trace.Span.{FinishedSpan, TagValue} import kamon.trace.SpanContext.SamplingDecision import kamon.trace.Tracer.SpanBuilder import kamon.util.{Clock, DynamicAccess} +import org.jctools.queues.{MessagePassingQueue, MpscArrayQueue} import org.slf4j.LoggerFactory import scala.collection.immutable @@ -37,19 +37,26 @@ trait Tracer { object Tracer { - final class Default(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config, clock: Clock) extends Tracer { - private val logger = LoggerFactory.getLogger(classOf[Tracer]) + private[kamon] trait SpanBuffer { + def append(span: FinishedSpan): Unit + def flush(): Seq[FinishedSpan] + } + + final class Default(metrics: MetricLookup, initialConfig: Config, clock: Clock) extends Tracer with SpanBuffer { + private val _logger = LoggerFactory.getLogger(classOf[Tracer]) private[Tracer] val tracerMetrics = new TracerMetrics(metrics) - @volatile private[Tracer] var joinRemoteParentsWithSameSpanID: Boolean = true - @volatile private[Tracer] var scopeSpanMetrics: Boolean = true + @volatile private[Tracer] var _traceReporterQueueSize = 1024 + @volatile private[Tracer] var _spanBuffer = new MpscArrayQueue[Span.FinishedSpan](_traceReporterQueueSize) + @volatile private[Tracer] var _joinRemoteParentsWithSameSpanID: Boolean = true + @volatile private[Tracer] var _scopeSpanMetrics: Boolean = true @volatile private[Tracer] var _sampler: Sampler = Sampler.Never @volatile private[Tracer] var _identityProvider: IdentityProvider = IdentityProvider.Default() reconfigure(initialConfig) override def buildSpan(operationName: String): SpanBuilder = - new SpanBuilder(operationName, this, spanSink, clock) + new SpanBuilder(operationName, this, this, clock) override def identityProvider: IdentityProvider = this._identityProvider @@ -69,29 +76,54 @@ object Tracer { case other => sys.error(s"Unexpected sampler name $other.") } + val newTraceReporterQueueSize = traceConfig.getInt("reporter-queue-size") val newJoinRemoteParentsWithSameSpanID = traceConfig.getBoolean("join-remote-parents-with-same-span-id") val newScopeSpanMetrics = traceConfig.getBoolean("span-metrics.scope-spans-to-parent") val newIdentityProvider = dynamic.createInstanceFor[IdentityProvider]( traceConfig.getString("identity-provider"), immutable.Seq.empty[(Class[_], AnyRef)] ).get + if(_traceReporterQueueSize != newTraceReporterQueueSize) { + // By simply changing the buffer we might be dropping Spans that have not been collected yet by the reporters. + // Since reconfigures are very unlikely to happen beyond application startup this might be a problem at all. + // If we eventually decide to keep those possible Spans around then we will need to change the queue type to + // multiple consumer as the reconfiguring thread will need to drain the contents before replacing. + _spanBuffer = new MpscArrayQueue[Span.FinishedSpan](newTraceReporterQueueSize) + } + _sampler = newSampler - joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID - scopeSpanMetrics = newScopeSpanMetrics + _joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID + _scopeSpanMetrics = newScopeSpanMetrics _identityProvider = newIdentityProvider + _traceReporterQueueSize = newTraceReporterQueueSize }.failed.foreach { - ex => logger.error("Unable to reconfigure Kamon Tracer", ex) + ex => _logger.error("Unable to reconfigure Kamon Tracer", ex) } } + + + override def append(span: FinishedSpan): Unit = + _spanBuffer.offer(span) + + override def flush(): Seq[FinishedSpan] = { + var spans = Seq.empty[FinishedSpan] + _spanBuffer.drain(new MessagePassingQueue.Consumer[Span.FinishedSpan] { + override def accept(span: FinishedSpan): Unit = + spans = span +: spans + }) + + spans + } + } object Default { - def apply(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config, clock: Clock): Default = - new Default(metrics, spanSink, initialConfig, clock) + def apply(metrics: MetricLookup, initialConfig: Config, clock: Clock): Default = + new Default(metrics, initialConfig, clock) } - final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanSink: SpanSink, clock: Clock) { + final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanBuffer: Tracer.SpanBuffer, clock: Clock) { private var parentSpan: Span = _ private var initialOperationName: String = operationName private var from: Instant = Instant.EPOCH @@ -192,15 +224,15 @@ object Tracer { initialSpanTags, initialMetricTags, spanFrom, - spanSink, + spanBuffer, trackMetrics, - tracer.scopeSpanMetrics, + tracer._scopeSpanMetrics, clock ) } private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext = - if(parent.isRemote() && tracer.joinRemoteParentsWithSameSpanID) + if(parent.isRemote() && tracer._joinRemoteParentsWithSameSpanID) parent.context().copy(samplingDecision = samplingDecision) else parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision) |