aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/trace
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace')
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Span.scala9
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala6
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Tracer.scala64
3 files changed, 55 insertions, 24 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala
index 6015e350..ee301e8a 100644
--- a/kamon-core/src/main/scala/kamon/trace/Span.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Span.scala
@@ -18,7 +18,6 @@ package trace
import java.time.Instant
-import kamon.ReporterRegistry.SpanSink
import kamon.context.Context
import kamon.metric.MeasurementUnit
import kamon.trace.SpanContext.SamplingDecision
@@ -90,7 +89,7 @@ object Span {
final class Local(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue],
- initialMetricTags: Map[String, String], from: Instant, spanSink: SpanSink, trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock) extends Span {
+ initialMetricTags: Map[String, String], from: Instant, spanBuffer: Tracer.SpanBuffer, trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock) extends Span {
private var collectMetrics: Boolean = trackMetrics
private var open: Boolean = true
@@ -203,7 +202,7 @@ object Span {
recordSpanMetrics(to)
if(sampled)
- spanSink.reportSpan(toFinishedSpan(to))
+ spanBuffer.append(toFinishedSpan(to))
}
}
@@ -229,9 +228,9 @@ object Span {
object Local {
def apply(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue],
- initialMetricTags: Map[String, String], from: Instant, spanSink: SpanSink,
+ initialMetricTags: Map[String, String], from: Instant, spanBuffer: Tracer.SpanBuffer,
trackMetrics: Boolean, scopeSpanMetrics: Boolean, clock: Clock): Local =
- new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, from, spanSink, trackMetrics, scopeSpanMetrics, clock)
+ new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, from, spanBuffer, trackMetrics, scopeSpanMetrics, clock)
}
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala
index 4cad1eb7..542638cf 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala
@@ -38,7 +38,7 @@ object SpanPropagation {
import B3.Headers
override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = {
- val identityProvider = Kamon.tracer.identityProvider
+ val identityProvider = Kamon.identityProvider
val traceID = reader.read(Headers.TraceIdentifier)
.map(id => identityProvider.traceIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
@@ -122,7 +122,7 @@ object SpanPropagation {
override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = {
reader.read(Header.B3).map { header =>
- val identityProvider = Kamon.tracer.identityProvider
+ val identityProvider = Kamon.identityProvider
val (traceID, spanID, samplingDecision, parentSpanID) = header.splitToTuple("-")
@@ -220,7 +220,7 @@ object SpanPropagation {
if(medium.available() == 0)
context
else {
- val identityProvider = Kamon.tracer.identityProvider
+ val identityProvider = Kamon.identityProvider
val colferSpan = new ColferSpan()
colferSpan.unmarshal(medium.readAll(), 0)
diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
index ad7ffbed..7a314205 100644
--- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
@@ -18,13 +18,13 @@ package kamon.trace
import java.time.Instant
import com.typesafe.config.Config
-import kamon.ReporterRegistry.SpanSink
import kamon.Kamon
import kamon.metric.MetricLookup
-import kamon.trace.Span.TagValue
+import kamon.trace.Span.{FinishedSpan, TagValue}
import kamon.trace.SpanContext.SamplingDecision
import kamon.trace.Tracer.SpanBuilder
import kamon.util.{Clock, DynamicAccess}
+import org.jctools.queues.{MessagePassingQueue, MpscArrayQueue}
import org.slf4j.LoggerFactory
import scala.collection.immutable
@@ -37,19 +37,26 @@ trait Tracer {
object Tracer {
- final class Default(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config, clock: Clock) extends Tracer {
- private val logger = LoggerFactory.getLogger(classOf[Tracer])
+ private[kamon] trait SpanBuffer {
+ def append(span: FinishedSpan): Unit
+ def flush(): Seq[FinishedSpan]
+ }
+
+ final class Default(metrics: MetricLookup, initialConfig: Config, clock: Clock) extends Tracer with SpanBuffer {
+ private val _logger = LoggerFactory.getLogger(classOf[Tracer])
private[Tracer] val tracerMetrics = new TracerMetrics(metrics)
- @volatile private[Tracer] var joinRemoteParentsWithSameSpanID: Boolean = true
- @volatile private[Tracer] var scopeSpanMetrics: Boolean = true
+ @volatile private[Tracer] var _traceReporterQueueSize = 1024
+ @volatile private[Tracer] var _spanBuffer = new MpscArrayQueue[Span.FinishedSpan](_traceReporterQueueSize)
+ @volatile private[Tracer] var _joinRemoteParentsWithSameSpanID: Boolean = true
+ @volatile private[Tracer] var _scopeSpanMetrics: Boolean = true
@volatile private[Tracer] var _sampler: Sampler = Sampler.Never
@volatile private[Tracer] var _identityProvider: IdentityProvider = IdentityProvider.Default()
reconfigure(initialConfig)
override def buildSpan(operationName: String): SpanBuilder =
- new SpanBuilder(operationName, this, spanSink, clock)
+ new SpanBuilder(operationName, this, this, clock)
override def identityProvider: IdentityProvider =
this._identityProvider
@@ -69,29 +76,54 @@ object Tracer {
case other => sys.error(s"Unexpected sampler name $other.")
}
+ val newTraceReporterQueueSize = traceConfig.getInt("reporter-queue-size")
val newJoinRemoteParentsWithSameSpanID = traceConfig.getBoolean("join-remote-parents-with-same-span-id")
val newScopeSpanMetrics = traceConfig.getBoolean("span-metrics.scope-spans-to-parent")
val newIdentityProvider = dynamic.createInstanceFor[IdentityProvider](
traceConfig.getString("identity-provider"), immutable.Seq.empty[(Class[_], AnyRef)]
).get
+ if(_traceReporterQueueSize != newTraceReporterQueueSize) {
+ // By simply changing the buffer we might be dropping Spans that have not been collected yet by the reporters.
+ // Since reconfigures are very unlikely to happen beyond application startup this might be a problem at all.
+ // If we eventually decide to keep those possible Spans around then we will need to change the queue type to
+ // multiple consumer as the reconfiguring thread will need to drain the contents before replacing.
+ _spanBuffer = new MpscArrayQueue[Span.FinishedSpan](newTraceReporterQueueSize)
+ }
+
_sampler = newSampler
- joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID
- scopeSpanMetrics = newScopeSpanMetrics
+ _joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID
+ _scopeSpanMetrics = newScopeSpanMetrics
_identityProvider = newIdentityProvider
+ _traceReporterQueueSize = newTraceReporterQueueSize
}.failed.foreach {
- ex => logger.error("Unable to reconfigure Kamon Tracer", ex)
+ ex => _logger.error("Unable to reconfigure Kamon Tracer", ex)
}
}
+
+
+ override def append(span: FinishedSpan): Unit =
+ _spanBuffer.offer(span)
+
+ override def flush(): Seq[FinishedSpan] = {
+ var spans = Seq.empty[FinishedSpan]
+ _spanBuffer.drain(new MessagePassingQueue.Consumer[Span.FinishedSpan] {
+ override def accept(span: FinishedSpan): Unit =
+ spans = span +: spans
+ })
+
+ spans
+ }
+
}
object Default {
- def apply(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config, clock: Clock): Default =
- new Default(metrics, spanSink, initialConfig, clock)
+ def apply(metrics: MetricLookup, initialConfig: Config, clock: Clock): Default =
+ new Default(metrics, initialConfig, clock)
}
- final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanSink: SpanSink, clock: Clock) {
+ final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanBuffer: Tracer.SpanBuffer, clock: Clock) {
private var parentSpan: Span = _
private var initialOperationName: String = operationName
private var from: Instant = Instant.EPOCH
@@ -192,15 +224,15 @@ object Tracer {
initialSpanTags,
initialMetricTags,
spanFrom,
- spanSink,
+ spanBuffer,
trackMetrics,
- tracer.scopeSpanMetrics,
+ tracer._scopeSpanMetrics,
clock
)
}
private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext =
- if(parent.isRemote() && tracer.joinRemoteParentsWithSameSpanID)
+ if(parent.isRemote() && tracer._joinRemoteParentsWithSameSpanID)
parent.context().copy(samplingDecision = samplingDecision)
else
parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision)