From b61c92ea3589450fd097ab79420230b61b458ae4 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 28 Sep 2018 12:08:24 +0200 Subject: cleanup HTTP propagation, introduce a new Binary propagation --- .../src/main/scala/kamon/trace/SpanCodec.scala | 77 +++++++++++----------- 1 file changed, 37 insertions(+), 40 deletions(-) (limited to 'kamon-core/src/main/scala/kamon/trace/SpanCodec.scala') diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala index 7439520c..63f8e1b0 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala @@ -19,35 +19,36 @@ import java.net.{URLDecoder, URLEncoder} import java.nio.ByteBuffer import kamon.Kamon -import kamon.context.{Codecs, Context, HttpPropagation, TextMap} +import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter} +import kamon.context._ import kamon.context.generated.binary.span.{Span => ColferSpan} -import kamon.context.HttpPropagation.Direction +import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter} import kamon.trace.SpanContext.SamplingDecision object SpanCodec { - class B3 extends HttpPropagation.EntryReader with HttpPropagation.EntryWriter { + class B3 extends Propagation.EntryReader[HeaderReader] with Propagation.EntryWriter[HeaderWriter] { import B3.Headers - override def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context = { + override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { val identityProvider = Kamon.tracer.identityProvider - val traceID = reader.readHeader(Headers.TraceIdentifier) + val traceID = reader.read(Headers.TraceIdentifier) .map(id => identityProvider.traceIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) - val spanID = reader.readHeader(Headers.SpanIdentifier) + val spanID = reader.read(Headers.SpanIdentifier) .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) { - val parentID = reader.readHeader(Headers.ParentSpanIdentifier) + val parentID = reader.read(Headers.ParentSpanIdentifier) .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) - val flags = reader.readHeader(Headers.Flags) + val flags = reader.read(Headers.Flags) - val samplingDecision = flags.orElse(reader.readHeader(Headers.Sampled)) match { + val samplingDecision = flags.orElse(reader.read(Headers.Sampled)) match { case Some(sampled) if sampled == "1" => SamplingDecision.Sample case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample case _ => SamplingDecision.Unknown @@ -59,19 +60,19 @@ object SpanCodec { } - override def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = { + override def write(context: Context, writer: HttpPropagation.HeaderWriter): Unit = { val span = context.get(Span.ContextKey) if(span.nonEmpty()) { val spanContext = span.context() - writer.writeHeader(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) - writer.writeHeader(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) + writer.write(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) + writer.write(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) if(spanContext.parentID != IdentityProvider.NoIdentifier) - writer.writeHeader(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) + writer.write(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision => - writer.writeHeader(Headers.Sampled, samplingDecision) + writer.write(Headers.Sampled, samplingDecision) } } } @@ -101,38 +102,16 @@ object SpanCodec { } - class Colfer extends Codecs.ForEntry[ByteBuffer] { + class Colfer extends Propagation.EntryReader[ByteStreamReader] with Propagation.EntryWriter[ByteStreamWriter] { val emptyBuffer = ByteBuffer.allocate(0) - override def encode(context: Context): ByteBuffer = { - val span = context.get(Span.ContextKey) - if(span.nonEmpty()) { - val marshalBuffer = Colfer.codecBuffer.get() - val colferSpan = new ColferSpan() - val spanContext = span.context() - - colferSpan.setTraceID(spanContext.traceID.bytes) - colferSpan.setSpanID(spanContext.spanID.bytes) - colferSpan.setParentID(spanContext.parentID.bytes) - colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision)) - - val marshalledSize = colferSpan.marshal(marshalBuffer, 0) - val buffer = ByteBuffer.allocate(marshalledSize) - buffer.put(marshalBuffer, 0, marshalledSize) - buffer - - } else emptyBuffer - } - - override def decode(carrier: ByteBuffer, context: Context): Context = { - carrier.clear() - - if(carrier.capacity() == 0) + override def read(medium: ByteStreamReader, context: Context): Context = { + if(medium.available() == 0) context else { val identityProvider = Kamon.tracer.identityProvider val colferSpan = new ColferSpan() - colferSpan.unmarshal(carrier.array(), 0) + colferSpan.unmarshal(medium.readAll(), 0) val spanContext = SpanContext( traceID = identityProvider.traceIdGenerator().from(colferSpan.traceID), @@ -145,6 +124,24 @@ object SpanCodec { } } + override def write(context: Context, medium: ByteStreamWriter): Unit = { + val span = context.get(Span.ContextKey) + + if(span.nonEmpty()) { + val marshalBuffer = Colfer.codecBuffer.get() + val colferSpan = new ColferSpan() + val spanContext = span.context() + + colferSpan.setTraceID(spanContext.traceID.bytes) + colferSpan.setSpanID(spanContext.spanID.bytes) + colferSpan.setParentID(spanContext.parentID.bytes) + colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision)) + + val marshalledSize = colferSpan.marshal(marshalBuffer, 0) + medium.write(marshalBuffer, 0, marshalledSize) + + } + } private def samplingDecisionToByte(samplingDecision: SamplingDecision): Byte = samplingDecision match { case SamplingDecision.Sample => 1 -- cgit v1.2.3