From b61c92ea3589450fd097ab79420230b61b458ae4 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 28 Sep 2018 12:08:24 +0200 Subject: cleanup HTTP propagation, introduce a new Binary propagation --- .../src/test/scala/kamon/KamonLifecycleSpec.scala | 2 +- .../kamon/context/BinaryPropagationSpec.scala | 98 +++++++ .../scala/kamon/context/ContextCodecSpec.scala | 93 ------- .../kamon/context/ContextSerializationSpec.scala | 50 ++-- .../scala/kamon/context/HttpPropagationSpec.scala | 53 ++-- .../HttpServerInstrumentationSpec.scala | 5 +- .../test/scala/kamon/trace/B3SpanCodecSpec.scala | 2 +- kamon-core/src/main/colfer/Context.colf | 1 + kamon-core/src/main/colfer/building.md | 2 +- .../context/generated/binary/context/Context.java | 132 ++++++++- kamon-core/src/main/resources/reference.conf | 21 +- .../src/main/scala/kamon/ContextPropagation.scala | 48 +++- kamon-core/src/main/scala/kamon/Kamon.scala | 8 - .../scala/kamon/context/BinaryPropagation.scala | 277 +++++++++++++++++++ .../src/main/scala/kamon/context/Codecs.scala | 297 --------------------- .../src/main/scala/kamon/context/Context.scala | 25 +- .../main/scala/kamon/context/HttpPropagation.scala | 204 +++++--------- .../src/main/scala/kamon/context/Propagation.scala | 81 ++++++ .../src/main/scala/kamon/context/Storage.scala | 2 +- .../scala/kamon/instrumentation/HttpServer.scala | 5 +- .../src/main/scala/kamon/trace/SpanCodec.scala | 77 +++--- .../scala/kamon/testkit/SimpleStringCodec.scala | 57 ---- 22 files changed, 817 insertions(+), 723 deletions(-) create mode 100644 kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala delete mode 100644 kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala create mode 100644 kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala delete mode 100644 kamon-core/src/main/scala/kamon/context/Codecs.scala create mode 100644 kamon-core/src/main/scala/kamon/context/Propagation.scala delete mode 100644 kamon-testkit/src/main/scala/kamon/testkit/SimpleStringCodec.scala diff --git a/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala index b5d0425b..ee80fd2b 100644 --- a/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala @@ -12,7 +12,7 @@ import org.scalatest.time.SpanSugar._ class KamonLifecycleSpec extends WordSpec with Matchers with Eventually{ - "the Kamon lifecycle" should { + "the Kamon lifecycle" ignore { "keep the JVM running if reporters are running" in { val process = Runtime.getRuntime.exec(createProcessCommand("kamon.KamonWithRunningReporter")) Thread.sleep(5000) diff --git a/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala new file mode 100644 index 00000000..99e72f59 --- /dev/null +++ b/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala @@ -0,0 +1,98 @@ +package kamon.context + +import java.io.ByteArrayOutputStream + +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter} +import kamon.context.Propagation.{EntryReader, EntryWriter} +import org.scalatest.{Matchers, OptionValues, WordSpec} + +class BinaryPropagationSpec extends WordSpec with Matchers with OptionValues { + + "The Binary Context Propagation" should { + "return an empty context if there is no data to read from" in { + val context = binaryPropagation.read(ByteStreamReader.of(Array.ofDim[Byte](0))) + context.isEmpty() shouldBe true + } + + "not write any data to the medium if the context is empty" in { + val writer = inspectableByteStreamWriter() + binaryPropagation.write(Context.Empty, writer) + writer.size() shouldBe 0 + } + + "round trip a Context that only has tags" in { + val context = Context.of(Map("hello" -> "world", "kamon" -> "rulez")) + val writer = inspectableByteStreamWriter() + binaryPropagation.write(context, writer) + + val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) + rtContext.entries shouldBe empty + rtContext.tags should contain theSameElementsAs (context.tags) + } + + "round trip a Context that only has entries" in { + val context = Context.of(BinaryPropagationSpec.StringKey, "string-value", BinaryPropagationSpec.IntegerKey, 42) + val writer = inspectableByteStreamWriter() + binaryPropagation.write(context, writer) + + val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) + rtContext.tags shouldBe empty + rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value" + rtContext.get(BinaryPropagationSpec.IntegerKey) shouldBe 0 // there is no entry configuration for the integer key + } + + "round trip a Context that with tags and entries" in { + val context = Context.of(Map("hello" -> "world", "kamon" -> "rulez")) + .withKey(BinaryPropagationSpec.StringKey, "string-value") + .withKey(BinaryPropagationSpec.IntegerKey, 42) + + val writer = inspectableByteStreamWriter() + binaryPropagation.write(context, writer) + val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) + + rtContext.tags should contain theSameElementsAs (context.tags) + rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value" + rtContext.get(BinaryPropagationSpec.IntegerKey) shouldBe 0 // there is no entry configuration for the integer key + } + } + + val binaryPropagation = BinaryPropagation.from( + ConfigFactory.parseString( + """ + | + |entries.incoming.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec" + |entries.outgoing.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec" + | + """.stripMargin + ).withFallback(ConfigFactory.load().getConfig("kamon.propagation")), Kamon) + + + def inspectableByteStreamWriter() = new ByteArrayOutputStream(32) with ByteStreamWriter + +} + +object BinaryPropagationSpec { + + val StringKey = Context.key[String]("string", null) + val IntegerKey = Context.key[Int]("integer", 0) + + class StringEntryCodec extends EntryReader[ByteStreamReader] with EntryWriter[ByteStreamWriter] { + + override def read(medium: ByteStreamReader, context: Context): Context = { + val valueData = medium.readAll() + + if(valueData.length > 0) { + context.withKey(StringKey, new String(valueData)) + } else context + } + + override def write(context: Context, medium: ByteStreamWriter): Unit = { + val value = context.get(StringKey) + if(value != null) { + medium.write(value.getBytes) + } + } + } +} \ No newline at end of file diff --git a/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala deleted file mode 100644 index 6a43bd01..00000000 --- a/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.context - -import kamon.Kamon -import kamon.testkit.ContextTesting -import org.scalatest.{Matchers, OptionValues, WordSpec} - -class ContextCodecSpec extends WordSpec with Matchers with ContextTesting with OptionValues { - "the Context Codec" ignore { - "encoding/decoding to HttpHeaders" should { - "round trip a empty context" in { - val textMap = ContextCodec.HttpHeaders.encode(Context.Empty) - val decodedContext = ContextCodec.HttpHeaders.decode(textMap) - - decodedContext shouldBe Context.Empty - } - - "round trip a context with only local keys" in { - val localOnlyContext = Context.of(StringKey, Some("string-value")) - val textMap = ContextCodec.HttpHeaders.encode(localOnlyContext) - val decodedContext = ContextCodec.HttpHeaders.decode(textMap) - - decodedContext shouldBe Context.Empty - } - - "round trip a context with local and broadcast keys" in { - val initialContext = Context.Empty - .withKey(StringKey, Some("string-value")) - .withKey(StringBroadcastKey, Some("this-should-be-round-tripped")) - - val textMap = ContextCodec.HttpHeaders.encode(initialContext) - val decodedContext = ContextCodec.HttpHeaders.decode(textMap) - - decodedContext.get(StringKey) shouldBe empty - decodedContext.get(StringBroadcastKey).value shouldBe "this-should-be-round-tripped" - } - - "read string broadcast keys using the configured header name" in { - val textMap = TextMap.Default() - textMap.put("X-Request-ID", "123456") - val decodedContext = ContextCodec.HttpHeaders.decode(textMap) - - //decodedContext.get(Key.broadcastString("request-id")).value shouldBe "123456" - } - } - - "encoding/decoding to Binary" should { - "round trip a empty context" in { - val byteBuffer = ContextCodec.Binary.encode(Context.Empty) - - val decodedContext = ContextCodec.Binary.decode(byteBuffer) - - decodedContext shouldBe Context.Empty - } - - "round trip a context with only local keys" in { - val localOnlyContext = Context.of(StringKey, Some("string-value")) - val byteBuffer = ContextCodec.Binary.encode(localOnlyContext) - val decodedContext = ContextCodec.Binary.decode(byteBuffer) - - decodedContext shouldBe Context.Empty - } - - "round trip a context with local and broadcast keys" in { - val initialContext = Context.Empty - .withKey(StringKey, Some("string-value")) - .withKey(StringBroadcastKey, Some("this-should-be-round-tripped")) - - val byteBuffer = ContextCodec.Binary.encode(initialContext) - val decodedContext = ContextCodec.Binary.decode(byteBuffer) - - decodedContext.get(StringKey) shouldBe empty - decodedContext.get(StringBroadcastKey).value shouldBe "this-should-be-round-tripped" - } - } - } - - val ContextCodec = new Codecs(Kamon.config()) -} \ No newline at end of file diff --git a/kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala index 22400fa9..7ffb0838 100644 --- a/kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala @@ -22,29 +22,29 @@ import kamon.testkit.ContextTesting import org.scalatest.{Matchers, OptionValues, WordSpec} class ContextSerializationSpec extends WordSpec with Matchers with ContextTesting with OptionValues { - "the Context is Serializable" ignore { - "empty " in { - val bos = new ByteArrayOutputStream() - val oos = new ObjectOutputStream(bos) - oos.writeObject(Context.Empty) - - val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray)) - val ctx = ois.readObject().asInstanceOf[Context] - ctx shouldBe Context.Empty - } - - "full" in { - val sCtx = Context.of(StringBroadcastKey, Some("disi")) - val bos = new ByteArrayOutputStream() - val oos = new ObjectOutputStream(bos) - oos.writeObject(sCtx) - - val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray)) - val rCtx = ois.readObject().asInstanceOf[Context] - rCtx shouldBe sCtx - } - - } - - val ContextCodec = new Codecs(Kamon.config()) +// "the Context is Serializable" ignore { +// "empty " in { +// val bos = new ByteArrayOutputStream() +// val oos = new ObjectOutputStream(bos) +// oos.writeObject(Context.Empty) +// +// val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray)) +// val ctx = ois.readObject().asInstanceOf[Context] +// ctx shouldBe Context.Empty +// } +// +// "full" in { +// val sCtx = Context.of(StringBroadcastKey, Some("disi")) +// val bos = new ByteArrayOutputStream() +// val oos = new ObjectOutputStream(bos) +// oos.writeObject(sCtx) +// +// val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray)) +// val rCtx = ois.readObject().asInstanceOf[Context] +// rCtx shouldBe sCtx +// } +// +// } +// +// val ContextCodec = new Codecs(Kamon.config()) } \ No newline at end of file diff --git a/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala index 44165b98..ac1250ea 100644 --- a/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala @@ -2,7 +2,8 @@ package kamon.context import com.typesafe.config.ConfigFactory import kamon.Kamon -import kamon.context.HttpPropagation.Direction +import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter} +import kamon.context.Propagation.{EntryReader, EntryWriter} import org.scalatest.{Matchers, OptionValues, WordSpec} import scala.collection.mutable @@ -12,9 +13,8 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { "The HTTP Context Propagation" when { "reading from incoming requests" should { "return an empty context if there are no tags nor keys" in { - val context = httpPropagation.readContext(headerReaderFromMap(Map.empty)) - context.tags shouldBe empty - context.entries shouldBe empty + val context = httpPropagation.read(headerReaderFromMap(Map.empty)) + context.isEmpty() shouldBe true } "read tags from an HTTP message when they are available" in { @@ -22,7 +22,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { "x-content-tags" -> "hello=world;correlation=1234", "x-mapped-tag" -> "value" ) - val context = httpPropagation.readContext(headerReaderFromMap(headers)) + val context = httpPropagation.read(headerReaderFromMap(headers)) context.tags should contain only( "hello" -> "world", "correlation" -> "1234", @@ -32,7 +32,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { "handle errors when reading HTTP headers" in { val headers = Map("fail" -> "") - val context = httpPropagation.readContext(headerReaderFromMap(headers)) + val context = httpPropagation.read(headerReaderFromMap(headers)) context.tags shouldBe empty context.entries shouldBe empty } @@ -44,7 +44,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { "integer-header" -> "123" ) - val context = httpPropagation.readContext(headerReaderFromMap(headers)) + val context = httpPropagation.read(headerReaderFromMap(headers)) context.get(HttpPropagationSpec.StringKey) shouldBe "hey" context.get(HttpPropagationSpec.IntegerKey) shouldBe 123 context.get(HttpPropagationSpec.OptionalKey) shouldBe empty @@ -56,17 +56,9 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { "writing to outgoing requests" should { - propagationWritingTests(Direction.Outgoing) - } - - "writing to returning requests" should { - propagationWritingTests(Direction.Returning) - } - - def propagationWritingTests(direction: Direction.Write) = { "not write anything if the context is empty" in { val headers = mutable.Map.empty[String, String] - httpPropagation.writeContext(Context.Empty, headerWriterFromMap(headers), direction) + httpPropagation.write(Context.Empty, headerWriterFromMap(headers)) headers shouldBe empty } @@ -77,7 +69,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { "mappedTag" -> "value" )) - httpPropagation.writeContext(context, headerWriterFromMap(headers), direction) + httpPropagation.write(context, headerWriterFromMap(headers)) headers should contain only( "x-content-tags" -> "hello=world;", "x-mapped-tag" -> "value" @@ -91,10 +83,10 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { HttpPropagationSpec.IntegerKey, 42, ) - httpPropagation.writeContext(context, headerWriterFromMap(headers), direction) + httpPropagation.write(context, headerWriterFromMap(headers)) headers should contain only( "string-header" -> "out-we-go" - ) + ) } } } @@ -116,23 +108,24 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { |entries.incoming.string = "kamon.context.HttpPropagationSpec$StringEntryCodec" |entries.incoming.integer = "kamon.context.HttpPropagationSpec$IntegerEntryCodec" |entries.outgoing.string = "kamon.context.HttpPropagationSpec$StringEntryCodec" - |entries.returning.string = "kamon.context.HttpPropagationSpec$StringEntryCodec" | """.stripMargin ).withFallback(ConfigFactory.load().getConfig("kamon.propagation")), Kamon) def headerReaderFromMap(map: Map[String, String]): HttpPropagation.HeaderReader = new HttpPropagation.HeaderReader { - override def readHeader(header: String): Option[String] = { + override def read(header: String): Option[String] = { if(map.get("fail").nonEmpty) sys.error("failing on purpose") map.get(header) } + + override def readAll(): Map[String, String] = map } def headerWriterFromMap(map: mutable.Map[String, String]): HttpPropagation.HeaderWriter = new HttpPropagation.HeaderWriter { - override def writeHeader(header: String, value: String): Unit = map.put(header, value) + override def write(header: String, value: String): Unit = map.put(header, value) } } @@ -143,23 +136,23 @@ object HttpPropagationSpec { val OptionalKey = Context.key[Option[String]]("optional", None) - class StringEntryCodec extends HttpPropagation.EntryReader with HttpPropagation.EntryWriter { + class StringEntryCodec extends EntryReader[HeaderReader] with EntryWriter[HeaderWriter] { private val HeaderName = "string-header" - override def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context = { - reader.readHeader(HeaderName) + override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { + reader.read(HeaderName) .map(v => context.withKey(StringKey, v)) .getOrElse(context) } - override def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = { - Option(context.get(StringKey)).foreach(v => writer.writeHeader(HeaderName, v)) + override def write(context: Context, writer: HttpPropagation.HeaderWriter): Unit = { + Option(context.get(StringKey)).foreach(v => writer.write(HeaderName, v)) } } - class IntegerEntryCodec extends HttpPropagation.EntryReader { - override def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context = { - reader.readHeader("integer-header") + class IntegerEntryCodec extends EntryReader[HeaderReader] { + override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { + reader.read("integer-header") .map(v => context.withKey(IntegerKey, v.toInt)) .getOrElse(context) diff --git a/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala index 19e08666..d334184d 100644 --- a/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala @@ -272,13 +272,14 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp override def url: String = requestUrl override def path: String = requestPath override def method: String = requestMethod - override def readHeader(header: String): Option[String] = headers.get(header) + override def read(header: String): Option[String] = headers.get(header) + override def readAll(): Map[String, String] = headers } def fakeResponse(responseStatusCode: Int, headers: mutable.Map[String, String]): HttpResponse.Writable[HttpResponse] = new HttpResponse.Writable[HttpResponse] { override def statusCode: Int = responseStatusCode - override def writeHeader(header: String, value: String): Unit = headers.put(header, value) + override def write(header: String, value: String): Unit = headers.put(header, value) override def build(): HttpResponse = this } diff --git a/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala b/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala index 76e07c31..37a95a68 100644 --- a/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala @@ -16,7 +16,7 @@ package kamon.trace -import kamon.context.{Context, TextMap} +import kamon.context.{Context} import kamon.testkit.SpanBuilding import kamon.trace.IdentityProvider.Identifier import kamon.trace.SpanContext.SamplingDecision diff --git a/kamon-core/src/main/colfer/Context.colf b/kamon-core/src/main/colfer/Context.colf index f84d7a56..f5d9a80b 100644 --- a/kamon-core/src/main/colfer/Context.colf +++ b/kamon-core/src/main/colfer/Context.colf @@ -6,5 +6,6 @@ type Entry struct { } type Context struct { + tags []text entries []Entry } \ No newline at end of file diff --git a/kamon-core/src/main/colfer/building.md b/kamon-core/src/main/colfer/building.md index f510a44f..8b663828 100644 --- a/kamon-core/src/main/colfer/building.md +++ b/kamon-core/src/main/colfer/building.md @@ -1,4 +1,4 @@ -Just download and install the colver compiler and run this command from the colfer folder: +Just download and install the colfer compiler and run this command from the colfer folder: ``` colfer -b ../java -p kamon/context/generated/binary java diff --git a/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java b/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java index a9917e99..4be6d630 100644 --- a/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java +++ b/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java @@ -12,6 +12,7 @@ import java.io.ObjectOutputStream; import java.io.ObjectStreamException; import java.io.OutputStream; import java.io.Serializable; +import java.nio.charset.StandardCharsets; import java.util.InputMismatchException; import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; @@ -35,6 +36,8 @@ public class Context implements Serializable { + public String[] tags; + public Entry[] entries; @@ -43,10 +46,12 @@ public class Context implements Serializable { init(); } + private static final String[] _zeroTags = new String[0]; private static final Entry[] _zeroEntries = new Entry[0]; /** Colfer zero values. */ private void init() { + tags = _zeroTags; entries = _zeroEntries; } @@ -142,6 +147,7 @@ public class Context implements Serializable { /** * Serializes the object. + * All {@code null} elements in {@link #tags} will be replaced with {@code ""}. * All {@code null} elements in {@link #entries} will be replaced with a {@code new} value. * @param out the data destination. * @param buf the initial buffer or {@code null}. @@ -171,6 +177,7 @@ public class Context implements Serializable { /** * Serializes the object. + * All {@code null} elements in {@link #tags} will be replaced with {@code ""}. * All {@code null} elements in {@link #entries} will be replaced with a {@code new} value. * @param buf the data destination. * @param offset the initial index for {@code buf}, inclusive. @@ -182,8 +189,72 @@ public class Context implements Serializable { int i = offset; try { - if (this.entries.length != 0) { + if (this.tags.length != 0) { buf[i++] = (byte) 0; + String[] a = this.tags; + + int x = a.length; + if (x > Context.colferListMax) + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Context.tags length %d exceeds %d elements", x, Context.colferListMax)); + while (x > 0x7f) { + buf[i++] = (byte) (x | 0x80); + x >>>= 7; + } + buf[i++] = (byte) x; + + for (int ai = 0; ai < a.length; ai++) { + String s = a[ai]; + if (s == null) { + s = ""; + a[ai] = s; + } + + int start = ++i; + + for (int sIndex = 0, sLength = s.length(); sIndex < sLength; sIndex++) { + char c = s.charAt(sIndex); + if (c < '\u0080') { + buf[i++] = (byte) c; + } else if (c < '\u0800') { + buf[i++] = (byte) (192 | c >>> 6); + buf[i++] = (byte) (128 | c & 63); + } else if (c < '\ud800' || c > '\udfff') { + buf[i++] = (byte) (224 | c >>> 12); + buf[i++] = (byte) (128 | c >>> 6 & 63); + buf[i++] = (byte) (128 | c & 63); + } else { + int cp = 0; + if (++sIndex < sLength) cp = Character.toCodePoint(c, s.charAt(sIndex)); + if ((cp >= 1 << 16) && (cp < 1 << 21)) { + buf[i++] = (byte) (240 | cp >>> 18); + buf[i++] = (byte) (128 | cp >>> 12 & 63); + buf[i++] = (byte) (128 | cp >>> 6 & 63); + buf[i++] = (byte) (128 | cp & 63); + } else + buf[i++] = (byte) '?'; + } + } + int size = i - start; + if (size > Context.colferSizeMax) + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Context.tags[%d] size %d exceeds %d UTF-8 bytes", ai, size, Context.colferSizeMax)); + + int ii = start - 1; + if (size > 0x7f) { + i++; + for (int y = size; y >= 1 << 14; y >>>= 7) i++; + System.arraycopy(buf, start, buf, i - size, size); + + do { + buf[ii++] = (byte) (size | 0x80); + size >>>= 7; + } while (size > 0x7f); + } + buf[ii] = (byte) size; + } + } + + if (this.entries.length != 0) { + buf[i++] = (byte) 1; Entry[] a = this.entries; int x = a.length; @@ -246,6 +317,35 @@ public class Context implements Serializable { byte header = buf[i++]; if (header == (byte) 0) { + int length = 0; + for (int shift = 0; true; shift += 7) { + byte b = buf[i++]; + length |= (b & 0x7f) << shift; + if (shift == 28 || b >= 0) break; + } + if (length < 0 || length > Context.colferListMax) + throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Context.tags length %d exceeds %d elements", length, Context.colferListMax)); + + String[] a = new String[length]; + for (int ai = 0; ai < length; ai++) { + int size = 0; + for (int shift = 0; true; shift += 7) { + byte b = buf[i++]; + size |= (b & 0x7f) << shift; + if (shift == 28 || b >= 0) break; + } + if (size < 0 || size > Context.colferSizeMax) + throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Context.tags[%d] size %d exceeds %d UTF-8 bytes", ai, size, Context.colferSizeMax)); + + int start = i; + i += size; + a[ai] = new String(buf, start, size, StandardCharsets.UTF_8); + } + this.tags = a; + header = buf[i++]; + } + + if (header == (byte) 1) { int length = 0; for (int shift = 0; true; shift += 7) { byte b = buf[i++]; @@ -278,7 +378,7 @@ public class Context implements Serializable { } // {@link Serializable} version number. - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; // {@link Serializable} Colfer extension. private void writeObject(ObjectOutputStream out) throws IOException { @@ -311,6 +411,32 @@ public class Context implements Serializable { init(); } + /** + * Gets kamon/context/generated/binary/context.Context.tags. + * @return the value. + */ + public String[] getTags() { + return this.tags; + } + + /** + * Sets kamon/context/generated/binary/context.Context.tags. + * @param value the replacement. + */ + public void setTags(String[] value) { + this.tags = value; + } + + /** + * Sets kamon/context/generated/binary/context.Context.tags. + * @param value the replacement. + * @return {link this}. + */ + public Context withTags(String[] value) { + this.tags = value; + return this; + } + /** * Gets kamon/context/generated/binary/context.Context.entries. * @return the value. @@ -340,6 +466,7 @@ public class Context implements Serializable { @Override public final int hashCode() { int h = 1; + for (String o : this.tags) h = 31 * h + (o == null ? 0 : o.hashCode()); for (Entry o : this.entries) h = 31 * h + (o == null ? 0 : o.hashCode()); return h; } @@ -353,6 +480,7 @@ public class Context implements Serializable { if (o == null) return false; if (o == this) return true; return o.getClass() == Context.class + && java.util.Arrays.equals(this.tags, o.tags) && java.util.Arrays.equals(this.entries, o.entries); } diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 5984f3c4..a4a7871e 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -238,10 +238,27 @@ kamon { outgoing { } + } + } + } + + binary { + + # Default HTTP propagation. Unless specified otherwise, all instrumentation will use the configuration on + # this section for HTTP context propagation. + # + default { + entries { + + # Specify mappings between Context keys and the Http.EntryReader implementation in charge of reading them + # from the incoming HTTP request into the Context. + incoming { + #span = "something" + } # Specify mappings betwen Context keys and the Http.EntryWriter implementation in charge of writing them - # on the outgoing HTTP response sent back to clients. - returning { + # on the outgoing HTTP requests. + outgoing { } } diff --git a/kamon-core/src/main/scala/kamon/ContextPropagation.scala b/kamon-core/src/main/scala/kamon/ContextPropagation.scala index bfcfbf7e..8f2ed8e4 100644 --- a/kamon-core/src/main/scala/kamon/ContextPropagation.scala +++ b/kamon-core/src/main/scala/kamon/ContextPropagation.scala @@ -1,11 +1,14 @@ package kamon import com.typesafe.config.Config -import kamon.context.HttpPropagation +import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter} +import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter} +import kamon.context.{BinaryPropagation, HttpPropagation, Propagation} trait ContextPropagation { self: Configuration with ClassLoading => @volatile private var _propagationComponents: ContextPropagation.Components = _ - @volatile private var _defaultHttpPropagation: HttpPropagation = _ + @volatile private var _defaultHttpPropagation: Propagation[HeaderReader, HeaderWriter] = _ + @volatile private var _defaultBinaryPropagation: Propagation[ByteStreamReader, ByteStreamWriter] = _ // Initial configuration and reconfigures init(self.config) @@ -14,28 +17,48 @@ trait ContextPropagation { self: Configuration with ClassLoading => /** * Retrieves the HTTP propagation channel with the supplied name. Propagation channels are configured on the - * kamon.propagation.channels configuration setting. + * kamon.propagation.http configuration section. * * @param channelName Channel name to retrieve. - * @return The HTTP propagation, if defined. + * @return The HTTP propagation, if available. */ - def httpPropagation(channelName: String): Option[HttpPropagation] = + def httpPropagation(channelName: String): Option[Propagation[HeaderReader, HeaderWriter]] = _propagationComponents.httpChannels.get(channelName) + /** + * Retrieves the binary propagation channel with the supplied name. Propagation channels are configured on the + * kamon.propagation.binary configuration section. + * + * @param channelName Channel name to retrieve. + * @return The binary propagation, if available. + */ + def binaryPropagation(channelName: String): Option[Propagation[ByteStreamReader, ByteStreamWriter]] = + _propagationComponents.binaryChannels.get(channelName) + /** * Retrieves the default HTTP propagation channel. Configuration for this channel can be found under the - * kamon.propagation.channels.http configuration setting. + * kamon.propagation.http.default configuration section. * * @return The default HTTP propagation. */ - def defaultHttpPropagation(): HttpPropagation = + def defaultHttpPropagation(): Propagation[HeaderReader, HeaderWriter] = _defaultHttpPropagation + /** + * Retrieves the default binary propagation channel. Configuration for this channel can be found under the + * kamon.propagation.binary.default configuration section. + * + * @return The default HTTP propagation. + */ + def defaultBinaryPropagation(): Propagation[ByteStreamReader, ByteStreamWriter] = + _defaultBinaryPropagation + private def init(config: Config): Unit = synchronized { _propagationComponents = ContextPropagation.Components.from(self.config, self) _defaultHttpPropagation = _propagationComponents.httpChannels(ContextPropagation.DefaultHttpChannel) + _defaultBinaryPropagation = _propagationComponents.binaryChannels(ContextPropagation.DefaultBinaryChannel) } } @@ -44,7 +67,8 @@ object ContextPropagation { val DefaultBinaryChannel = "default" case class Components( - httpChannels: Map[String, HttpPropagation] + httpChannels: Map[String, Propagation[HeaderReader, HeaderWriter]], + binaryChannels: Map[String, Propagation[ByteStreamReader, ByteStreamWriter]] ) object Components { @@ -52,11 +76,17 @@ object ContextPropagation { def from(config: Config, classLoading: ClassLoading): Components = { val propagationConfig = config.getConfig("kamon.propagation") val httpChannelsConfig = propagationConfig.getConfig("http").configurations + val binaryChannelsConfig = propagationConfig.getConfig("binary").configurations + val httpChannels = httpChannelsConfig.map { case (channelName, channelConfig) => (channelName -> HttpPropagation.from(channelConfig, classLoading)) } - Components(httpChannels) + val binaryChannels = binaryChannelsConfig.map { + case (channelName, channelConfig) => (channelName -> BinaryPropagation.from(channelConfig, classLoading)) + } + + Components(httpChannels, binaryChannels) } } } diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 2825d961..a6cc7bf4 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -19,7 +19,6 @@ import java.time.Duration import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor} import com.typesafe.config.{Config, ConfigFactory} -import kamon.context.Codecs import kamon.metric._ import kamon.trace._ import kamon.util.{Clock, Filters, Matcher, Registration} @@ -41,7 +40,6 @@ object Kamon extends MetricLookup with ClassLoading with Configuration with Repo private val _metrics = new MetricRegistry(_config, _scheduler) private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config, _clock) private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config, _clock) - private val _contextCodec = new Codecs(_config) //private var _onReconfigureHooks = Seq.empty[OnReconfigureHook] sys.addShutdownHook(() => _scheduler.shutdown()) @@ -56,7 +54,6 @@ object Kamon extends MetricLookup with ClassLoading with Configuration with Repo _metrics.reconfigure(config) _reporterRegistry.reconfigure(config) _tracer.reconfigure(config) - _contextCodec.reconfigure(config) _scheduler match { case stpe: ScheduledThreadPoolExecutor => stpe.setCorePoolSize(schedulerPoolSize(config)) @@ -91,11 +88,6 @@ object Kamon extends MetricLookup with ClassLoading with Configuration with Repo override def identityProvider: IdentityProvider = _tracer.identityProvider - def contextCodec(): Codecs = - _contextCodec - - - override def loadReportersFromConfig(): Unit = _reporterRegistry.loadReportersFromConfig() diff --git a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala new file mode 100644 index 00000000..847ac452 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala @@ -0,0 +1,277 @@ +/* ========================================================================================= + * Copyright © 2013-2018 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon +package context + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream} + +import com.typesafe.config.Config +import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter} +import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry} +import org.slf4j.LoggerFactory + +import scala.reflect.ClassTag +import scala.util.control.NonFatal +import scala.util.{Failure, Success} + + +/** + * Context propagation that uses byte stream abstractions as the transport medium. The Binary propagation uses + * instances of [[ByteStreamReader]] and [[ByteStreamWriter]] to decode and encode Context instances, respectively. + * + * Binary propagation uses the [[ByteStreamReader]] and [[ByteStreamWriter]] abstraction which closely model the APIs + * from [[InputStream]] and [[OutputStream]], but without exposing additional functionality that wouldn't have any + * well defined behavior for Context propagation, e.g. flush or close functions on OutputStreams. + */ +object BinaryPropagation { + + /** + * Represents a readable stream of bytes. This interface closely resembles [[InputStream]], minus the functionality + * that wouldn't have a clearly defined behavior in the context of Context propagation. + */ + trait ByteStreamReader { + /** + * Number of available bytes on the ByteStream. + */ + def available(): Int + + /** + * Reads as many bytes as possible into the target byte array. + * + * @param target Target buffer in which the read bytes will be written. + * @return The number of bytes written into the target buffer. + */ + def read(target: Array[Byte]): Int + + /** + * Reads a specified number of bytes into the target buffer, starting from the offset position. + * + * @param target Target buffer in which read bytes will be written. + * @param offset Offset index in which to start writing bytes on the target buffer. + * @param count Number of bytes to be read. + * @return The number of bytes written into the target buffer. + */ + def read(target: Array[Byte], offset: Int, count: Int): Int + + /** + * Reads all available bytes into a newly created byte array. + * + * @return All bytes read. + */ + def readAll(): Array[Byte] + } + + + object ByteStreamReader { + + /** + * Creates a new [[ByteStreamReader]] that reads data from a byte array. + */ + def of(bytes: Array[Byte]): ByteStreamReader = new ByteArrayInputStream(bytes) with ByteStreamReader { + override def readAll(): Array[Byte] = { + val target = Array.ofDim[Byte](available()) + read(target, 0, available()) + target + } + } + } + + + /** + * Represents a writable stream of bytes. This interface closely resembles [[OutputStream]], minus the functionality + * that wouldn't have a clearly defined behavior in the context of Context propagation. + */ + trait ByteStreamWriter { + + /** + * Writes all bytes into the stream. + */ + def write(bytes: Array[Byte]): Unit + + /** + * Writes a portion of the provided bytes into the stream. + * + * @param bytes Buffer from which data will be selected. + * @param offset Starting index on the buffer. + * @param count Number of bytes to write into the stream. + */ + def write(bytes: Array[Byte], offset: Int, count: Int): Unit + + /** + * Write a single byte into the stream. + */ + def write(byte: Int): Unit + + } + + object ByteStreamWriter { + + /** + * Creates a new [[ByteStreamWriter]] from an OutputStream. + */ + def of(outputStream: OutputStream): ByteStreamWriter = new ByteStreamWriter { + override def write(bytes: Array[Byte]): Unit = + outputStream.write(bytes) + + override def write(bytes: Array[Byte], offset: Int, count: Int): Unit = + outputStream.write(bytes, offset, count) + + override def write(byte: Int): Unit = + outputStream.write(byte) + } + } + + + + /** + * Create a new default Binary Propagation instance from the provided configuration. + * + * @param config Binary Propagation channel configuration + * @return A newly constructed HttpPropagation instance. + */ + def from(config: Config, classLoading: ClassLoading): Propagation[ByteStreamReader, ByteStreamWriter] = { + new BinaryPropagation.Default(Settings.from(config, classLoading)) + } + + /** + * Default Binary propagation in Kamon. This implementation uses Colfer to read and write the context tags and + * entries. Entries are represented as simple pairs of entry name and bytes, which are then processed by the all + * configured entry readers and writers. + */ + class Default(settings: Settings) extends Propagation[ByteStreamReader, ByteStreamWriter] { + private val _log = LoggerFactory.getLogger(classOf[BinaryPropagation.Default]) + private val _streamPool = new ThreadLocal[Default.ReusableByteStreamWriter] { + override def initialValue(): Default.ReusableByteStreamWriter = new Default.ReusableByteStreamWriter(128) + } + + override def read(reader: ByteStreamReader): Context = { + if(reader.available() > 0) { + val contextData = new ColferContext() + contextData.unmarshal(reader.readAll(), 0) + + // Context tags + var tagSectionsCount = contextData.tags.length + if (tagSectionsCount > 0 && tagSectionsCount % 2 != 0) { + _log.warn("Malformed context tags found when trying to read a Context from ByteStreamReader") + tagSectionsCount -= 1 + } + + val tags = if (tagSectionsCount > 0) { + val tagsBuilder = Map.newBuilder[String, String] + var tagIndex = 0 + while (tagIndex < tagSectionsCount) { + tagsBuilder += (contextData.tags(tagIndex) -> contextData.tags(tagIndex + 1)) + tagIndex += 2 + } + tagsBuilder.result() + + } else Map.empty[String, String] + + + // Only reads the entries for which there is a registered reader + contextData.entries.foldLeft(Context.of(tags)) { + case (context, entryData) => + settings.incomingEntries.get(entryData.name).map { entryReader => + var contextWithEntry = context + try { + contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.content), context) + } catch { + case NonFatal(t) => _log.warn("Failed to read entry [{}]", entryData.name.asInstanceOf[Any], t) + } + + contextWithEntry + }.getOrElse(context) + } + } else Context.Empty + } + + override def write(context: Context, writer: ByteStreamWriter): Unit = { + if (context.nonEmpty()) { + val contextData = new ColferContext() + val output = _streamPool.get() + + if (context.tags.nonEmpty) { + val tags = Array.ofDim[String](context.tags.size * 2) + var tagIndex = 0 + context.tags.foreach { + case (key, value) => + tags.update(tagIndex, key) + tags.update(tagIndex + 1, value) + tagIndex += 2 + } + + contextData.tags = tags + } + + if (context.entries.nonEmpty) { + val entries = settings.outgoingEntries.collect { + case (entryName, entryWriter) if context.entries.contains(entryName) => + output.reset() + entryWriter.write(context, output) + + val colferEntry = new ColferEntry() + colferEntry.name = entryName + colferEntry.content = output.toByteArray() + colferEntry + } + + contextData.entries = entries.toArray + } + + output.reset() + // TODO: avoid internal allocation of byte[] on the marshal method. Use ReusableByteStreamWriter's underlying buffer. + contextData.marshal(output, null) + writer.write(output.toByteArray) + } + } + } + + object Default { + private class ReusableByteStreamWriter(size: Int) extends ByteArrayOutputStream(size) with ByteStreamWriter { + def underlying(): Array[Byte] = this.buf + } + } + + case class Settings( + incomingEntries: Map[String, Propagation.EntryReader[ByteStreamReader]], + outgoingEntries: Map[String, Propagation.EntryWriter[ByteStreamWriter]] + ) + + object Settings { + private val log = LoggerFactory.getLogger(classOf[BinaryPropagation.Settings]) + + def from(config: Config, classLoading: ClassLoading): BinaryPropagation.Settings = { + def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = { + val instanceMap = Map.newBuilder[String, ExpectedType] + + mappings.foreach { + case (contextKey, componentClass) => classLoading.createInstance[ExpectedType](componentClass, Nil) match { + case Success(componentInstance) => instanceMap += (contextKey -> componentInstance) + case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to []", + implicitly[ClassTag[ExpectedType]].runtimeClass.getName, componentClass, exception) + } + } + + instanceMap.result() + } + + Settings( + buildInstances[Propagation.EntryReader[ByteStreamReader]](config.getConfig("entries.incoming").pairs), + buildInstances[Propagation.EntryWriter[ByteStreamWriter]](config.getConfig("entries.outgoing").pairs) + ) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/context/Codecs.scala b/kamon-core/src/main/scala/kamon/context/Codecs.scala deleted file mode 100644 index 465f53be..00000000 --- a/kamon-core/src/main/scala/kamon/context/Codecs.scala +++ /dev/null @@ -1,297 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon -package context - -import java.nio.ByteBuffer - -import com.typesafe.config.Config -import kamon.util.DynamicAccess -import org.slf4j.LoggerFactory -import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry} - -import scala.collection.mutable - -class Codecs(initialConfig: Config) { - private val log = LoggerFactory.getLogger(classOf[Codecs]) - @volatile private var httpHeaders: Codecs.ForContext[TextMap] = new Codecs.HttpHeaders(Map.empty) - @volatile private var binary: Codecs.ForContext[ByteBuffer] = new Codecs.Binary(256, Map.empty) - - reconfigure(initialConfig) - - - def HttpHeaders: Codecs.ForContext[TextMap] = - httpHeaders - - def Binary: Codecs.ForContext[ByteBuffer] = - binary - - def reconfigure(config: Config): Unit = { - import scala.collection.JavaConverters._ - try { - val codecsConfig = config.getConfig("kamon.context.codecs") - val stringKeys = readStringKeysConfig(codecsConfig.getConfig("string-keys")) - val knownHttpHeaderCodecs = readEntryCodecs[TextMap]("http-headers-keys", codecsConfig) ++ stringHeaderCodecs(stringKeys) - val knownBinaryCodecs = readEntryCodecs[ByteBuffer]("binary-keys", codecsConfig) ++ stringBinaryCodecs(stringKeys) - - httpHeaders = new Codecs.HttpHeaders(knownHttpHeaderCodecs) - binary = new Codecs.Binary(codecsConfig.getBytes("binary-buffer-size"), knownBinaryCodecs) - } catch { - case t: Throwable => log.error("Failed to initialize Context Codecs", t) - } - } - - private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codecs.ForEntry[T]] = { - val rootConfig = config.getConfig(rootKey) - val dynamic = new DynamicAccess(getClass.getClassLoader) - val entries = Map.newBuilder[String, Codecs.ForEntry[T]] - - rootConfig.topLevelKeys.foreach(key => { - try { - val fqcn = rootConfig.getString(key) - entries += ((key, dynamic.createInstanceFor[Codecs.ForEntry[T]](fqcn, Nil).get)) - } catch { - case e: Throwable => - log.error(s"Failed to initialize codec for key [$key]", e) - } - }) - - entries.result() - } - - private def readStringKeysConfig(config: Config): Map[String, String] = - config.topLevelKeys.map(key => (key, config.getString(key))).toMap - - private def stringHeaderCodecs(keys: Map[String, String]): Map[String, Codecs.ForEntry[TextMap]] = - keys.map { case (key, header) => (key, new Codecs.StringHeadersCodec(key, header)) } - - private def stringBinaryCodecs(keys: Map[String, String]): Map[String, Codecs.ForEntry[ByteBuffer]] = - keys.map { case (key, _) => (key, new Codecs.StringBinaryCodec(key)) } -} - -object Codecs { - - trait ForContext[T] { - def encode(context: Context): T - def decode(carrier: T): Context - } - - trait ForEntry[T] { - def encode(context: Context): T - def decode(carrier: T, context: Context): Context - } - - final class HttpHeaders(entryCodecs: Map[String, Codecs.ForEntry[TextMap]]) extends Codecs.ForContext[TextMap] { - private val log = LoggerFactory.getLogger(classOf[HttpHeaders]) - - override def encode(context: Context): TextMap = { - val encoded = TextMap.Default() - -// context.entries.foreach { -// case (key, _) if key.broadcast => -// entryCodecs.get(key.name) match { -// case Some(codec) => -// try { -// codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2)) -// } catch { -// case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e) -// } -// -// case None => -// log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name) -// } -// -// case _ => // All non-broadcast keys should be ignored. -// } - - encoded - } - - override def decode(carrier: TextMap): Context = { - var context: Context = Context.Empty - - try { - context = entryCodecs.foldLeft(context)((ctx, codecEntry) => { - val (_, codec) = codecEntry - codec.decode(carrier, ctx) - }) - - } catch { - case e: Throwable => - log.error("Failed to decode context from HttpHeaders", e) - } - - context - } - } - - - final class Binary(bufferSize: Long, entryCodecs: Map[String, Codecs.ForEntry[ByteBuffer]]) extends Codecs.ForContext[ByteBuffer] { - private val log = LoggerFactory.getLogger(classOf[Binary]) - private val binaryBuffer = newThreadLocalBuffer(bufferSize) - private val emptyBuffer = ByteBuffer.allocate(0) - - override def encode(context: Context): ByteBuffer = { - val entries = context.entries - if(entries.isEmpty) - emptyBuffer - else { - var colferEntries: List[ColferEntry] = Nil -// entries.foreach { -// case (key, _) if key.broadcast => -// entryCodecs.get(key.name) match { -// case Some(entryCodec) => -// try { -// val entryData = entryCodec.encode(context) -// if(entryData.capacity() > 0) { -// val colferEntry = new ColferEntry() -// colferEntry.setName(key.name) -// colferEntry.setContent(entryData.array()) -// colferEntries = colferEntry :: colferEntries -// } -// } catch { -// case throwable: Throwable => -// log.error(s"Failed to encode broadcast context key [${key.name}]", throwable) -// } -// -// case None => -// log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name) -// } -// -// case _ => // All non-broadcast keys should be ignored. -// } - - if(colferEntries.isEmpty) - emptyBuffer - else { - val buffer = binaryBuffer.get() - val colferContext = new ColferContext() - colferContext.setEntries(colferEntries.toArray) - val marshalledSize = colferContext.marshal(buffer, 0) - - val data = Array.ofDim[Byte](marshalledSize) - System.arraycopy(buffer, 0, data, 0, marshalledSize) - ByteBuffer.wrap(data) - } - } - } - - - override def decode(carrier: ByteBuffer): Context = { - if(carrier.capacity() == 0) - Context.Empty - else { - var context: Context = Context.Empty - - try { - val colferContext = new ColferContext() - colferContext.unmarshal(carrier.array(), 0) - - colferContext.entries.foreach(colferEntry => { - entryCodecs.get(colferEntry.getName()) match { - case Some(entryCodec) => - context = entryCodec.decode(ByteBuffer.wrap(colferEntry.content), context) - - case None => - log.error("Failed to decode entry [{}] with Binary context codec. No entry found for the key.", colferEntry.getName()) - } - }) - - } catch { - case e: Throwable => - log.error("Failed to decode context from Binary", e) - } - - context - } - } - - - private def newThreadLocalBuffer(size: Long): ThreadLocal[Array[Byte]] = new ThreadLocal[Array[Byte]] { - override def initialValue(): Array[Byte] = Array.ofDim[Byte](size.toInt) - } - } - - private class StringHeadersCodec(key: String, headerName: String) extends Codecs.ForEntry[TextMap] { - //private val contextKey = Key.broadcast[Option[String]](key, None) - - override def encode(context: Context): TextMap = { - val textMap = TextMap.Default() -// context.get(contextKey).foreach { value => -// textMap.put(headerName, value) -// } - - textMap - } - - override def decode(carrier: TextMap, context: Context): Context = { - ??? -// carrier.get(headerName) match { -// case value @ Some(_) => context.withKey(contextKey, value) -// case None => context -// } - } - } - - private class StringBinaryCodec(key: String) extends Codecs.ForEntry[ByteBuffer] { - val emptyBuffer: ByteBuffer = ByteBuffer.allocate(0) - //private val contextKey = Key.broadcast[Option[String]](key, None) - - override def encode(context: Context): ByteBuffer = { -// context.get(contextKey) match { -// case Some(value) => ByteBuffer.wrap(value.getBytes) -// case None => emptyBuffer -// } - ??? - } - - override def decode(carrier: ByteBuffer, context: Context): Context = { - ??? //context.withKey(contextKey, Some(new String(carrier.array()))) - } - } -} - - -trait TextMap { - - def get(key: String): Option[String] - - def put(key: String, value: String): Unit - - def values: Iterator[(String, String)] -} - -object TextMap { - - class Default extends TextMap { - private val storage = - mutable.Map.empty[String, String] - - override def get(key: String): Option[String] = - storage.get(key) - - override def put(key: String, value: String): Unit = - storage.put(key, value) - - override def values: Iterator[(String, String)] = - storage.toIterator - } - - object Default { - def apply(): Default = - new Default() - } -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala index 4d3501db..2a7a382e 100644 --- a/kamon-core/src/main/scala/kamon/context/Context.scala +++ b/kamon-core/src/main/scala/kamon/context/Context.scala @@ -1,5 +1,5 @@ /* ========================================================================================= - * Copyright © 2013-2017 the kamon project + * Copyright © 2013-2018 the kamon project * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -19,16 +19,16 @@ package context import java.util.{Map => JavaMap} import scala.collection.JavaConverters._ -class Context private (private[context] val entries: Map[Context.Key[_], Any], val tags: Map[String, String]) { +class Context private (val entries: Map[String, Any], val tags: Map[String, String]) { def get[T](key: Context.Key[T]): T = - entries.getOrElse(key, key.emptyValue).asInstanceOf[T] + entries.getOrElse(key.name, key.emptyValue).asInstanceOf[T] def getTag(tagKey: String): Option[String] = tags.get(tagKey) def withKey[T](key: Context.Key[T], value: T): Context = - new Context(entries.updated(key, value), tags) + new Context(entries.updated(key.name, value), tags) def withTag(tagKey: String, tagValue: String): Context = new Context(entries, tags.updated(tagKey, tagValue)) @@ -39,6 +39,11 @@ class Context private (private[context] val entries: Map[Context.Key[_], Any], v def withTags(tags: JavaMap[String, String]): Context = new Context(entries, this.tags ++ tags.asScala.toMap) + def isEmpty(): Boolean = + entries.isEmpty && tags.isEmpty + + def nonEmpty(): Boolean = + !isEmpty() } @@ -53,22 +58,22 @@ object Context { new Context(Map.empty, tags) def of[T](key: Context.Key[T], value: T): Context = - new Context(Map(key -> value), Map.empty) + new Context(Map(key.name -> value), Map.empty) def of[T](key: Context.Key[T], value: T, tags: JavaMap[String, String]): Context = - new Context(Map(key -> value), tags.asScala.toMap) + new Context(Map(key.name -> value), tags.asScala.toMap) def of[T](key: Context.Key[T], value: T, tags: Map[String, String]): Context = - new Context(Map(key -> value), tags) + new Context(Map(key.name -> value), tags) def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U): Context = - new Context(Map(keyOne -> valueOne, keyTwo -> valueTwo), Map.empty) + new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), Map.empty) def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: JavaMap[String, String]): Context = - new Context(Map(keyOne -> valueOne, keyTwo -> valueTwo), tags.asScala.toMap) + new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), tags.asScala.toMap) def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: Map[String, String]): Context = - new Context(Map(keyOne -> valueOne, keyTwo -> valueTwo), tags) + new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), tags) def key[T](name: String, emptyValue: T): Context.Key[T] = new Context.Key(name, emptyValue) diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala index d53a6250..6a15e2a6 100644 --- a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala +++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala @@ -1,3 +1,18 @@ +/* ========================================================================================= + * Copyright © 2013-2018 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon package context @@ -9,89 +24,31 @@ import scala.util.control.NonFatal import scala.util.{Failure, Success} /** - * Context Propagation for HTTP transports. When using HTTP transports all the context related information is - * read from and written to HTTP headers. The context information may be included in the following directions: - * - Incoming: Used for HTTP requests coming into this service. Implicitly used when using HttpPropagation.read. - * - Outgoing: Used for HTTP requests leaving this service. - * - Returning: Used for HTTP responses send back to clients of this service. + * Context propagation that uses HTTP headers as the transport medium. HTTP propagation mechanisms read any number of + * HTTP headers from incoming HTTP requests to decode a Context instance and write any number of HTTP headers on + * outgoing requests to transfer a context to remote services. */ -trait HttpPropagation { - - /** - * Uses the provided [[HttpPropagation.HeaderReader]] to read as many HTTP Headers as necessary and create a - * [[Context]] instance. The way in which context tags and entries are read from and written to HTTP Headers is - * implementation specific. - * - * @param reader Wrapper on the HTTP message from which headers are read. - * @return The decoded Context instance. If no entries or tags could be read from the HTTP message then an - * empty context is returned instead. - */ - def readContext(reader: HttpPropagation.HeaderReader): Context - - /** - * Writes the tags and entries from the supplied context using the supplied [[HttpPropagation.HeaderWriter]] - * instance. The way in which context tags and entries are read from and written to HTTP Headers is implementation - * specific. - * - * Implementations are expected to produce side effects on the wrapped HTTP Messages. - * - * @param context Context instance to be written. - * @param writer Wrapper on the HTTP message that will carry the context headers. - * @param direction Write direction. It can be either Outgoing or Returning. - */ - def writeContext(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit - -} - object HttpPropagation { /** - * Encapsulates logic required to read a single context entry from HTTP headers. Implementations of this trait - * must be aware of the entry they are able to read and the HTTP headers required to do so. + * Wrapper that reads HTTP headers from a HTTP message. */ - trait EntryReader { - - /** - * Tries to read a context entry from HTTP headers. If a context entry is successfully read, implementations - * must return an updated context instance that includes such entry. If no entry could be read simply return - * context instance that was passed in, untouched. - * - * @param reader Wrapper on the HTTP message from which headers are read. - * @param context Current context. - * @return Either the original context passed in or a modified version of it, including the read entry. - */ - def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context - } - - /** - * Encapsulates logic required to write a single context entry to HTTP headers. Implementations of this trait - * must be aware of the entry they are able to write and the HTTP headers required to do so. - */ - trait EntryWriter { + trait HeaderReader { /** - * Tries to write a context entry into HTTP headers. + * Reads a single HTTP header value. * - * @param context The context from which entries should be written. - * @param writer Wrapper on the HTTP message that will carry the context headers. - * @param direction Write direction. It can be either Outgoing or Returning. + * @param header HTTP header name + * @return The HTTP header value, if present. */ - def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit - } - - - /** - * Wrapper that reads HTTP headers from HTTP a message. - */ - trait HeaderReader { + def read(header: String): Option[String] /** - * Reads an HTTP header value + * Reads all HTTP headers present in the wrapped HTTP message. * - * @param header HTTP header name - * @return The HTTP header value, if present. + * @return A map from header name to */ - def readHeader(header: String): Option[String] + def readAll(): Map[String, String] } /** @@ -105,39 +62,40 @@ object HttpPropagation { * @param header HTTP header name. * @param value HTTP header value. */ - def writeHeader(header: String, value: String): Unit + def write(header: String, value: String): Unit } + /** - * Create a new default HttpPropagation instance from the provided configuration. + * Create a new default HTTP propagation instance from the provided configuration. * * @param config HTTP propagation channel configuration * @return A newly constructed HttpPropagation instance. */ - def from(config: Config, classLoading: ClassLoading): HttpPropagation = { - new HttpPropagation.Default(Components.from(config, classLoading)) + def from(config: Config, classLoading: ClassLoading): Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] = { + new HttpPropagation.Default(Settings.from(config, classLoading)) } /** * Default HTTP Propagation in Kamon. */ - final class Default(components: Components) extends HttpPropagation { + class Default(settings: Settings) extends Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] { private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Default]) /** * Reads context tags and entries on the following order: - * - Read all context tags from the context tags header. - * - Read all context tags with explicit mappings. This overrides any tag from the previous step in case - * of a tag key clash. - * - Read all context entries using the incoming entries configuration. + * 1. Read all context tags from the context tags header. + * 2. Read all context tags with explicit mappings. This overrides any tag + * from the previous step in case of a tag key clash. + * 3. Read all context entries using the incoming entries configuration. */ - override def readContext(reader: HeaderReader): Context = { + override def read(reader: HeaderReader): Context = { val tags = Map.newBuilder[String, String] // Tags encoded together in the context tags header. try { - reader.readHeader(components.tagsHeaderName).foreach { contextTagsHeader => + reader.read(settings.tagsHeaderName).foreach { contextTagsHeader => contextTagsHeader.split(";").foreach(tagData => { val tagPair = tagData.split("=") if (tagPair.length == 2) { @@ -150,21 +108,21 @@ object HttpPropagation { } // Tags explicitly mapped on the tags.mappings configuration. - components.tagsMappings.foreach { + settings.tagsMappings.foreach { case (tagName, httpHeader) => try { - reader.readHeader(httpHeader).foreach(tagValue => tags += (tagName -> tagValue)) + reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue)) } catch { case NonFatal(t) => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any]) } } // Incoming Entries - components.incomingEntries.foldLeft(Context.of(tags.result())) { + settings.incomingEntries.foldLeft(Context.of(tags.result())) { case (context, (entryName, entryDecoder)) => var result = context try { - result = entryDecoder.readEntry(reader, context) + result = entryDecoder.read(reader, context) } catch { case NonFatal(t) => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any]) } @@ -176,12 +134,7 @@ object HttpPropagation { /** * Writes context tags and entries */ - override def writeContext(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = { - val keys = direction match { - case Direction.Outgoing => components.outgoingEntries - case Direction.Returning => components.returningEntries - } - + override def write(context: Context, writer: HeaderWriter): Unit = { val contextTagsHeader = new StringBuilder() def appendTag(key: String, value: String): Unit = { contextTagsHeader @@ -193,22 +146,22 @@ object HttpPropagation { // Write tags with specific mappings or append them to the context tags header. context.tags.foreach { - case (tagKey, tagValue) => components.tagsMappings.get(tagKey) match { - case Some(mappedHeader) => writer.writeHeader(mappedHeader, tagValue) + case (tagKey, tagValue) => settings.tagsMappings.get(tagKey) match { + case Some(mappedHeader) => writer.write(mappedHeader, tagValue) case None => appendTag(tagKey, tagValue) } } // Write the context tags header. if(contextTagsHeader.nonEmpty) { - writer.writeHeader(components.tagsHeaderName, contextTagsHeader.result()) + writer.write(settings.tagsHeaderName, contextTagsHeader.result()) } // Write entries for the specified direction. - keys.foreach { + settings.outgoingEntries.foreach { case (entryName, entryWriter) => try { - entryWriter.writeEntry(context, writer, direction) + entryWriter.write(context, writer) } catch { case NonFatal(t) => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any]) } @@ -216,68 +169,37 @@ object HttpPropagation { } } - /** - * Propagation direction. Used to decide whether incoming, outgoing or returning keys must be used to - * propagate context. - */ - sealed trait Direction - object Direction { - - /** - * Marker trait for all directions that require write operations. - */ - sealed trait Write - - /** - * Requests coming into this service. - */ - case object Incoming extends Direction - - /** - * Requests going from this service to others. - */ - case object Outgoing extends Direction with Write - - /** - * Responses sent from this service to clients. - */ - case object Returning extends Direction with Write - } - - - case class Components( + case class Settings( tagsHeaderName: String, tagsMappings: Map[String, String], - incomingEntries: Map[String, HttpPropagation.EntryReader], - outgoingEntries: Map[String, HttpPropagation.EntryWriter], - returningEntries: Map[String, HttpPropagation.EntryWriter] + incomingEntries: Map[String, Propagation.EntryReader[HeaderReader]], + outgoingEntries: Map[String, Propagation.EntryWriter[HeaderWriter]] ) - object Components { - private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Components]) + object Settings { + private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Settings]) - def from(config: Config, classLoading: ClassLoading): Components = { + def from(config: Config, classLoading: ClassLoading): Settings = { def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = { - val entryReaders = Map.newBuilder[String, ExpectedType] + val instanceMap = Map.newBuilder[String, ExpectedType] mappings.foreach { - case (contextKey, readerClass) => classLoading.createInstance[ExpectedType](readerClass, Nil) match { - case Success(readerInstance) => entryReaders += (contextKey -> readerInstance) + case (contextKey, componentClass) => classLoading.createInstance[ExpectedType](componentClass, Nil) match { + case Success(componentInstance) => instanceMap += (contextKey -> componentInstance) case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to []", - implicitly[ClassTag[ExpectedType]].runtimeClass.getName, readerClass, exception) + implicitly[ClassTag[ExpectedType]].runtimeClass.getName, componentClass, exception) } } - entryReaders.result() + instanceMap.result() } val tagsHeaderName = config.getString("tags.header-name") val tagsMappings = config.getConfig("tags.mappings").pairs - val incomingEntries = buildInstances[HttpPropagation.EntryReader](config.getConfig("entries.incoming").pairs) - val outgoingEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.outgoing").pairs) - val returningEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.returning").pairs) + val incomingEntries = buildInstances[Propagation.EntryReader[HeaderReader]](config.getConfig("entries.incoming").pairs) + val outgoingEntries = buildInstances[Propagation.EntryWriter[HeaderWriter]](config.getConfig("entries.outgoing").pairs) - Components(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries, returningEntries) + Settings(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries) } } diff --git a/kamon-core/src/main/scala/kamon/context/Propagation.scala b/kamon-core/src/main/scala/kamon/context/Propagation.scala new file mode 100644 index 00000000..1d973ca9 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/Propagation.scala @@ -0,0 +1,81 @@ +/* ========================================================================================= + * Copyright © 2013-2018 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.context + +/** + * Inter-process Context propagation interface. Implementations are responsible from reading and writing a lossless + * representation of a given Context instance to the appropriate mediums. Out of the box, Kamon ships with two + * implementations: + * + * * HttpPropagation: Uses HTTP headers as the medium to transport the Context data. + * * BinaryPropagation: Uses byte streams as the medium to transport the Context data. + * + */ +trait Propagation[ReaderMedium, WriterMedium] { + + /** + * Attempts to read a Context from the [[ReaderMedium]]. + * + * @param medium An abstraction the reads data from the medium transporting the Context data. + * @return The decoded Context instance or an empty Context if no entries or tags could be read from the medium. + */ + def read(medium: ReaderMedium): Context + + /** + * Attempts to write a Context instance to the [[WriterMedium]]. + * + * @param context Context instance to be written. + * @param medium An abstraction that writes data into the medium that will transport the Context data. + */ + def write(context: Context, medium: WriterMedium): Unit + +} + +object Propagation { + + /** + * Encapsulates logic required to read a single context entry from a medium. Implementations of this trait + * must be aware of the entry they are able to read. + */ + trait EntryReader[Medium] { + + /** + * Tries to read a context entry from the medium. If a context entry is successfully read, implementations + * must return an updated context instance that includes such entry. If no entry could be read simply return the + * context instance that was passed in, unchanged. + * + * @param medium An abstraction the reads data from the medium transporting the Context data. + * @param context Current context. + * @return Either the original context passed in or a modified version of it, including the read entry. + */ + def read(medium: Medium, context: Context): Context + } + + /** + * Encapsulates logic required to write a single context entry to the medium. Implementations of this trait + * must be aware of the entry they are able to write. + */ + trait EntryWriter[Medium] { + + /** + * Tries to write a context entry into the medium. + * + * @param context The context from which entries should be written. + * @param medium An abstraction that writes data into the medium that will transport the Context data. + */ + def write(context: Context, medium: Medium): Unit + } +} diff --git a/kamon-core/src/main/scala/kamon/context/Storage.scala b/kamon-core/src/main/scala/kamon/context/Storage.scala index 4f4e6cbb..7bbca111 100644 --- a/kamon-core/src/main/scala/kamon/context/Storage.scala +++ b/kamon-core/src/main/scala/kamon/context/Storage.scala @@ -1,5 +1,5 @@ /* ========================================================================================= - * Copyright © 2013-2017 the kamon project + * Copyright © 2013-2018 the kamon project * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala index 3cb45619..02e92f25 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala @@ -5,7 +5,6 @@ import java.time.Duration import com.typesafe.config.Config import kamon.context.Context -import kamon.context.HttpPropagation.Direction import kamon.instrumentation.HttpServer.Settings.TagMode import kamon.metric.MeasurementUnit.{time, information} import kamon.trace.{IdentityProvider, Span} @@ -259,7 +258,7 @@ object HttpServer { override def receive(request: HttpRequest): RequestHandler = { val incomingContext = if(settings.enableContextPropagation) - _propagation.readContext(request) + _propagation.read(request) else Context.Empty val requestSpan = if(settings.enableTracing) @@ -296,7 +295,7 @@ object HttpServer { } if(settings.enableContextPropagation) { - _propagation.writeContext(context, response, Direction.Returning) + _propagation.write(context, response) } _metrics.foreach { httpServerMetrics => diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala index 7439520c..63f8e1b0 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala @@ -19,35 +19,36 @@ import java.net.{URLDecoder, URLEncoder} import java.nio.ByteBuffer import kamon.Kamon -import kamon.context.{Codecs, Context, HttpPropagation, TextMap} +import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter} +import kamon.context._ import kamon.context.generated.binary.span.{Span => ColferSpan} -import kamon.context.HttpPropagation.Direction +import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter} import kamon.trace.SpanContext.SamplingDecision object SpanCodec { - class B3 extends HttpPropagation.EntryReader with HttpPropagation.EntryWriter { + class B3 extends Propagation.EntryReader[HeaderReader] with Propagation.EntryWriter[HeaderWriter] { import B3.Headers - override def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context = { + override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { val identityProvider = Kamon.tracer.identityProvider - val traceID = reader.readHeader(Headers.TraceIdentifier) + val traceID = reader.read(Headers.TraceIdentifier) .map(id => identityProvider.traceIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) - val spanID = reader.readHeader(Headers.SpanIdentifier) + val spanID = reader.read(Headers.SpanIdentifier) .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) { - val parentID = reader.readHeader(Headers.ParentSpanIdentifier) + val parentID = reader.read(Headers.ParentSpanIdentifier) .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) - val flags = reader.readHeader(Headers.Flags) + val flags = reader.read(Headers.Flags) - val samplingDecision = flags.orElse(reader.readHeader(Headers.Sampled)) match { + val samplingDecision = flags.orElse(reader.read(Headers.Sampled)) match { case Some(sampled) if sampled == "1" => SamplingDecision.Sample case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample case _ => SamplingDecision.Unknown @@ -59,19 +60,19 @@ object SpanCodec { } - override def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = { + override def write(context: Context, writer: HttpPropagation.HeaderWriter): Unit = { val span = context.get(Span.ContextKey) if(span.nonEmpty()) { val spanContext = span.context() - writer.writeHeader(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) - writer.writeHeader(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) + writer.write(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) + writer.write(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) if(spanContext.parentID != IdentityProvider.NoIdentifier) - writer.writeHeader(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) + writer.write(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision => - writer.writeHeader(Headers.Sampled, samplingDecision) + writer.write(Headers.Sampled, samplingDecision) } } } @@ -101,38 +102,16 @@ object SpanCodec { } - class Colfer extends Codecs.ForEntry[ByteBuffer] { + class Colfer extends Propagation.EntryReader[ByteStreamReader] with Propagation.EntryWriter[ByteStreamWriter] { val emptyBuffer = ByteBuffer.allocate(0) - override def encode(context: Context): ByteBuffer = { - val span = context.get(Span.ContextKey) - if(span.nonEmpty()) { - val marshalBuffer = Colfer.codecBuffer.get() - val colferSpan = new ColferSpan() - val spanContext = span.context() - - colferSpan.setTraceID(spanContext.traceID.bytes) - colferSpan.setSpanID(spanContext.spanID.bytes) - colferSpan.setParentID(spanContext.parentID.bytes) - colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision)) - - val marshalledSize = colferSpan.marshal(marshalBuffer, 0) - val buffer = ByteBuffer.allocate(marshalledSize) - buffer.put(marshalBuffer, 0, marshalledSize) - buffer - - } else emptyBuffer - } - - override def decode(carrier: ByteBuffer, context: Context): Context = { - carrier.clear() - - if(carrier.capacity() == 0) + override def read(medium: ByteStreamReader, context: Context): Context = { + if(medium.available() == 0) context else { val identityProvider = Kamon.tracer.identityProvider val colferSpan = new ColferSpan() - colferSpan.unmarshal(carrier.array(), 0) + colferSpan.unmarshal(medium.readAll(), 0) val spanContext = SpanContext( traceID = identityProvider.traceIdGenerator().from(colferSpan.traceID), @@ -145,6 +124,24 @@ object SpanCodec { } } + override def write(context: Context, medium: ByteStreamWriter): Unit = { + val span = context.get(Span.ContextKey) + + if(span.nonEmpty()) { + val marshalBuffer = Colfer.codecBuffer.get() + val colferSpan = new ColferSpan() + val spanContext = span.context() + + colferSpan.setTraceID(spanContext.traceID.bytes) + colferSpan.setSpanID(spanContext.spanID.bytes) + colferSpan.setParentID(spanContext.parentID.bytes) + colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision)) + + val marshalledSize = colferSpan.marshal(marshalBuffer, 0) + medium.write(marshalBuffer, 0, marshalledSize) + + } + } private def samplingDecisionToByte(samplingDecision: SamplingDecision): Byte = samplingDecision match { case SamplingDecision.Sample => 1 diff --git a/kamon-testkit/src/main/scala/kamon/testkit/SimpleStringCodec.scala b/kamon-testkit/src/main/scala/kamon/testkit/SimpleStringCodec.scala deleted file mode 100644 index c755b0af..00000000 --- a/kamon-testkit/src/main/scala/kamon/testkit/SimpleStringCodec.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.testkit - -import java.nio.ByteBuffer - -import kamon.context.{Codecs, Context, TextMap} - -object SimpleStringCodec { - final class Headers extends Codecs.ForEntry[TextMap] { - private val dataKey = "X-String-Value" - - override def encode(context: Context): TextMap = { - val textMap = TextMap.Default() - context.get(ContextTesting.StringBroadcastKey).foreach { value => - textMap.put(dataKey, value) - } - - textMap - } - - override def decode(carrier: TextMap, context: Context): Context = { - carrier.get(dataKey) match { - case value @ Some(_) => context.withKey(ContextTesting.StringBroadcastKey, value) - case None => context - } - } - } - - final class Binary extends Codecs.ForEntry[ByteBuffer] { - val emptyBuffer: ByteBuffer = ByteBuffer.allocate(0) - - override def encode(context: Context): ByteBuffer = { - context.get(ContextTesting.StringBroadcastKey) match { - case Some(value) => ByteBuffer.wrap(value.getBytes) - case None => emptyBuffer - } - } - - override def decode(carrier: ByteBuffer, context: Context): Context = { - context.withKey(ContextTesting.StringBroadcastKey, Some(new String(carrier.array()))) - } - } -} -- cgit v1.2.3