From ce1424715f91beda67fd5f4da705d9b096147ca0 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 28 Sep 2018 13:07:41 +0200 Subject: cleanup the context propagation section on the configuration file --- .../test/scala/kamon/trace/B3SpanCodecSpec.scala | 232 --------------------- .../scala/kamon/trace/B3SpanPropagationSpec.scala | 232 +++++++++++++++++++++ kamon-core/src/main/resources/reference.conf | 77 ++----- .../src/main/scala/kamon/trace/SpanCodec.scala | 164 --------------- .../main/scala/kamon/trace/SpanPropagation.scala | 183 ++++++++++++++++ 5 files changed, 436 insertions(+), 452 deletions(-) delete mode 100644 kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala create mode 100644 kamon-core-tests/src/test/scala/kamon/trace/B3SpanPropagationSpec.scala delete mode 100644 kamon-core/src/main/scala/kamon/trace/SpanCodec.scala create mode 100644 kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala diff --git a/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala b/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala deleted file mode 100644 index 73de22bb..00000000 --- a/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala +++ /dev/null @@ -1,232 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2017 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import kamon.context.{Context, HttpPropagation} -import kamon.testkit.SpanBuilding -import kamon.trace.IdentityProvider.Identifier -import kamon.trace.SpanContext.SamplingDecision -import org.scalatest.{Matchers, OptionValues, WordSpecLike} - -import scala.collection.mutable - - -class B3SpanCodecSpec extends WordSpecLike with Matchers with OptionValues with SpanBuilding { - val b3Propagation = SpanCodec.B3() - - "The B3 Span propagation for HTTP" should { - "write the Span data into headers" in { - val headersMap = mutable.Map.empty[String, String] - b3Propagation.write(testContext(), headerWriterFromMap(headersMap)) - - headersMap.get("X-B3-TraceId").value shouldBe "1234" - headersMap.get("X-B3-ParentSpanId").value shouldBe "2222" - headersMap.get("X-B3-SpanId").value shouldBe "4321" - headersMap.get("X-B3-Sampled").value shouldBe "1" - } - - "do not include the X-B3-ParentSpanId if there is no parent" in { - val headersMap = mutable.Map.empty[String, String] - b3Propagation.write(testContextWithoutParent(), headerWriterFromMap(headersMap)) - - headersMap.get("X-B3-TraceId").value shouldBe "1234" - headersMap.get("X-B3-ParentSpanId") shouldBe empty - headersMap.get("X-B3-SpanId").value shouldBe "4321" - headersMap.get("X-B3-Sampled").value shouldBe "1" - } - - "not inject anything if there is no Span in the Context" in { - val headersMap = mutable.Map.empty[String, String] - b3Propagation.write(Context.Empty, headerWriterFromMap(headersMap)) - headersMap.values shouldBe empty - } - - "extract a RemoteSpan from incoming headers when all fields are set" in { - val headersMap = Map( - "X-B3-TraceId" -> "1234", - "X-B3-ParentSpanId" -> "2222", - "X-B3-SpanId" -> "4321", - "X-B3-Sampled" -> "1", - "X-B3-Extra-Baggage" -> "some=baggage;more=baggage", - ) - - val spanContext = b3Propagation.read(headerReaderFromMap(headersMap), Context.Empty).get(Span.ContextKey).context() - spanContext.traceID.string shouldBe "1234" - spanContext.spanID.string shouldBe "4321" - spanContext.parentID.string shouldBe "2222" - spanContext.samplingDecision shouldBe SamplingDecision.Sample - } - - "decode the sampling decision based on the X-B3-Sampled header" in { - val sampledHeaders = Map( - "X-B3-TraceId" -> "1234", - "X-B3-SpanId" -> "4321", - "X-B3-Sampled" -> "1" - ) - - val notSampledHeaders = Map( - "X-B3-TraceId" -> "1234", - "X-B3-SpanId" -> "4321", - "X-B3-Sampled" -> "0" - ) - - val noSamplingHeaders = Map( - "X-B3-TraceId" -> "1234", - "X-B3-SpanId" -> "4321" - ) - - b3Propagation.read(headerReaderFromMap(sampledHeaders), Context.Empty) - .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.Sample - - b3Propagation.read(headerReaderFromMap(notSampledHeaders), Context.Empty) - .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.DoNotSample - - b3Propagation.read(headerReaderFromMap(noSamplingHeaders), Context.Empty) - .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.Unknown - } - - "not include the X-B3-Sampled header if the sampling decision is unknown" in { - val context = testContext() - val sampledSpanContext = context.get(Span.ContextKey).context() - val notSampledSpanContext = Context.Empty.withKey(Span.ContextKey, - Span.Remote(sampledSpanContext.copy(samplingDecision = SamplingDecision.DoNotSample))) - val unknownSamplingSpanContext = Context.Empty.withKey(Span.ContextKey, - Span.Remote(sampledSpanContext.copy(samplingDecision = SamplingDecision.Unknown))) - val headersMap = mutable.Map.empty[String, String] - - b3Propagation.write(context, headerWriterFromMap(headersMap)) - headersMap.get("X-B3-Sampled").value shouldBe("1") - headersMap.clear() - - b3Propagation.write(notSampledSpanContext, headerWriterFromMap(headersMap)) - headersMap.get("X-B3-Sampled").value shouldBe("0") - headersMap.clear() - - b3Propagation.write(unknownSamplingSpanContext, headerWriterFromMap(headersMap)) - headersMap.get("X-B3-Sampled") shouldBe empty - } - - "use the Debug flag to override the sampling decision, if provided." in { - val headers = Map( - "X-B3-TraceId" -> "1234", - "X-B3-SpanId" -> "4321", - "X-B3-Sampled" -> "0", - "X-B3-Flags" -> "1" - ) - - val spanContext = b3Propagation.read(headerReaderFromMap(headers), Context.Empty).get(Span.ContextKey).context() - spanContext.samplingDecision shouldBe SamplingDecision.Sample - } - - "use the Debug flag as sampling decision when Sampled is not provided" in { - val headers = Map( - "X-B3-TraceId" -> "1234", - "X-B3-SpanId" -> "4321", - "X-B3-Flags" -> "1" - ) - - val spanContext = b3Propagation.read(headerReaderFromMap(headers), Context.Empty).get(Span.ContextKey).context() - spanContext.samplingDecision shouldBe SamplingDecision.Sample - } - - "extract a minimal SpanContext from a TextMap containing only the Trace ID and Span ID" in { - val headers = Map( - "X-B3-TraceId" -> "1234", - "X-B3-SpanId" -> "4321" - ) - - val spanContext = b3Propagation.read(headerReaderFromMap(headers), Context.Empty).get(Span.ContextKey).context() - spanContext.traceID.string shouldBe "1234" - spanContext.spanID.string shouldBe "4321" - spanContext.parentID shouldBe IdentityProvider.NoIdentifier - spanContext.samplingDecision shouldBe SamplingDecision.Unknown - } - - "do not extract a SpanContext if Trace ID and Span ID are not provided" in { - val onlyTraceID = Map( - "X-B3-TraceId" -> "1234", - "X-B3-Sampled" -> "0", - "X-B3-Flags" -> "1" - ) - - val onlySpanID = Map( - "X-B3-SpanId" -> "1234", - "X-B3-Sampled" -> "0", - "X-B3-Flags" -> "1" - ) - - val noIds = Map( - "X-B3-Sampled" -> "0", - "X-B3-Flags" -> "1" - ) - - b3Propagation.read(headerReaderFromMap(onlyTraceID), Context.Empty).get(Span.ContextKey) shouldBe Span.Empty - b3Propagation.read(headerReaderFromMap(onlySpanID), Context.Empty).get(Span.ContextKey) shouldBe Span.Empty - b3Propagation.read(headerReaderFromMap(noIds), Context.Empty).get(Span.ContextKey) shouldBe Span.Empty - } - - "round trip a Span from TextMap -> Context -> TextMap" in { - val headers = Map( - "X-B3-TraceId" -> "1234", - "X-B3-SpanId" -> "4321", - "X-B3-ParentSpanId" -> "2222", - "X-B3-Sampled" -> "1" - ) - - val writenHeaders = mutable.Map.empty[String, String] - val context = b3Propagation.read(headerReaderFromMap(headers), Context.Empty) - b3Propagation.write(context, headerWriterFromMap(writenHeaders)) - writenHeaders should contain theSameElementsAs(headers) - } - } - - def headerReaderFromMap(map: Map[String, String]): HttpPropagation.HeaderReader = new HttpPropagation.HeaderReader { - override def read(header: String): Option[String] = { - if(map.get("fail").nonEmpty) - sys.error("failing on purpose") - - map.get(header) - } - - override def readAll(): Map[String, String] = map - } - - def headerWriterFromMap(map: mutable.Map[String, String]): HttpPropagation.HeaderWriter = new HttpPropagation.HeaderWriter { - override def write(header: String, value: String): Unit = map.put(header, value) - } - - def testContext(): Context = { - val spanContext = createSpanContext().copy( - traceID = Identifier("1234", Array[Byte](1, 2, 3, 4)), - spanID = Identifier("4321", Array[Byte](4, 3, 2, 1)), - parentID = Identifier("2222", Array[Byte](2, 2, 2, 2)) - ) - - Context.of(Span.ContextKey, Span.Remote(spanContext)) - } - - def testContextWithoutParent(): Context = { - val spanContext = createSpanContext().copy( - traceID = Identifier("1234", Array[Byte](1, 2, 3, 4)), - spanID = Identifier("4321", Array[Byte](4, 3, 2, 1)), - parentID = IdentityProvider.NoIdentifier - ) - - Context.of(Span.ContextKey, Span.Remote(spanContext)) - } - -} \ No newline at end of file diff --git a/kamon-core-tests/src/test/scala/kamon/trace/B3SpanPropagationSpec.scala b/kamon-core-tests/src/test/scala/kamon/trace/B3SpanPropagationSpec.scala new file mode 100644 index 00000000..5b912b92 --- /dev/null +++ b/kamon-core-tests/src/test/scala/kamon/trace/B3SpanPropagationSpec.scala @@ -0,0 +1,232 @@ +/* + * ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import kamon.context.{Context, HttpPropagation} +import kamon.testkit.SpanBuilding +import kamon.trace.IdentityProvider.Identifier +import kamon.trace.SpanContext.SamplingDecision +import org.scalatest.{Matchers, OptionValues, WordSpecLike} + +import scala.collection.mutable + + +class B3SpanPropagationSpec extends WordSpecLike with Matchers with OptionValues with SpanBuilding { + val b3Propagation = SpanPropagation.B3() + + "The B3 Span propagation for HTTP" should { + "write the Span data into headers" in { + val headersMap = mutable.Map.empty[String, String] + b3Propagation.write(testContext(), headerWriterFromMap(headersMap)) + + headersMap.get("X-B3-TraceId").value shouldBe "1234" + headersMap.get("X-B3-ParentSpanId").value shouldBe "2222" + headersMap.get("X-B3-SpanId").value shouldBe "4321" + headersMap.get("X-B3-Sampled").value shouldBe "1" + } + + "do not include the X-B3-ParentSpanId if there is no parent" in { + val headersMap = mutable.Map.empty[String, String] + b3Propagation.write(testContextWithoutParent(), headerWriterFromMap(headersMap)) + + headersMap.get("X-B3-TraceId").value shouldBe "1234" + headersMap.get("X-B3-ParentSpanId") shouldBe empty + headersMap.get("X-B3-SpanId").value shouldBe "4321" + headersMap.get("X-B3-Sampled").value shouldBe "1" + } + + "not inject anything if there is no Span in the Context" in { + val headersMap = mutable.Map.empty[String, String] + b3Propagation.write(Context.Empty, headerWriterFromMap(headersMap)) + headersMap.values shouldBe empty + } + + "extract a RemoteSpan from incoming headers when all fields are set" in { + val headersMap = Map( + "X-B3-TraceId" -> "1234", + "X-B3-ParentSpanId" -> "2222", + "X-B3-SpanId" -> "4321", + "X-B3-Sampled" -> "1", + "X-B3-Extra-Baggage" -> "some=baggage;more=baggage" + ) + + val spanContext = b3Propagation.read(headerReaderFromMap(headersMap), Context.Empty).get(Span.ContextKey).context() + spanContext.traceID.string shouldBe "1234" + spanContext.spanID.string shouldBe "4321" + spanContext.parentID.string shouldBe "2222" + spanContext.samplingDecision shouldBe SamplingDecision.Sample + } + + "decode the sampling decision based on the X-B3-Sampled header" in { + val sampledHeaders = Map( + "X-B3-TraceId" -> "1234", + "X-B3-SpanId" -> "4321", + "X-B3-Sampled" -> "1" + ) + + val notSampledHeaders = Map( + "X-B3-TraceId" -> "1234", + "X-B3-SpanId" -> "4321", + "X-B3-Sampled" -> "0" + ) + + val noSamplingHeaders = Map( + "X-B3-TraceId" -> "1234", + "X-B3-SpanId" -> "4321" + ) + + b3Propagation.read(headerReaderFromMap(sampledHeaders), Context.Empty) + .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.Sample + + b3Propagation.read(headerReaderFromMap(notSampledHeaders), Context.Empty) + .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.DoNotSample + + b3Propagation.read(headerReaderFromMap(noSamplingHeaders), Context.Empty) + .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.Unknown + } + + "not include the X-B3-Sampled header if the sampling decision is unknown" in { + val context = testContext() + val sampledSpanContext = context.get(Span.ContextKey).context() + val notSampledSpanContext = Context.Empty.withKey(Span.ContextKey, + Span.Remote(sampledSpanContext.copy(samplingDecision = SamplingDecision.DoNotSample))) + val unknownSamplingSpanContext = Context.Empty.withKey(Span.ContextKey, + Span.Remote(sampledSpanContext.copy(samplingDecision = SamplingDecision.Unknown))) + val headersMap = mutable.Map.empty[String, String] + + b3Propagation.write(context, headerWriterFromMap(headersMap)) + headersMap.get("X-B3-Sampled").value shouldBe("1") + headersMap.clear() + + b3Propagation.write(notSampledSpanContext, headerWriterFromMap(headersMap)) + headersMap.get("X-B3-Sampled").value shouldBe("0") + headersMap.clear() + + b3Propagation.write(unknownSamplingSpanContext, headerWriterFromMap(headersMap)) + headersMap.get("X-B3-Sampled") shouldBe empty + } + + "use the Debug flag to override the sampling decision, if provided." in { + val headers = Map( + "X-B3-TraceId" -> "1234", + "X-B3-SpanId" -> "4321", + "X-B3-Sampled" -> "0", + "X-B3-Flags" -> "1" + ) + + val spanContext = b3Propagation.read(headerReaderFromMap(headers), Context.Empty).get(Span.ContextKey).context() + spanContext.samplingDecision shouldBe SamplingDecision.Sample + } + + "use the Debug flag as sampling decision when Sampled is not provided" in { + val headers = Map( + "X-B3-TraceId" -> "1234", + "X-B3-SpanId" -> "4321", + "X-B3-Flags" -> "1" + ) + + val spanContext = b3Propagation.read(headerReaderFromMap(headers), Context.Empty).get(Span.ContextKey).context() + spanContext.samplingDecision shouldBe SamplingDecision.Sample + } + + "extract a minimal SpanContext from a TextMap containing only the Trace ID and Span ID" in { + val headers = Map( + "X-B3-TraceId" -> "1234", + "X-B3-SpanId" -> "4321" + ) + + val spanContext = b3Propagation.read(headerReaderFromMap(headers), Context.Empty).get(Span.ContextKey).context() + spanContext.traceID.string shouldBe "1234" + spanContext.spanID.string shouldBe "4321" + spanContext.parentID shouldBe IdentityProvider.NoIdentifier + spanContext.samplingDecision shouldBe SamplingDecision.Unknown + } + + "do not extract a SpanContext if Trace ID and Span ID are not provided" in { + val onlyTraceID = Map( + "X-B3-TraceId" -> "1234", + "X-B3-Sampled" -> "0", + "X-B3-Flags" -> "1" + ) + + val onlySpanID = Map( + "X-B3-SpanId" -> "1234", + "X-B3-Sampled" -> "0", + "X-B3-Flags" -> "1" + ) + + val noIds = Map( + "X-B3-Sampled" -> "0", + "X-B3-Flags" -> "1" + ) + + b3Propagation.read(headerReaderFromMap(onlyTraceID), Context.Empty).get(Span.ContextKey) shouldBe Span.Empty + b3Propagation.read(headerReaderFromMap(onlySpanID), Context.Empty).get(Span.ContextKey) shouldBe Span.Empty + b3Propagation.read(headerReaderFromMap(noIds), Context.Empty).get(Span.ContextKey) shouldBe Span.Empty + } + + "round trip a Span from TextMap -> Context -> TextMap" in { + val headers = Map( + "X-B3-TraceId" -> "1234", + "X-B3-SpanId" -> "4321", + "X-B3-ParentSpanId" -> "2222", + "X-B3-Sampled" -> "1" + ) + + val writenHeaders = mutable.Map.empty[String, String] + val context = b3Propagation.read(headerReaderFromMap(headers), Context.Empty) + b3Propagation.write(context, headerWriterFromMap(writenHeaders)) + writenHeaders should contain theSameElementsAs(headers) + } + } + + def headerReaderFromMap(map: Map[String, String]): HttpPropagation.HeaderReader = new HttpPropagation.HeaderReader { + override def read(header: String): Option[String] = { + if(map.get("fail").nonEmpty) + sys.error("failing on purpose") + + map.get(header) + } + + override def readAll(): Map[String, String] = map + } + + def headerWriterFromMap(map: mutable.Map[String, String]): HttpPropagation.HeaderWriter = new HttpPropagation.HeaderWriter { + override def write(header: String, value: String): Unit = map.put(header, value) + } + + def testContext(): Context = { + val spanContext = createSpanContext().copy( + traceID = Identifier("1234", Array[Byte](1, 2, 3, 4)), + spanID = Identifier("4321", Array[Byte](4, 3, 2, 1)), + parentID = Identifier("2222", Array[Byte](2, 2, 2, 2)) + ) + + Context.of(Span.ContextKey, Span.Remote(spanContext)) + } + + def testContextWithoutParent(): Context = { + val spanContext = createSpanContext().copy( + traceID = Identifier("1234", Array[Byte](1, 2, 3, 4)), + spanID = Identifier("4321", Array[Byte](4, 3, 2, 1)), + parentID = IdentityProvider.NoIdentifier + ) + + Context.of(Span.ContextKey, Span.Remote(spanContext)) + } + +} \ No newline at end of file diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index a4a7871e..eb374455 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -150,57 +150,17 @@ kamon { } } - - context { - - # Codecs are used to encode/decode Context keys when a Context must be propagated either through HTTP headers or - # Binary transports. Only broadcast keys configured bellow will be processed by the context Codec. The FQCN of - # the appropriate Codecs for each key must be provided, otherwise keys will be ignored. - # - codecs { - - # Size of the encoding buffer for the Binary Codec. - binary-buffer-size = 256 - - # Declarative definition of broadcast context keys with type Option[String]. The setting key represents the actual - # key name and the value is the HTTP header name to be used to encode/decode the context key. The key name will - # be used when coding for binary transport. The most common use case for string keys is effortless propagation of - # correlation keys or request related data (locale, user ID, etc). E.g. if wanting to propagate a "X-Request-ID" - # header this config should suffice: - # - # kamon.context.codecs.string-keys { - # request-id = "X-Request-ID" - # } - # - # If the application must read this context key they can define key with a matching name and read the value from - # the context: - # val requestIDKey = Key.broadcastString("request-id") // Do this only once, keep a reference. - # val requestID = Kamon.currentContext().get(requestIDKey) - # - string-keys { - - } - - # Codecs to be used when propagating a Context through a HTTP Headers transport. - http-headers-keys { - //span = "kamon.trace.SpanCodec$B3" - } - - # Codecs to be used when propagating a Context through a Binary transport. - binary-keys { - //span = "kamon.trace.SpanCodec$Colfer" - } - } - } - - propagation { + http { # Default HTTP propagation. Unless specified otherwise, all instrumentation will use the configuration on # this section for HTTP context propagation. # default { + + # Configures how context tags will be propagated over HTTP headers. + # tags { # Header name used to encode context tags. @@ -225,18 +185,20 @@ kamon { } } + # Configure which entries should be read from incoming HTTP requests and writen to outgoing HTTP requests. + # entries { - # Specify mappings between Context keys and the Http.EntryReader implementation in charge of reading them - # from the incoming HTTP request into the Context. + # Specify mappings between Context keys and the Propagation.EntryReader[HeaderReader] implementation in charge + # of reading them from the incoming HTTP request into the Context. incoming { - #span = "something" + span = "kamon.trace.SpanPropagation$B3" } - # Specify mappings betwen Context keys and the Http.EntryWriter implementation in charge of writing them - # on the outgoing HTTP requests. + # Specify mappings betwen Context keys and the Propagation.EntryWriter[HeaderWriter] implementation in charge + # of writing them to outgoing HTTP requests. outgoing { - + span = "kamon.trace.SpanPropagation$B3" } } } @@ -248,18 +210,21 @@ kamon { # this section for HTTP context propagation. # default { + + # Configure which entries should be read from incoming messages and writen to outgoing messages. + # entries { - # Specify mappings between Context keys and the Http.EntryReader implementation in charge of reading them - # from the incoming HTTP request into the Context. + # Specify mappings between Context keys and the Propagation.EntryReader[ByteStreamReader] implementation in + # charge of reading them from the incoming messages into the Context. incoming { - #span = "something" + span = "kamon.trace.SpanPropagation$Colfer" } - # Specify mappings betwen Context keys and the Http.EntryWriter implementation in charge of writing them - # on the outgoing HTTP requests. + # Specify mappings betwen Context keys and the Propagation.EntryWriter[ByteStreamWriter] implementation in + # charge of writing them on the outgoing messages. outgoing { - + span = "kamon.trace.SpanPropagation$Colfer" } } } diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala deleted file mode 100644 index 63f8e1b0..00000000 --- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala +++ /dev/null @@ -1,164 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import java.net.{URLDecoder, URLEncoder} -import java.nio.ByteBuffer - -import kamon.Kamon -import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter} -import kamon.context._ -import kamon.context.generated.binary.span.{Span => ColferSpan} -import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter} -import kamon.trace.SpanContext.SamplingDecision - - -object SpanCodec { - - class B3 extends Propagation.EntryReader[HeaderReader] with Propagation.EntryWriter[HeaderWriter] { - import B3.Headers - - override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { - val identityProvider = Kamon.tracer.identityProvider - val traceID = reader.read(Headers.TraceIdentifier) - .map(id => identityProvider.traceIdGenerator().from(urlDecode(id))) - .getOrElse(IdentityProvider.NoIdentifier) - - val spanID = reader.read(Headers.SpanIdentifier) - .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) - .getOrElse(IdentityProvider.NoIdentifier) - - if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) { - val parentID = reader.read(Headers.ParentSpanIdentifier) - .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) - .getOrElse(IdentityProvider.NoIdentifier) - - val flags = reader.read(Headers.Flags) - - val samplingDecision = flags.orElse(reader.read(Headers.Sampled)) match { - case Some(sampled) if sampled == "1" => SamplingDecision.Sample - case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample - case _ => SamplingDecision.Unknown - } - - context.withKey(Span.ContextKey, Span.Remote(SpanContext(traceID, spanID, parentID, samplingDecision))) - - } else context - } - - - override def write(context: Context, writer: HttpPropagation.HeaderWriter): Unit = { - val span = context.get(Span.ContextKey) - - if(span.nonEmpty()) { - val spanContext = span.context() - writer.write(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) - writer.write(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) - - if(spanContext.parentID != IdentityProvider.NoIdentifier) - writer.write(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) - - encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision => - writer.write(Headers.Sampled, samplingDecision) - } - } - } - - private def encodeSamplingDecision(samplingDecision: SamplingDecision): Option[String] = samplingDecision match { - case SamplingDecision.Sample => Some("1") - case SamplingDecision.DoNotSample => Some("0") - case SamplingDecision.Unknown => None - } - - private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8") - private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8") - } - - object B3 { - - def apply(): B3 = - new B3() - - object Headers { - val TraceIdentifier = "X-B3-TraceId" - val ParentSpanIdentifier = "X-B3-ParentSpanId" - val SpanIdentifier = "X-B3-SpanId" - val Sampled = "X-B3-Sampled" - val Flags = "X-B3-Flags" - } - } - - - class Colfer extends Propagation.EntryReader[ByteStreamReader] with Propagation.EntryWriter[ByteStreamWriter] { - val emptyBuffer = ByteBuffer.allocate(0) - - override def read(medium: ByteStreamReader, context: Context): Context = { - if(medium.available() == 0) - context - else { - val identityProvider = Kamon.tracer.identityProvider - val colferSpan = new ColferSpan() - colferSpan.unmarshal(medium.readAll(), 0) - - val spanContext = SpanContext( - traceID = identityProvider.traceIdGenerator().from(colferSpan.traceID), - spanID = identityProvider.spanIdGenerator().from(colferSpan.spanID), - parentID = identityProvider.spanIdGenerator().from(colferSpan.parentID), - samplingDecision = byteToSamplingDecision(colferSpan.samplingDecision) - ) - - context.withKey(Span.ContextKey, Span.Remote(spanContext)) - } - } - - override def write(context: Context, medium: ByteStreamWriter): Unit = { - val span = context.get(Span.ContextKey) - - if(span.nonEmpty()) { - val marshalBuffer = Colfer.codecBuffer.get() - val colferSpan = new ColferSpan() - val spanContext = span.context() - - colferSpan.setTraceID(spanContext.traceID.bytes) - colferSpan.setSpanID(spanContext.spanID.bytes) - colferSpan.setParentID(spanContext.parentID.bytes) - colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision)) - - val marshalledSize = colferSpan.marshal(marshalBuffer, 0) - medium.write(marshalBuffer, 0, marshalledSize) - - } - } - - private def samplingDecisionToByte(samplingDecision: SamplingDecision): Byte = samplingDecision match { - case SamplingDecision.Sample => 1 - case SamplingDecision.DoNotSample => 2 - case SamplingDecision.Unknown => 3 - } - - private def byteToSamplingDecision(byte: Byte): SamplingDecision = byte match { - case 1 => SamplingDecision.Sample - case 2 => SamplingDecision.DoNotSample - case _ => SamplingDecision.Unknown - } - } - - object Colfer { - private val codecBuffer = new ThreadLocal[Array[Byte]] { - override def initialValue(): Array[Byte] = Array.ofDim[Byte](256) - } - } -} diff --git a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala new file mode 100644 index 00000000..b83a5ade --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala @@ -0,0 +1,183 @@ +/* ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.net.{URLDecoder, URLEncoder} +import java.nio.ByteBuffer + +import kamon.Kamon +import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter} +import kamon.context._ +import kamon.context.generated.binary.span.{Span => ColferSpan} +import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter} +import kamon.trace.SpanContext.SamplingDecision + + +/** + * Propagation mechanisms for Kamon's Span data to and from HTTP and Binary mediums. + */ +object SpanPropagation { + + /** + * Reads and Writes a Span instance using the B3 propagation format. The specification and semantics of the B3 + * Propagation protocol can be found here: https://github.com/openzipkin/b3-propagation + */ + class B3 extends Propagation.EntryReader[HeaderReader] with Propagation.EntryWriter[HeaderWriter] { + import B3.Headers + + override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { + val identityProvider = Kamon.tracer.identityProvider + val traceID = reader.read(Headers.TraceIdentifier) + .map(id => identityProvider.traceIdGenerator().from(urlDecode(id))) + .getOrElse(IdentityProvider.NoIdentifier) + + val spanID = reader.read(Headers.SpanIdentifier) + .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) + .getOrElse(IdentityProvider.NoIdentifier) + + if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) { + val parentID = reader.read(Headers.ParentSpanIdentifier) + .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) + .getOrElse(IdentityProvider.NoIdentifier) + + val flags = reader.read(Headers.Flags) + + val samplingDecision = flags.orElse(reader.read(Headers.Sampled)) match { + case Some(sampled) if sampled == "1" => SamplingDecision.Sample + case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample + case _ => SamplingDecision.Unknown + } + + context.withKey(Span.ContextKey, Span.Remote(SpanContext(traceID, spanID, parentID, samplingDecision))) + + } else context + } + + + override def write(context: Context, writer: HttpPropagation.HeaderWriter): Unit = { + val span = context.get(Span.ContextKey) + + if(span.nonEmpty()) { + val spanContext = span.context() + writer.write(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) + writer.write(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) + + if(spanContext.parentID != IdentityProvider.NoIdentifier) + writer.write(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) + + encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision => + writer.write(Headers.Sampled, samplingDecision) + } + } + } + + private def encodeSamplingDecision(samplingDecision: SamplingDecision): Option[String] = samplingDecision match { + case SamplingDecision.Sample => Some("1") + case SamplingDecision.DoNotSample => Some("0") + case SamplingDecision.Unknown => None + } + + private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8") + private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8") + } + + object B3 { + + def apply(): B3 = + new B3() + + object Headers { + val TraceIdentifier = "X-B3-TraceId" + val ParentSpanIdentifier = "X-B3-ParentSpanId" + val SpanIdentifier = "X-B3-SpanId" + val Sampled = "X-B3-Sampled" + val Flags = "X-B3-Flags" + } + } + + + /** + * Defines a bare bones binary context propagation that uses Colfer [1] as the serialization library. The Schema + * for the Span data is simply defined as: + * + * type Span struct { + * traceID binary + * spanID binary + * parentID binary + * samplingDecision uint8 + * } + * + */ + class Colfer extends Propagation.EntryReader[ByteStreamReader] with Propagation.EntryWriter[ByteStreamWriter] { + val emptyBuffer = ByteBuffer.allocate(0) + + override def read(medium: ByteStreamReader, context: Context): Context = { + if(medium.available() == 0) + context + else { + val identityProvider = Kamon.tracer.identityProvider + val colferSpan = new ColferSpan() + colferSpan.unmarshal(medium.readAll(), 0) + + val spanContext = SpanContext( + traceID = identityProvider.traceIdGenerator().from(colferSpan.traceID), + spanID = identityProvider.spanIdGenerator().from(colferSpan.spanID), + parentID = identityProvider.spanIdGenerator().from(colferSpan.parentID), + samplingDecision = byteToSamplingDecision(colferSpan.samplingDecision) + ) + + context.withKey(Span.ContextKey, Span.Remote(spanContext)) + } + } + + override def write(context: Context, medium: ByteStreamWriter): Unit = { + val span = context.get(Span.ContextKey) + + if(span.nonEmpty()) { + val marshalBuffer = Colfer.codecBuffer.get() + val colferSpan = new ColferSpan() + val spanContext = span.context() + + colferSpan.setTraceID(spanContext.traceID.bytes) + colferSpan.setSpanID(spanContext.spanID.bytes) + colferSpan.setParentID(spanContext.parentID.bytes) + colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision)) + + val marshalledSize = colferSpan.marshal(marshalBuffer, 0) + medium.write(marshalBuffer, 0, marshalledSize) + + } + } + + private def samplingDecisionToByte(samplingDecision: SamplingDecision): Byte = samplingDecision match { + case SamplingDecision.Sample => 1 + case SamplingDecision.DoNotSample => 2 + case SamplingDecision.Unknown => 3 + } + + private def byteToSamplingDecision(byte: Byte): SamplingDecision = byte match { + case 1 => SamplingDecision.Sample + case 2 => SamplingDecision.DoNotSample + case _ => SamplingDecision.Unknown + } + } + + object Colfer { + private val codecBuffer = new ThreadLocal[Array[Byte]] { + override def initialValue(): Array[Byte] = Array.ofDim[Byte](256) + } + } +} -- cgit v1.2.3