diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace')
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/Span.scala | 4 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala | 4 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala (renamed from kamon-core/src/main/scala/kamon/trace/SpanCodec.scala) | 128 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/Tracer.scala | 18 |
4 files changed, 91 insertions, 63 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index 43391af7..6015e350 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -19,7 +19,7 @@ package trace import java.time.Instant import kamon.ReporterRegistry.SpanSink -import kamon.context.Key +import kamon.context.Context import kamon.metric.MeasurementUnit import kamon.trace.SpanContext.SamplingDecision import kamon.util.Clock @@ -66,7 +66,7 @@ sealed abstract class Span { object Span { - val ContextKey = Key.broadcast[Span]("span", Span.Empty) + val ContextKey = Context.key[Span]("span", Span.Empty) object Empty extends Span { override val context: SpanContext = SpanContext.EmptySpanContext diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala b/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala index 2a8e2271..ed329ef7 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala @@ -15,7 +15,7 @@ package kamon.trace -import kamon.context.Key +import kamon.context.{Context} import kamon.trace.Tracer.SpanBuilder /** @@ -39,7 +39,7 @@ object SpanCustomizer { override def customize(spanBuilder: SpanBuilder): SpanBuilder = spanBuilder } - val ContextKey = Key.local[SpanCustomizer]("span-customizer", Noop) + val ContextKey = Context.key[SpanCustomizer]("span-customizer", Noop) def forOperationName(operationName: String): SpanCustomizer = new SpanCustomizer { override def customize(spanBuilder: SpanBuilder): SpanBuilder = diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala index 14b28d54..dc168347 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala @@ -19,54 +19,43 @@ import java.net.{URLDecoder, URLEncoder} import java.nio.ByteBuffer import kamon.Kamon -import kamon.context.{Codecs, Context, TextMap} +import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter} +import kamon.context._ import kamon.context.generated.binary.span.{Span => ColferSpan} +import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter} import kamon.trace.SpanContext.SamplingDecision -object SpanCodec { +/** + * Propagation mechanisms for Kamon's Span data to and from HTTP and Binary mediums. + */ +object SpanPropagation { - class B3 extends Codecs.ForEntry[TextMap] { + /** + * Reads and Writes a Span instance using the B3 propagation format. The specification and semantics of the B3 + * Propagation protocol can be found here: https://github.com/openzipkin/b3-propagation + */ + class B3 extends Propagation.EntryReader[HeaderReader] with Propagation.EntryWriter[HeaderWriter] { import B3.Headers - override def encode(context: Context): TextMap = { - val span = context.get(Span.ContextKey) - val carrier = TextMap.Default() - - if(span.nonEmpty()) { - val spanContext = span.context() - carrier.put(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) - carrier.put(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) - - if(spanContext.parentID != IdentityProvider.NoIdentifier) - carrier.put(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) - - encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision => - carrier.put(Headers.Sampled, samplingDecision) - } - } - - carrier - } - - override def decode(carrier: TextMap, context: Context): Context = { + override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { val identityProvider = Kamon.tracer.identityProvider - val traceID = carrier.get(Headers.TraceIdentifier) + val traceID = reader.read(Headers.TraceIdentifier) .map(id => identityProvider.traceIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) - val spanID = carrier.get(Headers.SpanIdentifier) + val spanID = reader.read(Headers.SpanIdentifier) .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) { - val parentID = carrier.get(Headers.ParentSpanIdentifier) + val parentID = reader.read(Headers.ParentSpanIdentifier) .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) - val flags = carrier.get(Headers.Flags) + val flags = reader.read(Headers.Flags) - val samplingDecision = flags.orElse(carrier.get(Headers.Sampled)) match { + val samplingDecision = flags.orElse(reader.read(Headers.Sampled)) match { case Some(sampled) if sampled == "1" => SamplingDecision.Sample case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample case _ => SamplingDecision.Unknown @@ -77,6 +66,24 @@ object SpanCodec { } else context } + + override def write(context: Context, writer: HttpPropagation.HeaderWriter): Unit = { + val span = context.get(Span.ContextKey) + + if(span.nonEmpty()) { + val spanContext = span.context() + writer.write(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) + writer.write(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) + + if(spanContext.parentID != IdentityProvider.NoIdentifier) + writer.write(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) + + encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision => + writer.write(Headers.Sampled, samplingDecision) + } + } + } + private def encodeSamplingDecision(samplingDecision: SamplingDecision): Option[String] = samplingDecision match { case SamplingDecision.Sample => Some("1") case SamplingDecision.DoNotSample => Some("0") @@ -102,38 +109,27 @@ object SpanCodec { } - class Colfer extends Codecs.ForEntry[ByteBuffer] { - val emptyBuffer = ByteBuffer.allocate(0) - - override def encode(context: Context): ByteBuffer = { - val span = context.get(Span.ContextKey) - if(span.nonEmpty()) { - val marshalBuffer = Colfer.codecBuffer.get() - val colferSpan = new ColferSpan() - val spanContext = span.context() - - colferSpan.setTraceID(spanContext.traceID.bytes) - colferSpan.setSpanID(spanContext.spanID.bytes) - colferSpan.setParentID(spanContext.parentID.bytes) - colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision)) - - val marshalledSize = colferSpan.marshal(marshalBuffer, 0) - val buffer = ByteBuffer.allocate(marshalledSize) - buffer.put(marshalBuffer, 0, marshalledSize) - buffer - - } else emptyBuffer - } - - override def decode(carrier: ByteBuffer, context: Context): Context = { - carrier.clear() - - if(carrier.capacity() == 0) + /** + * Defines a bare bones binary context propagation that uses Colfer [1] as the serialization library. The Schema + * for the Span data is simply defined as: + * + * type Span struct { + * traceID binary + * spanID binary + * parentID binary + * samplingDecision uint8 + * } + * + */ + class Colfer extends Propagation.EntryReader[ByteStreamReader] with Propagation.EntryWriter[ByteStreamWriter] { + + override def read(medium: ByteStreamReader, context: Context): Context = { + if(medium.available() == 0) context else { val identityProvider = Kamon.tracer.identityProvider val colferSpan = new ColferSpan() - colferSpan.unmarshal(carrier.array(), 0) + colferSpan.unmarshal(medium.readAll(), 0) val spanContext = SpanContext( traceID = identityProvider.traceIdGenerator().from(colferSpan.traceID), @@ -146,6 +142,24 @@ object SpanCodec { } } + override def write(context: Context, medium: ByteStreamWriter): Unit = { + val span = context.get(Span.ContextKey) + + if(span.nonEmpty()) { + val marshalBuffer = Colfer.codecBuffer.get() + val colferSpan = new ColferSpan() + val spanContext = span.context() + + colferSpan.setTraceID(spanContext.traceID.bytes) + colferSpan.setSpanID(spanContext.spanID.bytes) + colferSpan.setParentID(spanContext.parentID.bytes) + colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision)) + + val marshalledSize = colferSpan.marshal(marshalBuffer, 0) + medium.write(marshalBuffer, 0, marshalledSize) + + } + } private def samplingDecisionToByte(samplingDecision: SamplingDecision): Byte = samplingDecision match { case SamplingDecision.Sample => 1 diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index 3e857f00..ad7ffbed 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -99,6 +99,7 @@ object Tracer { private var initialMetricTags = Map.empty[String, String] private var useParentFromContext = true private var trackMetrics = true + private var providedTraceID = IdentityProvider.NoIdentifier def asChildOf(parent: Span): SpanBuilder = { if(parent != Span.Empty) this.parentSpan = parent @@ -158,6 +159,11 @@ object Tracer { this } + def withTraceID(identifier: IdentityProvider.Identifier): SpanBuilder = { + this.providedTraceID = identifier + this + } + def start(): Span = { val spanFrom = if(from == Instant.EPOCH) clock.instant() else from @@ -199,13 +205,21 @@ object Tracer { else parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision) - private def newSpanContext(samplingDecision: SamplingDecision): SpanContext = + private def newSpanContext(samplingDecision: SamplingDecision): SpanContext = { + val traceID = + if(providedTraceID != IdentityProvider.NoIdentifier) + providedTraceID + else + tracer._identityProvider.traceIdGenerator().generate() + + SpanContext( - traceID = tracer._identityProvider.traceIdGenerator().generate(), + traceID, spanID = tracer._identityProvider.spanIdGenerator().generate(), parentID = IdentityProvider.NoIdentifier, samplingDecision = samplingDecision ) + } } private final class TracerMetrics(metricLookup: MetricLookup) { |