diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace')
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/Span.scala | 9 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala | 6 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/Tracer.scala | 64 |
3 files changed, 55 insertions, 24 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index 6015e350..ee301e8a 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -18,7 +18,6 @@ package trace import java.time.Instant -import kamon.ReporterRegistry.SpanSink import kamon.context.Context import kamon.metric.MeasurementUnit import kamon.trace.SpanContext.SamplingDecision @@ -90,7 +89,7 @@ object Span { final class Local(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], - initialMetricTags: Map[String, String], from: Instant, spanSink: SpanSink, trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock) extends Span { + initialMetricTags: Map[String, String], from: Instant, spanBuffer: Tracer.SpanBuffer, trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock) extends Span { private var collectMetrics: Boolean = trackMetrics private var open: Boolean = true @@ -203,7 +202,7 @@ object Span { recordSpanMetrics(to) if(sampled) - spanSink.reportSpan(toFinishedSpan(to)) + spanBuffer.append(toFinishedSpan(to)) } } @@ -229,9 +228,9 @@ object Span { object Local { def apply(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], - initialMetricTags: Map[String, String], from: Instant, spanSink: SpanSink, + initialMetricTags: Map[String, String], from: Instant, spanBuffer: Tracer.SpanBuffer, trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock): Local = - new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, from, spanSink, trackMetrics, scopeSpanMetrics, clock) + new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, from, spanBuffer, trackMetrics, scopeSpanMetrics, clock) } diff --git a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala index 4cad1eb7..542638cf 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala @@ -38,7 +38,7 @@ object SpanPropagation { import B3.Headers override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { - val identityProvider = Kamon.tracer.identityProvider + val identityProvider = Kamon.identityProvider val traceID = reader.read(Headers.TraceIdentifier) .map(id => identityProvider.traceIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) @@ -122,7 +122,7 @@ object SpanPropagation { override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { reader.read(Header.B3).map { header => - val identityProvider = Kamon.tracer.identityProvider + val identityProvider = Kamon.identityProvider val (traceID, spanID, samplingDecision, parentSpanID) = header.splitToTuple("-") @@ -220,7 +220,7 @@ object SpanPropagation { if(medium.available() == 0) context else { - val identityProvider = Kamon.tracer.identityProvider + val identityProvider = Kamon.identityProvider val colferSpan = new ColferSpan() colferSpan.unmarshal(medium.readAll(), 0) diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index ad7ffbed..7a314205 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -18,13 +18,13 @@ package kamon.trace import java.time.Instant import com.typesafe.config.Config -import kamon.ReporterRegistry.SpanSink import kamon.Kamon import kamon.metric.MetricLookup -import kamon.trace.Span.TagValue +import kamon.trace.Span.{FinishedSpan, TagValue} import kamon.trace.SpanContext.SamplingDecision import kamon.trace.Tracer.SpanBuilder import kamon.util.{Clock, DynamicAccess} +import org.jctools.queues.{MessagePassingQueue, MpscArrayQueue} import org.slf4j.LoggerFactory import scala.collection.immutable @@ -37,19 +37,26 @@ trait Tracer { object Tracer { - final class Default(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config, clock: Clock) extends Tracer { - private val logger = LoggerFactory.getLogger(classOf[Tracer]) + private[kamon] trait SpanBuffer { + def append(span: FinishedSpan): Unit + def flush(): Seq[FinishedSpan] + } + + final class Default(metrics: MetricLookup, initialConfig: Config, clock: Clock) extends Tracer with SpanBuffer { + private val _logger = LoggerFactory.getLogger(classOf[Tracer]) private[Tracer] val tracerMetrics = new TracerMetrics(metrics) - @volatile private[Tracer] var joinRemoteParentsWithSameSpanID: Boolean = true - @volatile private[Tracer] var scopeSpanMetrics: Boolean = true + @volatile private[Tracer] var _traceReporterQueueSize = 1024 + @volatile private[Tracer] var _spanBuffer = new MpscArrayQueue[Span.FinishedSpan](_traceReporterQueueSize) + @volatile private[Tracer] var _joinRemoteParentsWithSameSpanID: Boolean = true + @volatile private[Tracer] var _scopeSpanMetrics: Boolean = true @volatile private[Tracer] var _sampler: Sampler = Sampler.Never @volatile private[Tracer] var _identityProvider: IdentityProvider = IdentityProvider.Default() reconfigure(initialConfig) override def buildSpan(operationName: String): SpanBuilder = - new SpanBuilder(operationName, this, spanSink, clock) + new SpanBuilder(operationName, this, this, clock) override def identityProvider: IdentityProvider = this._identityProvider @@ -69,29 +76,54 @@ object Tracer { case other => sys.error(s"Unexpected sampler name $other.") } + val newTraceReporterQueueSize = traceConfig.getInt("reporter-queue-size") val newJoinRemoteParentsWithSameSpanID = traceConfig.getBoolean("join-remote-parents-with-same-span-id") val newScopeSpanMetrics = traceConfig.getBoolean("span-metrics.scope-spans-to-parent") val newIdentityProvider = dynamic.createInstanceFor[IdentityProvider]( traceConfig.getString("identity-provider"), immutable.Seq.empty[(Class[_], AnyRef)] ).get + if(_traceReporterQueueSize != newTraceReporterQueueSize) { + // By simply changing the buffer we might be dropping Spans that have not been collected yet by the reporters. + // Since reconfigures are very unlikely to happen beyond application startup this might be a problem at all. + // If we eventually decide to keep those possible Spans around then we will need to change the queue type to + // multiple consumer as the reconfiguring thread will need to drain the contents before replacing. + _spanBuffer = new MpscArrayQueue[Span.FinishedSpan](newTraceReporterQueueSize) + } + _sampler = newSampler - joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID - scopeSpanMetrics = newScopeSpanMetrics + _joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID + _scopeSpanMetrics = newScopeSpanMetrics _identityProvider = newIdentityProvider + _traceReporterQueueSize = newTraceReporterQueueSize }.failed.foreach { - ex => logger.error("Unable to reconfigure Kamon Tracer", ex) + ex => _logger.error("Unable to reconfigure Kamon Tracer", ex) } } + + + override def append(span: FinishedSpan): Unit = + _spanBuffer.offer(span) + + override def flush(): Seq[FinishedSpan] = { + var spans = Seq.empty[FinishedSpan] + _spanBuffer.drain(new MessagePassingQueue.Consumer[Span.FinishedSpan] { + override def accept(span: FinishedSpan): Unit = + spans = span +: spans + }) + + spans + } + } object Default { - def apply(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config, clock: Clock): Default = - new Default(metrics, spanSink, initialConfig, clock) + def apply(metrics: MetricLookup, initialConfig: Config, clock: Clock): Default = + new Default(metrics, initialConfig, clock) } - final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanSink: SpanSink, clock: Clock) { + final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanBuffer: Tracer.SpanBuffer, clock: Clock) { private var parentSpan: Span = _ private var initialOperationName: String = operationName private var from: Instant = Instant.EPOCH @@ -192,15 +224,15 @@ object Tracer { initialSpanTags, initialMetricTags, spanFrom, - spanSink, + spanBuffer, trackMetrics, - tracer.scopeSpanMetrics, + tracer._scopeSpanMetrics, clock ) } private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext = - if(parent.isRemote() && tracer.joinRemoteParentsWithSameSpanID) + if(parent.isRemote() && tracer._joinRemoteParentsWithSameSpanID) parent.context().copy(samplingDecision = samplingDecision) else parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision) |