aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/trace/Tracer.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace/Tracer.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Tracer.scala64
1 files changed, 48 insertions, 16 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
index ad7ffbed..7a314205 100644
--- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
@@ -18,13 +18,13 @@ package kamon.trace
import java.time.Instant
import com.typesafe.config.Config
-import kamon.ReporterRegistry.SpanSink
import kamon.Kamon
import kamon.metric.MetricLookup
-import kamon.trace.Span.TagValue
+import kamon.trace.Span.{FinishedSpan, TagValue}
import kamon.trace.SpanContext.SamplingDecision
import kamon.trace.Tracer.SpanBuilder
import kamon.util.{Clock, DynamicAccess}
+import org.jctools.queues.{MessagePassingQueue, MpscArrayQueue}
import org.slf4j.LoggerFactory
import scala.collection.immutable
@@ -37,19 +37,26 @@ trait Tracer {
object Tracer {
- final class Default(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config, clock: Clock) extends Tracer {
- private val logger = LoggerFactory.getLogger(classOf[Tracer])
+ private[kamon] trait SpanBuffer {
+ def append(span: FinishedSpan): Unit
+ def flush(): Seq[FinishedSpan]
+ }
+
+ final class Default(metrics: MetricLookup, initialConfig: Config, clock: Clock) extends Tracer with SpanBuffer {
+ private val _logger = LoggerFactory.getLogger(classOf[Tracer])
private[Tracer] val tracerMetrics = new TracerMetrics(metrics)
- @volatile private[Tracer] var joinRemoteParentsWithSameSpanID: Boolean = true
- @volatile private[Tracer] var scopeSpanMetrics: Boolean = true
+ @volatile private[Tracer] var _traceReporterQueueSize = 1024
+ @volatile private[Tracer] var _spanBuffer = new MpscArrayQueue[Span.FinishedSpan](_traceReporterQueueSize)
+ @volatile private[Tracer] var _joinRemoteParentsWithSameSpanID: Boolean = true
+ @volatile private[Tracer] var _scopeSpanMetrics: Boolean = true
@volatile private[Tracer] var _sampler: Sampler = Sampler.Never
@volatile private[Tracer] var _identityProvider: IdentityProvider = IdentityProvider.Default()
reconfigure(initialConfig)
override def buildSpan(operationName: String): SpanBuilder =
- new SpanBuilder(operationName, this, spanSink, clock)
+ new SpanBuilder(operationName, this, this, clock)
override def identityProvider: IdentityProvider =
this._identityProvider
@@ -69,29 +76,54 @@ object Tracer {
case other => sys.error(s"Unexpected sampler name $other.")
}
+ val newTraceReporterQueueSize = traceConfig.getInt("reporter-queue-size")
val newJoinRemoteParentsWithSameSpanID = traceConfig.getBoolean("join-remote-parents-with-same-span-id")
val newScopeSpanMetrics = traceConfig.getBoolean("span-metrics.scope-spans-to-parent")
val newIdentityProvider = dynamic.createInstanceFor[IdentityProvider](
traceConfig.getString("identity-provider"), immutable.Seq.empty[(Class[_], AnyRef)]
).get
+ if(_traceReporterQueueSize != newTraceReporterQueueSize) {
+ // By simply changing the buffer we might be dropping Spans that have not been collected yet by the reporters.
+ // Since reconfigures are very unlikely to happen beyond application startup this might be a problem at all.
+ // If we eventually decide to keep those possible Spans around then we will need to change the queue type to
+ // multiple consumer as the reconfiguring thread will need to drain the contents before replacing.
+ _spanBuffer = new MpscArrayQueue[Span.FinishedSpan](newTraceReporterQueueSize)
+ }
+
_sampler = newSampler
- joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID
- scopeSpanMetrics = newScopeSpanMetrics
+ _joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID
+ _scopeSpanMetrics = newScopeSpanMetrics
_identityProvider = newIdentityProvider
+ _traceReporterQueueSize = newTraceReporterQueueSize
}.failed.foreach {
- ex => logger.error("Unable to reconfigure Kamon Tracer", ex)
+ ex => _logger.error("Unable to reconfigure Kamon Tracer", ex)
}
}
+
+
+ override def append(span: FinishedSpan): Unit =
+ _spanBuffer.offer(span)
+
+ override def flush(): Seq[FinishedSpan] = {
+ var spans = Seq.empty[FinishedSpan]
+ _spanBuffer.drain(new MessagePassingQueue.Consumer[Span.FinishedSpan] {
+ override def accept(span: FinishedSpan): Unit =
+ spans = span +: spans
+ })
+
+ spans
+ }
+
}
object Default {
- def apply(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config, clock: Clock): Default =
- new Default(metrics, spanSink, initialConfig, clock)
+ def apply(metrics: MetricLookup, initialConfig: Config, clock: Clock): Default =
+ new Default(metrics, initialConfig, clock)
}
- final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanSink: SpanSink, clock: Clock) {
+ final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanBuffer: Tracer.SpanBuffer, clock: Clock) {
private var parentSpan: Span = _
private var initialOperationName: String = operationName
private var from: Instant = Instant.EPOCH
@@ -192,15 +224,15 @@ object Tracer {
initialSpanTags,
initialMetricTags,
spanFrom,
- spanSink,
+ spanBuffer,
trackMetrics,
- tracer.scopeSpanMetrics,
+ tracer._scopeSpanMetrics,
clock
)
}
private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext =
- if(parent.isRemote() && tracer.joinRemoteParentsWithSameSpanID)
+ if(parent.isRemote() && tracer._joinRemoteParentsWithSameSpanID)
parent.context().copy(samplingDecision = samplingDecision)
else
parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision)