From 108fdef3eca8a88ec3774c465fdb0a3c764aa936 Mon Sep 17 00:00:00 2001 From: Diego Parra Date: Mon, 15 Oct 2018 15:33:07 -0300 Subject: Implement "b3 single" header format (#551) Implement 'b3 single' header format for Span propagation over HTTP --- .../kamon/trace/B3SingleSpanPropagationSpec.scala | 183 +++++++++++++++++++++ kamon-core/src/main/resources/reference.conf | 3 +- .../main/scala/kamon/trace/SpanPropagation.scala | 101 +++++++++++- 3 files changed, 282 insertions(+), 5 deletions(-) create mode 100644 kamon-core-tests/src/test/scala/kamon/trace/B3SingleSpanPropagationSpec.scala diff --git a/kamon-core-tests/src/test/scala/kamon/trace/B3SingleSpanPropagationSpec.scala b/kamon-core-tests/src/test/scala/kamon/trace/B3SingleSpanPropagationSpec.scala new file mode 100644 index 00000000..5c15bb71 --- /dev/null +++ b/kamon-core-tests/src/test/scala/kamon/trace/B3SingleSpanPropagationSpec.scala @@ -0,0 +1,183 @@ +/* + * ========================================================================================= + * Copyright © 2013-2018 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import kamon.context.{Context, HttpPropagation} +import kamon.testkit.SpanBuilding +import kamon.trace.IdentityProvider.Identifier +import kamon.trace.SpanContext.SamplingDecision +import org.scalatest.{Matchers, OptionValues, WordSpecLike} + +import scala.collection.mutable + + +class B3SingleSpanPropagationSpec extends WordSpecLike with Matchers with OptionValues with SpanBuilding { + val b3SinglePropagation = SpanPropagation.B3Single() + + "The ExtendedB3 SpanContextCodec" should { + "return a TextMap containing the SpanContext data" in { + val headersMap = mutable.Map.empty[String, String] + b3SinglePropagation.write(testContext(), headerWriterFromMap(headersMap)) + + headersMap.get("B3").value shouldBe "1234-4321-1-2222" + } + + "do not include the X-B3-ParentSpanId if there is no parent" in { + val headersMap = mutable.Map.empty[String, String] + b3SinglePropagation.write(testContextWithoutParent(), headerWriterFromMap(headersMap)) + + headersMap.get("B3").value shouldBe "1234-4321-1" + } + + + "not inject anything if there is no Span in the Context" in { + val headersMap = mutable.Map.empty[String, String] + b3SinglePropagation.write(Context.Empty, headerWriterFromMap(headersMap)) + + headersMap.values shouldBe empty + } + + "extract a RemoteSpan from a TextMap when all fields are set" in { + val headersMap = Map("B3" -> "1234-4321-1-2222") + + val spanContext = b3SinglePropagation.read(headerReaderFromMap(headersMap), Context.Empty).get(Span.ContextKey).context() + + spanContext.traceID.string shouldBe "1234" + spanContext.spanID.string shouldBe "4321" + spanContext.parentID.string shouldBe "2222" + spanContext.samplingDecision shouldBe SamplingDecision.Sample + } + + "decode the sampling decision based on the X-B3-Sampled header" in { + val sampledHeadersMap = Map("B3" -> "1234-4321-1") + + val notSampledHeadersMap = Map("B3" -> "1234-4321-0") + + val noSamplingHeadersMap = Map("B3" -> "1234-4321") + + b3SinglePropagation.read(headerReaderFromMap(sampledHeadersMap), Context.Empty) + .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.Sample + + b3SinglePropagation.read(headerReaderFromMap(notSampledHeadersMap), Context.Empty) + .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.DoNotSample + + b3SinglePropagation.read(headerReaderFromMap(noSamplingHeadersMap), Context.Empty) + .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.Unknown + } + + "not include the X-B3-Sampled header if the sampling decision is unknown" in { + val context = testContext() + val sampledSpanContext = context.get(Span.ContextKey).context() + val notSampledSpanContext = Context.Empty.withKey(Span.ContextKey, + Span.Remote(sampledSpanContext.copy(samplingDecision = SamplingDecision.DoNotSample))) + val unknownSamplingSpanContext = Context.Empty.withKey(Span.ContextKey, + Span.Remote(sampledSpanContext.copy(samplingDecision = SamplingDecision.Unknown))) + + val headersMap = mutable.Map.empty[String, String] + + b3SinglePropagation.write(context, headerWriterFromMap(headersMap)) + headersMap.get("B3").value shouldBe "1234-4321-1-2222" + headersMap.clear() + + b3SinglePropagation.write(notSampledSpanContext, headerWriterFromMap(headersMap)) + headersMap.get("B3").value shouldBe "1234-4321-0-2222" + headersMap.clear() + + b3SinglePropagation.write(unknownSamplingSpanContext,headerWriterFromMap(headersMap)) + headersMap.get("B3").value shouldBe "1234-4321-2222" + headersMap.clear() + } + + "use the Debug flag to override the sampling decision, if provided." in { + val headers = Map("B3" -> "1234-4321-d-2222") + + val spanContext = b3SinglePropagation.read(headerReaderFromMap(headers), Context.Empty).get(Span.ContextKey).context() + spanContext.samplingDecision shouldBe SamplingDecision.Sample + } + + "use the Debug flag as sampling decision when Sampled is not provided" in { + val headers = Map("B3" -> "1234-4321-d") + + val spanContext = b3SinglePropagation.read(headerReaderFromMap(headers), Context.Empty).get(Span.ContextKey).context() + spanContext.samplingDecision shouldBe SamplingDecision.Sample + } + + "extract a minimal SpanContext from a TextMap containing only the Trace ID and Span ID" in { + val headers = Map("B3" -> "1234-4321") + + val spanContext = b3SinglePropagation.read(headerReaderFromMap(headers), Context.Empty).get(Span.ContextKey).context() + spanContext.traceID.string shouldBe "1234" + spanContext.spanID.string shouldBe "4321" + spanContext.parentID shouldBe IdentityProvider.NoIdentifier + spanContext.samplingDecision shouldBe SamplingDecision.Unknown + } + + "do not extract a SpanContext if Trace ID and Span ID are not provided" in { + val onlyTraceID = Map("B3" -> "1234--0") + val onlySpanID = Map("B3" -> "-4321-d") + val noIds = Map("B3" -> "--0") + + b3SinglePropagation.read(headerReaderFromMap(onlyTraceID), Context.Empty).get(Span.ContextKey) shouldBe Span.Empty + b3SinglePropagation.read(headerReaderFromMap(onlySpanID), Context.Empty).get(Span.ContextKey) shouldBe Span.Empty + b3SinglePropagation.read(headerReaderFromMap(noIds), Context.Empty).get(Span.ContextKey) shouldBe Span.Empty + } + + "round trip a Span from TextMap -> Context -> TextMap" in { + val headers = Map("B3" -> "1234-4312-1-2222") + + val writenHeaders = mutable.Map.empty[String, String] + val context = b3SinglePropagation.read(headerReaderFromMap(headers), Context.Empty) + b3SinglePropagation.write(context, headerWriterFromMap(writenHeaders)) + writenHeaders should contain theSameElementsAs headers + } + } + + def headerReaderFromMap(map: Map[String, String]): HttpPropagation.HeaderReader = new HttpPropagation.HeaderReader { + override def read(header: String): Option[String] = { + if(map.get("fail").nonEmpty) + sys.error("failing on purpose") + + map.get(header) + } + + override def readAll(): Map[String, String] = map + } + + def headerWriterFromMap(map: mutable.Map[String, String]): HttpPropagation.HeaderWriter = new HttpPropagation.HeaderWriter { + override def write(header: String, value: String): Unit = map.put(header, value) + } + + def testContext(): Context = { + val spanContext = createSpanContext().copy( + traceID = Identifier("1234", Array[Byte](1, 2, 3, 4)), + spanID = Identifier("4321", Array[Byte](4, 3, 2, 1)), + parentID = Identifier("2222", Array[Byte](2, 2, 2, 2)) + ) + + Context.of(Span.ContextKey, Span.Remote(spanContext)) + } + + def testContextWithoutParent(): Context = { + val spanContext = createSpanContext().copy( + traceID = Identifier("1234", Array[Byte](1, 2, 3, 4)), + spanID = Identifier("4321", Array[Byte](4, 3, 2, 1)), + parentID = IdentityProvider.NoIdentifier + ) + + Context.of(Span.ContextKey, Span.Remote(spanContext)) + } +} diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index a35225d4..2fa3d33b 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -192,12 +192,14 @@ kamon { # Specify mappings between Context keys and the Propagation.EntryReader[HeaderReader] implementation in charge # of reading them from the incoming HTTP request into the Context. incoming { + # kamon.trace.SpanPropagation$B3 for default header format or kamon.trace.SpanPropagation$B3Simple for 'b3 single' header format. span = "kamon.trace.SpanPropagation$B3" } # Specify mappings betwen Context keys and the Propagation.EntryWriter[HeaderWriter] implementation in charge # of writing them to outgoing HTTP requests. outgoing { + # kamon.trace.SpanPropagation$B3 for default header format or kamon.trace.SpanPropagation$B3Simple for 'b3 single' header format. span = "kamon.trace.SpanPropagation$B3" } } @@ -205,7 +207,6 @@ kamon { } binary { - # Default HTTP propagation. Unless specified otherwise, all instrumentation will use the configuration on # this section for HTTP context propagation. # diff --git a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala index dc168347..4cad1eb7 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala @@ -1,5 +1,5 @@ /* ========================================================================================= - * Copyright © 2013-2017 the kamon project + * Copyright © 2013-2018 the kamon project * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -16,13 +16,12 @@ package kamon.trace import java.net.{URLDecoder, URLEncoder} -import java.nio.ByteBuffer import kamon.Kamon import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter} -import kamon.context._ -import kamon.context.generated.binary.span.{Span => ColferSpan} import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter} +import kamon.context.generated.binary.span.{Span => ColferSpan} +import kamon.context.{Context, _} import kamon.trace.SpanContext.SamplingDecision @@ -108,6 +107,100 @@ object SpanPropagation { } } + /** + * This format corresponds to the propagation key "b3" (or "B3"), which delimits fields in the + * following manner. + * + *
{@code
+    * b3: {x-b3-traceid}-{x-b3-spanid}-{if x-b3-flags 'd' else x-b3-sampled}-{x-b3-parentspanid}
+    * }
+ * + *

See B3 Propagation + */ + class B3Single extends Propagation.EntryReader[HeaderReader] with Propagation.EntryWriter[HeaderWriter] { + import B3Single._ + + override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { + reader.read(Header.B3).map { header => + val identityProvider = Kamon.tracer.identityProvider + + val (traceID, spanID, samplingDecision, parentSpanID) = header.splitToTuple("-") + + val ti = traceID + .map(id => identityProvider.traceIdGenerator().from(urlDecode(id))) + .getOrElse(IdentityProvider.NoIdentifier) + + val si = spanID + .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) + .getOrElse(IdentityProvider.NoIdentifier) + + if (ti != IdentityProvider.NoIdentifier && si != IdentityProvider.NoIdentifier) { + val parentID = parentSpanID + .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) + .getOrElse(IdentityProvider.NoIdentifier) + + val sd = samplingDecision match { + case Some(sampled) if sampled == "1" || sampled.equalsIgnoreCase("d") => SamplingDecision.Sample + case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample + case _ => SamplingDecision.Unknown + } + context.withKey(Span.ContextKey, Span.Remote(SpanContext(ti, si, parentID, sd))) + } else context + }.getOrElse(context) + } + + override def write(context: Context, writer: HttpPropagation.HeaderWriter): Unit = { + val span = context.get(Span.ContextKey) + + if(span.nonEmpty()) { + val buffer = new StringBuilder() + val spanContext = span.context() + + val traceId = urlEncode(spanContext.traceID.string) + val spanId = urlEncode(spanContext.spanID.string) + + buffer.append(traceId).append("-").append(spanId) + + encodeSamplingDecision(spanContext.samplingDecision) + .foreach(samplingDecision => buffer.append("-").append(samplingDecision)) + + if(spanContext.parentID != IdentityProvider.NoIdentifier) + buffer.append("-").append(urlEncode(spanContext.parentID.string)) + + writer.write(Header.B3, buffer.toString) + } + } + + + private def encodeSamplingDecision(samplingDecision: SamplingDecision): Option[String] = samplingDecision match { + case SamplingDecision.Sample => Some("1") + case SamplingDecision.DoNotSample => Some("0") + case SamplingDecision.Unknown => None + } + + private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8") + private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8") + } + + object B3Single { + object Header { + val B3 = "B3" + } + + implicit class Syntax(val s: String) extends AnyVal { + def splitToTuple(regex: String): (Option[String], Option[String], Option[String], Option[String]) = { + s.split(regex) match { + case Array(str1, str2, str3, str4) => (Option(str1), Option(str2), Option(str3), Option(str4)) + case Array(str1, str2, str3) => (Option(str1), Option(str2), Option(str3), None) + case Array(str1, str2) => (Option(str1), Option(str2), None, None) + } + } + } + + def apply(): B3Single = + new B3Single() + } + /** * Defines a bare bones binary context propagation that uses Colfer [1] as the serialization library. The Schema -- cgit v1.2.3