author    Ivan Topolnjak <ivantopo@gmail.com>    2018-09-28 12:08:24 +0200
committer Ivan Topolnjak <ivantopo@gmail.com>    2018-09-28 12:08:24 +0200
commit    b61c92ea3589450fd097ab79420230b61b458ae4 (patch)
tree      1db5629ec257e41e3fff0bbd50eabdc45bb05d7d
parent    6941f53c79bb4ad8adb5ebc4bcc3a3c44d02e353 (diff)
cleanup HTTP propagation, introduce a new Binary propagation
-rw-r--r--  kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala  2
-rw-r--r--  kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala  98
-rw-r--r--  kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala  93
-rw-r--r--  kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala  50
-rw-r--r--  kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala  53
-rw-r--r--  kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala  5
-rw-r--r--  kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala  2
-rw-r--r--  kamon-core/src/main/colfer/Context.colf  1
-rw-r--r--  kamon-core/src/main/colfer/building.md  2
-rw-r--r--  kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java  132
-rw-r--r--  kamon-core/src/main/resources/reference.conf  21
-rw-r--r--  kamon-core/src/main/scala/kamon/ContextPropagation.scala  48
-rw-r--r--  kamon-core/src/main/scala/kamon/Kamon.scala  8
-rw-r--r--  kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala  277
-rw-r--r--  kamon-core/src/main/scala/kamon/context/Codecs.scala  297
-rw-r--r--  kamon-core/src/main/scala/kamon/context/Context.scala  25
-rw-r--r--  kamon-core/src/main/scala/kamon/context/HttpPropagation.scala  204
-rw-r--r--  kamon-core/src/main/scala/kamon/context/Propagation.scala  81
-rw-r--r--  kamon-core/src/main/scala/kamon/context/Storage.scala  2
-rw-r--r--  kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala  5
-rw-r--r--  kamon-core/src/main/scala/kamon/trace/SpanCodec.scala  77
-rw-r--r--  kamon-testkit/src/main/scala/kamon/testkit/SimpleStringCodec.scala  57
22 files changed, 817 insertions, 723 deletions
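In a nutshell, the new binary channel round-trips a Context through byte streams. A minimal sketch built only from the signatures introduced below (the wrapping object and println are illustrative, not part of this commit):

```scala
import java.io.ByteArrayOutputStream
import kamon.Kamon
import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
import kamon.context.Context

object BinaryRoundTrip extends App {
  // Write a Context with two tags through the default binary propagation channel.
  val propagation = Kamon.defaultBinaryPropagation()
  val out = new ByteArrayOutputStream()
  propagation.write(Context.of(Map("hello" -> "world", "kamon" -> "rulez")), ByteStreamWriter.of(out))

  // Read it back from the bytes that were produced.
  val roundTripped = propagation.read(ByteStreamReader.of(out.toByteArray))
  println(roundTripped.tags) // Map(hello -> world, kamon -> rulez)
}
```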
diff --git a/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala
index b5d0425b..ee80fd2b 100644
--- a/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala
+++ b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala
@@ -12,7 +12,7 @@ import org.scalatest.time.SpanSugar._
class KamonLifecycleSpec extends WordSpec with Matchers with Eventually{
- "the Kamon lifecycle" should {
+ "the Kamon lifecycle" ignore {
"keep the JVM running if reporters are running" in {
val process = Runtime.getRuntime.exec(createProcessCommand("kamon.KamonWithRunningReporter"))
Thread.sleep(5000)
diff --git a/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala
new file mode 100644
index 00000000..99e72f59
--- /dev/null
+++ b/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala
@@ -0,0 +1,98 @@
+package kamon.context
+
+import java.io.ByteArrayOutputStream
+
+import com.typesafe.config.ConfigFactory
+import kamon.Kamon
+import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
+import kamon.context.Propagation.{EntryReader, EntryWriter}
+import org.scalatest.{Matchers, OptionValues, WordSpec}
+
+class BinaryPropagationSpec extends WordSpec with Matchers with OptionValues {
+
+ "The Binary Context Propagation" should {
+ "return an empty context if there is no data to read from" in {
+ val context = binaryPropagation.read(ByteStreamReader.of(Array.ofDim[Byte](0)))
+ context.isEmpty() shouldBe true
+ }
+
+ "not write any data to the medium if the context is empty" in {
+ val writer = inspectableByteStreamWriter()
+ binaryPropagation.write(Context.Empty, writer)
+ writer.size() shouldBe 0
+ }
+
+ "round trip a Context that only has tags" in {
+ val context = Context.of(Map("hello" -> "world", "kamon" -> "rulez"))
+ val writer = inspectableByteStreamWriter()
+ binaryPropagation.write(context, writer)
+
+ val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
+ rtContext.entries shouldBe empty
+ rtContext.tags should contain theSameElementsAs (context.tags)
+ }
+
+ "round trip a Context that only has entries" in {
+ val context = Context.of(BinaryPropagationSpec.StringKey, "string-value", BinaryPropagationSpec.IntegerKey, 42)
+ val writer = inspectableByteStreamWriter()
+ binaryPropagation.write(context, writer)
+
+ val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
+ rtContext.tags shouldBe empty
+ rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value"
+ rtContext.get(BinaryPropagationSpec.IntegerKey) shouldBe 0 // there is no entry configuration for the integer key
+ }
+
+ "round trip a Context that with tags and entries" in {
+ val context = Context.of(Map("hello" -> "world", "kamon" -> "rulez"))
+ .withKey(BinaryPropagationSpec.StringKey, "string-value")
+ .withKey(BinaryPropagationSpec.IntegerKey, 42)
+
+ val writer = inspectableByteStreamWriter()
+ binaryPropagation.write(context, writer)
+ val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
+
+ rtContext.tags should contain theSameElementsAs (context.tags)
+ rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value"
+ rtContext.get(BinaryPropagationSpec.IntegerKey) shouldBe 0 // there is no entry configuration for the integer key
+ }
+ }
+
+ val binaryPropagation = BinaryPropagation.from(
+ ConfigFactory.parseString(
+ """
+ |
+ |entries.incoming.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec"
+ |entries.outgoing.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec"
+ |
+ """.stripMargin
+ ).withFallback(ConfigFactory.load().getConfig("kamon.propagation")), Kamon)
+
+
+ def inspectableByteStreamWriter() = new ByteArrayOutputStream(32) with ByteStreamWriter
+
+}
+
+object BinaryPropagationSpec {
+
+ val StringKey = Context.key[String]("string", null)
+ val IntegerKey = Context.key[Int]("integer", 0)
+
+ class StringEntryCodec extends EntryReader[ByteStreamReader] with EntryWriter[ByteStreamWriter] {
+
+ override def read(medium: ByteStreamReader, context: Context): Context = {
+ val valueData = medium.readAll()
+
+ if(valueData.length > 0) {
+ context.withKey(StringKey, new String(valueData))
+ } else context
+ }
+
+ override def write(context: Context, medium: ByteStreamWriter): Unit = {
+ val value = context.get(StringKey)
+ if(value != null) {
+ medium.write(value.getBytes)
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala
deleted file mode 100644
index 6a43bd01..00000000
--- a/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.context
-
-import kamon.Kamon
-import kamon.testkit.ContextTesting
-import org.scalatest.{Matchers, OptionValues, WordSpec}
-
-class ContextCodecSpec extends WordSpec with Matchers with ContextTesting with OptionValues {
- "the Context Codec" ignore {
- "encoding/decoding to HttpHeaders" should {
- "round trip a empty context" in {
- val textMap = ContextCodec.HttpHeaders.encode(Context.Empty)
- val decodedContext = ContextCodec.HttpHeaders.decode(textMap)
-
- decodedContext shouldBe Context.Empty
- }
-
- "round trip a context with only local keys" in {
- val localOnlyContext = Context.of(StringKey, Some("string-value"))
- val textMap = ContextCodec.HttpHeaders.encode(localOnlyContext)
- val decodedContext = ContextCodec.HttpHeaders.decode(textMap)
-
- decodedContext shouldBe Context.Empty
- }
-
- "round trip a context with local and broadcast keys" in {
- val initialContext = Context.Empty
- .withKey(StringKey, Some("string-value"))
- .withKey(StringBroadcastKey, Some("this-should-be-round-tripped"))
-
- val textMap = ContextCodec.HttpHeaders.encode(initialContext)
- val decodedContext = ContextCodec.HttpHeaders.decode(textMap)
-
- decodedContext.get(StringKey) shouldBe empty
- decodedContext.get(StringBroadcastKey).value shouldBe "this-should-be-round-tripped"
- }
-
- "read string broadcast keys using the configured header name" in {
- val textMap = TextMap.Default()
- textMap.put("X-Request-ID", "123456")
- val decodedContext = ContextCodec.HttpHeaders.decode(textMap)
-
- //decodedContext.get(Key.broadcastString("request-id")).value shouldBe "123456"
- }
- }
-
- "encoding/decoding to Binary" should {
- "round trip a empty context" in {
- val byteBuffer = ContextCodec.Binary.encode(Context.Empty)
-
- val decodedContext = ContextCodec.Binary.decode(byteBuffer)
-
- decodedContext shouldBe Context.Empty
- }
-
- "round trip a context with only local keys" in {
- val localOnlyContext = Context.of(StringKey, Some("string-value"))
- val byteBuffer = ContextCodec.Binary.encode(localOnlyContext)
- val decodedContext = ContextCodec.Binary.decode(byteBuffer)
-
- decodedContext shouldBe Context.Empty
- }
-
- "round trip a context with local and broadcast keys" in {
- val initialContext = Context.Empty
- .withKey(StringKey, Some("string-value"))
- .withKey(StringBroadcastKey, Some("this-should-be-round-tripped"))
-
- val byteBuffer = ContextCodec.Binary.encode(initialContext)
- val decodedContext = ContextCodec.Binary.decode(byteBuffer)
-
- decodedContext.get(StringKey) shouldBe empty
- decodedContext.get(StringBroadcastKey).value shouldBe "this-should-be-round-tripped"
- }
- }
- }
-
- val ContextCodec = new Codecs(Kamon.config())
-}
\ No newline at end of file
diff --git a/kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala
index 22400fa9..7ffb0838 100644
--- a/kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala
+++ b/kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala
@@ -22,29 +22,29 @@ import kamon.testkit.ContextTesting
import org.scalatest.{Matchers, OptionValues, WordSpec}
class ContextSerializationSpec extends WordSpec with Matchers with ContextTesting with OptionValues {
- "the Context is Serializable" ignore {
- "empty " in {
- val bos = new ByteArrayOutputStream()
- val oos = new ObjectOutputStream(bos)
- oos.writeObject(Context.Empty)
-
- val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
- val ctx = ois.readObject().asInstanceOf[Context]
- ctx shouldBe Context.Empty
- }
-
- "full" in {
- val sCtx = Context.of(StringBroadcastKey, Some("disi"))
- val bos = new ByteArrayOutputStream()
- val oos = new ObjectOutputStream(bos)
- oos.writeObject(sCtx)
-
- val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
- val rCtx = ois.readObject().asInstanceOf[Context]
- rCtx shouldBe sCtx
- }
-
- }
-
- val ContextCodec = new Codecs(Kamon.config())
+// "the Context is Serializable" ignore {
+// "empty " in {
+// val bos = new ByteArrayOutputStream()
+// val oos = new ObjectOutputStream(bos)
+// oos.writeObject(Context.Empty)
+//
+// val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
+// val ctx = ois.readObject().asInstanceOf[Context]
+// ctx shouldBe Context.Empty
+// }
+//
+// "full" in {
+// val sCtx = Context.of(StringBroadcastKey, Some("disi"))
+// val bos = new ByteArrayOutputStream()
+// val oos = new ObjectOutputStream(bos)
+// oos.writeObject(sCtx)
+//
+// val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
+// val rCtx = ois.readObject().asInstanceOf[Context]
+// rCtx shouldBe sCtx
+// }
+//
+// }
+//
+// val ContextCodec = new Codecs(Kamon.config())
}
\ No newline at end of file
diff --git a/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala
index 44165b98..ac1250ea 100644
--- a/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala
+++ b/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala
@@ -2,7 +2,8 @@ package kamon.context
import com.typesafe.config.ConfigFactory
import kamon.Kamon
-import kamon.context.HttpPropagation.Direction
+import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter}
+import kamon.context.Propagation.{EntryReader, EntryWriter}
import org.scalatest.{Matchers, OptionValues, WordSpec}
import scala.collection.mutable
@@ -12,9 +13,8 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues {
"The HTTP Context Propagation" when {
"reading from incoming requests" should {
"return an empty context if there are no tags nor keys" in {
- val context = httpPropagation.readContext(headerReaderFromMap(Map.empty))
- context.tags shouldBe empty
- context.entries shouldBe empty
+ val context = httpPropagation.read(headerReaderFromMap(Map.empty))
+ context.isEmpty() shouldBe true
}
"read tags from an HTTP message when they are available" in {
@@ -22,7 +22,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues {
"x-content-tags" -> "hello=world;correlation=1234",
"x-mapped-tag" -> "value"
)
- val context = httpPropagation.readContext(headerReaderFromMap(headers))
+ val context = httpPropagation.read(headerReaderFromMap(headers))
context.tags should contain only(
"hello" -> "world",
"correlation" -> "1234",
@@ -32,7 +32,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues {
"handle errors when reading HTTP headers" in {
val headers = Map("fail" -> "")
- val context = httpPropagation.readContext(headerReaderFromMap(headers))
+ val context = httpPropagation.read(headerReaderFromMap(headers))
context.tags shouldBe empty
context.entries shouldBe empty
}
@@ -44,7 +44,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues {
"integer-header" -> "123"
)
- val context = httpPropagation.readContext(headerReaderFromMap(headers))
+ val context = httpPropagation.read(headerReaderFromMap(headers))
context.get(HttpPropagationSpec.StringKey) shouldBe "hey"
context.get(HttpPropagationSpec.IntegerKey) shouldBe 123
context.get(HttpPropagationSpec.OptionalKey) shouldBe empty
@@ -56,17 +56,9 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues {
"writing to outgoing requests" should {
- propagationWritingTests(Direction.Outgoing)
- }
-
- "writing to returning requests" should {
- propagationWritingTests(Direction.Returning)
- }
-
- def propagationWritingTests(direction: Direction.Write) = {
"not write anything if the context is empty" in {
val headers = mutable.Map.empty[String, String]
- httpPropagation.writeContext(Context.Empty, headerWriterFromMap(headers), direction)
+ httpPropagation.write(Context.Empty, headerWriterFromMap(headers))
headers shouldBe empty
}
@@ -77,7 +69,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues {
"mappedTag" -> "value"
))
- httpPropagation.writeContext(context, headerWriterFromMap(headers), direction)
+ httpPropagation.write(context, headerWriterFromMap(headers))
headers should contain only(
"x-content-tags" -> "hello=world;",
"x-mapped-tag" -> "value"
@@ -91,10 +83,10 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues {
HttpPropagationSpec.IntegerKey, 42,
)
- httpPropagation.writeContext(context, headerWriterFromMap(headers), direction)
+ httpPropagation.write(context, headerWriterFromMap(headers))
headers should contain only(
"string-header" -> "out-we-go"
- )
+ )
}
}
}
@@ -116,23 +108,24 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues {
|entries.incoming.string = "kamon.context.HttpPropagationSpec$StringEntryCodec"
|entries.incoming.integer = "kamon.context.HttpPropagationSpec$IntegerEntryCodec"
|entries.outgoing.string = "kamon.context.HttpPropagationSpec$StringEntryCodec"
- |entries.returning.string = "kamon.context.HttpPropagationSpec$StringEntryCodec"
|
""".stripMargin
).withFallback(ConfigFactory.load().getConfig("kamon.propagation")), Kamon)
def headerReaderFromMap(map: Map[String, String]): HttpPropagation.HeaderReader = new HttpPropagation.HeaderReader {
- override def readHeader(header: String): Option[String] = {
+ override def read(header: String): Option[String] = {
if(map.get("fail").nonEmpty)
sys.error("failing on purpose")
map.get(header)
}
+
+ override def readAll(): Map[String, String] = map
}
def headerWriterFromMap(map: mutable.Map[String, String]): HttpPropagation.HeaderWriter = new HttpPropagation.HeaderWriter {
- override def writeHeader(header: String, value: String): Unit = map.put(header, value)
+ override def write(header: String, value: String): Unit = map.put(header, value)
}
}
@@ -143,23 +136,23 @@ object HttpPropagationSpec {
val OptionalKey = Context.key[Option[String]]("optional", None)
- class StringEntryCodec extends HttpPropagation.EntryReader with HttpPropagation.EntryWriter {
+ class StringEntryCodec extends EntryReader[HeaderReader] with EntryWriter[HeaderWriter] {
private val HeaderName = "string-header"
- override def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context = {
- reader.readHeader(HeaderName)
+ override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = {
+ reader.read(HeaderName)
.map(v => context.withKey(StringKey, v))
.getOrElse(context)
}
- override def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = {
- Option(context.get(StringKey)).foreach(v => writer.writeHeader(HeaderName, v))
+ override def write(context: Context, writer: HttpPropagation.HeaderWriter): Unit = {
+ Option(context.get(StringKey)).foreach(v => writer.write(HeaderName, v))
}
}
- class IntegerEntryCodec extends HttpPropagation.EntryReader {
- override def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context = {
- reader.readHeader("integer-header")
+ class IntegerEntryCodec extends EntryReader[HeaderReader] {
+ override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = {
+ reader.read("integer-header")
.map(v => context.withKey(IntegerKey, v.toInt))
.getOrElse(context)
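The spec's helpers above already show the whole new HTTP surface; for reference, a hedged sketch of the same read/write flow against the default channel (header names are illustrative, and whether the default channel maps x-content-tags depends on reference.conf):

```scala
import kamon.Kamon
import kamon.context.HttpPropagation
import scala.collection.mutable

object HttpRoundTrip extends App {
  val propagation = Kamon.defaultHttpPropagation()

  // Decode a Context from an incoming request's headers...
  val incoming = Map("x-content-tags" -> "hello=world;")
  val context = propagation.read(new HttpPropagation.HeaderReader {
    override def read(header: String): Option[String] = incoming.get(header)
    override def readAll(): Map[String, String] = incoming
  })

  // ...and copy it onto an outgoing request.
  val outgoing = mutable.Map.empty[String, String]
  propagation.write(context, new HttpPropagation.HeaderWriter {
    override def write(header: String, value: String): Unit = outgoing.put(header, value)
  })
}
```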
diff --git a/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala
index 19e08666..d334184d 100644
--- a/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala
+++ b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala
@@ -272,13 +272,14 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp
override def url: String = requestUrl
override def path: String = requestPath
override def method: String = requestMethod
- override def readHeader(header: String): Option[String] = headers.get(header)
+ override def read(header: String): Option[String] = headers.get(header)
+ override def readAll(): Map[String, String] = headers
}
def fakeResponse(responseStatusCode: Int, headers: mutable.Map[String, String]): HttpResponse.Writable[HttpResponse] =
new HttpResponse.Writable[HttpResponse] {
override def statusCode: Int = responseStatusCode
- override def writeHeader(header: String, value: String): Unit = headers.put(header, value)
+ override def write(header: String, value: String): Unit = headers.put(header, value)
override def build(): HttpResponse = this
}
diff --git a/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala b/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala
index 76e07c31..37a95a68 100644
--- a/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala
+++ b/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala
@@ -16,7 +16,7 @@
package kamon.trace
-import kamon.context.{Context, TextMap}
+import kamon.context.{Context}
import kamon.testkit.SpanBuilding
import kamon.trace.IdentityProvider.Identifier
import kamon.trace.SpanContext.SamplingDecision
diff --git a/kamon-core/src/main/colfer/Context.colf b/kamon-core/src/main/colfer/Context.colf
index f84d7a56..f5d9a80b 100644
--- a/kamon-core/src/main/colfer/Context.colf
+++ b/kamon-core/src/main/colfer/Context.colf
@@ -6,5 +6,6 @@ type Entry struct {
}
type Context struct {
+ tags []text
entries []Entry
}
\ No newline at end of file
diff --git a/kamon-core/src/main/colfer/building.md b/kamon-core/src/main/colfer/building.md
index f510a44f..8b663828 100644
--- a/kamon-core/src/main/colfer/building.md
+++ b/kamon-core/src/main/colfer/building.md
@@ -1,4 +1,4 @@
-Just download and install the colver compiler and run this command from the colfer folder:
+Just download and install the colfer compiler and run this command from the colfer folder:
```
colfer -b ../java -p kamon/context/generated/binary java
diff --git a/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java b/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java
index a9917e99..4be6d630 100644
--- a/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java
+++ b/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java
@@ -12,6 +12,7 @@ import java.io.ObjectOutputStream;
import java.io.ObjectStreamException;
import java.io.OutputStream;
import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
import java.util.InputMismatchException;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
@@ -35,6 +36,8 @@ public class Context implements Serializable {
+ public String[] tags;
+
public Entry[] entries;
@@ -43,10 +46,12 @@ public class Context implements Serializable {
init();
}
+ private static final String[] _zeroTags = new String[0];
private static final Entry[] _zeroEntries = new Entry[0];
/** Colfer zero values. */
private void init() {
+ tags = _zeroTags;
entries = _zeroEntries;
}
@@ -142,6 +147,7 @@ public class Context implements Serializable {
/**
* Serializes the object.
+ * All {@code null} elements in {@link #tags} will be replaced with {@code ""}.
* All {@code null} elements in {@link #entries} will be replaced with a {@code new} value.
* @param out the data destination.
* @param buf the initial buffer or {@code null}.
@@ -171,6 +177,7 @@ public class Context implements Serializable {
/**
* Serializes the object.
+ * All {@code null} elements in {@link #tags} will be replaced with {@code ""}.
* All {@code null} elements in {@link #entries} will be replaced with a {@code new} value.
* @param buf the data destination.
* @param offset the initial index for {@code buf}, inclusive.
@@ -182,8 +189,72 @@ public class Context implements Serializable {
int i = offset;
try {
- if (this.entries.length != 0) {
+ if (this.tags.length != 0) {
buf[i++] = (byte) 0;
+ String[] a = this.tags;
+
+ int x = a.length;
+ if (x > Context.colferListMax)
+ throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Context.tags length %d exceeds %d elements", x, Context.colferListMax));
+ while (x > 0x7f) {
+ buf[i++] = (byte) (x | 0x80);
+ x >>>= 7;
+ }
+ buf[i++] = (byte) x;
+
+ for (int ai = 0; ai < a.length; ai++) {
+ String s = a[ai];
+ if (s == null) {
+ s = "";
+ a[ai] = s;
+ }
+
+ int start = ++i;
+
+ for (int sIndex = 0, sLength = s.length(); sIndex < sLength; sIndex++) {
+ char c = s.charAt(sIndex);
+ if (c < '\u0080') {
+ buf[i++] = (byte) c;
+ } else if (c < '\u0800') {
+ buf[i++] = (byte) (192 | c >>> 6);
+ buf[i++] = (byte) (128 | c & 63);
+ } else if (c < '\ud800' || c > '\udfff') {
+ buf[i++] = (byte) (224 | c >>> 12);
+ buf[i++] = (byte) (128 | c >>> 6 & 63);
+ buf[i++] = (byte) (128 | c & 63);
+ } else {
+ int cp = 0;
+ if (++sIndex < sLength) cp = Character.toCodePoint(c, s.charAt(sIndex));
+ if ((cp >= 1 << 16) && (cp < 1 << 21)) {
+ buf[i++] = (byte) (240 | cp >>> 18);
+ buf[i++] = (byte) (128 | cp >>> 12 & 63);
+ buf[i++] = (byte) (128 | cp >>> 6 & 63);
+ buf[i++] = (byte) (128 | cp & 63);
+ } else
+ buf[i++] = (byte) '?';
+ }
+ }
+ int size = i - start;
+ if (size > Context.colferSizeMax)
+ throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Context.tags[%d] size %d exceeds %d UTF-8 bytes", ai, size, Context.colferSizeMax));
+
+ int ii = start - 1;
+ if (size > 0x7f) {
+ i++;
+ for (int y = size; y >= 1 << 14; y >>>= 7) i++;
+ System.arraycopy(buf, start, buf, i - size, size);
+
+ do {
+ buf[ii++] = (byte) (size | 0x80);
+ size >>>= 7;
+ } while (size > 0x7f);
+ }
+ buf[ii] = (byte) size;
+ }
+ }
+
+ if (this.entries.length != 0) {
+ buf[i++] = (byte) 1;
Entry[] a = this.entries;
int x = a.length;
@@ -253,6 +324,35 @@ public class Context implements Serializable {
if (shift == 28 || b >= 0) break;
}
if (length < 0 || length > Context.colferListMax)
+ throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Context.tags length %d exceeds %d elements", length, Context.colferListMax));
+
+ String[] a = new String[length];
+ for (int ai = 0; ai < length; ai++) {
+ int size = 0;
+ for (int shift = 0; true; shift += 7) {
+ byte b = buf[i++];
+ size |= (b & 0x7f) << shift;
+ if (shift == 28 || b >= 0) break;
+ }
+ if (size < 0 || size > Context.colferSizeMax)
+ throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Context.tags[%d] size %d exceeds %d UTF-8 bytes", ai, size, Context.colferSizeMax));
+
+ int start = i;
+ i += size;
+ a[ai] = new String(buf, start, size, StandardCharsets.UTF_8);
+ }
+ this.tags = a;
+ header = buf[i++];
+ }
+
+ if (header == (byte) 1) {
+ int length = 0;
+ for (int shift = 0; true; shift += 7) {
+ byte b = buf[i++];
+ length |= (b & 0x7f) << shift;
+ if (shift == 28 || b >= 0) break;
+ }
+ if (length < 0 || length > Context.colferListMax)
throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Context.entries length %d exceeds %d elements", length, Context.colferListMax));
Entry[] a = new Entry[length];
@@ -278,7 +378,7 @@ public class Context implements Serializable {
}
// {@link Serializable} version number.
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
// {@link Serializable} Colfer extension.
private void writeObject(ObjectOutputStream out) throws IOException {
@@ -312,6 +412,32 @@ public class Context implements Serializable {
}
/**
+ * Gets kamon/context/generated/binary/context.Context.tags.
+ * @return the value.
+ */
+ public String[] getTags() {
+ return this.tags;
+ }
+
+ /**
+ * Sets kamon/context/generated/binary/context.Context.tags.
+ * @param value the replacement.
+ */
+ public void setTags(String[] value) {
+ this.tags = value;
+ }
+
+ /**
+ * Sets kamon/context/generated/binary/context.Context.tags.
+ * @param value the replacement.
+   * @return {@link this}.
+ */
+ public Context withTags(String[] value) {
+ this.tags = value;
+ return this;
+ }
+
+ /**
* Gets kamon/context/generated/binary/context.Context.entries.
* @return the value.
*/
@@ -340,6 +466,7 @@ public class Context implements Serializable {
@Override
public final int hashCode() {
int h = 1;
+ for (String o : this.tags) h = 31 * h + (o == null ? 0 : o.hashCode());
for (Entry o : this.entries) h = 31 * h + (o == null ? 0 : o.hashCode());
return h;
}
@@ -353,6 +480,7 @@ public class Context implements Serializable {
if (o == null) return false;
if (o == this) return true;
return o.getClass() == Context.class
+ && java.util.Arrays.equals(this.tags, o.tags)
&& java.util.Arrays.equals(this.entries, o.entries);
}
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 5984f3c4..a4a7871e 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -238,10 +238,27 @@ kamon {
outgoing {
}
+ }
+ }
+ }
+
+ binary {
+
+    # Default binary propagation. Unless specified otherwise, all instrumentation will use the configuration on
+    # this section for binary context propagation.
+ #
+ default {
+ entries {
+
+      # Specify mappings between Context keys and the EntryReader implementation in charge of reading them
+      # from the incoming messages into the Context.
+ incoming {
+ #span = "something"
+ }
# Specify mappings between Context keys and the Http.EntryWriter implementation in charge of writing them
- # on the outgoing HTTP response sent back to clients.
- returning {
+ # on the outgoing HTTP requests.
+ outgoing {
}
}
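A sketch of a user-level override of this new section, following the ConfigFactory approach used in the specs (the correlation entry and com.example.CorrelationCodec class are hypothetical):

```scala
import com.typesafe.config.ConfigFactory
import kamon.Kamon

object RegisterBinaryCodec extends App {
  // Hypothetical entry name and codec class; any Propagation.EntryReader/EntryWriter works here.
  val userConfig = ConfigFactory.parseString(
    """
      |kamon.propagation.binary.default.entries {
      |  incoming.correlation = "com.example.CorrelationCodec"
      |  outgoing.correlation = "com.example.CorrelationCodec"
      |}
    """.stripMargin)

  // Layer the override on top of the current configuration.
  Kamon.reconfigure(userConfig.withFallback(Kamon.config()))
}
```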
diff --git a/kamon-core/src/main/scala/kamon/ContextPropagation.scala b/kamon-core/src/main/scala/kamon/ContextPropagation.scala
index bfcfbf7e..8f2ed8e4 100644
--- a/kamon-core/src/main/scala/kamon/ContextPropagation.scala
+++ b/kamon-core/src/main/scala/kamon/ContextPropagation.scala
@@ -1,11 +1,14 @@
package kamon
import com.typesafe.config.Config
-import kamon.context.HttpPropagation
+import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
+import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter}
+import kamon.context.{BinaryPropagation, HttpPropagation, Propagation}
trait ContextPropagation { self: Configuration with ClassLoading =>
@volatile private var _propagationComponents: ContextPropagation.Components = _
- @volatile private var _defaultHttpPropagation: HttpPropagation = _
+ @volatile private var _defaultHttpPropagation: Propagation[HeaderReader, HeaderWriter] = _
+ @volatile private var _defaultBinaryPropagation: Propagation[ByteStreamReader, ByteStreamWriter] = _
// Initial configuration and reconfigures
init(self.config)
@@ -14,28 +17,48 @@ trait ContextPropagation { self: Configuration with ClassLoading =>
/**
* Retrieves the HTTP propagation channel with the supplied name. Propagation channels are configured on the
- * kamon.propagation.channels configuration setting.
+ * kamon.propagation.http configuration section.
*
* @param channelName Channel name to retrieve.
- * @return The HTTP propagation, if defined.
+ * @return The HTTP propagation, if available.
*/
- def httpPropagation(channelName: String): Option[HttpPropagation] =
+ def httpPropagation(channelName: String): Option[Propagation[HeaderReader, HeaderWriter]] =
_propagationComponents.httpChannels.get(channelName)
/**
+ * Retrieves the binary propagation channel with the supplied name. Propagation channels are configured on the
+ * kamon.propagation.binary configuration section.
+ *
+ * @param channelName Channel name to retrieve.
+ * @return The binary propagation, if available.
+ */
+ def binaryPropagation(channelName: String): Option[Propagation[ByteStreamReader, ByteStreamWriter]] =
+ _propagationComponents.binaryChannels.get(channelName)
+
+ /**
* Retrieves the default HTTP propagation channel. Configuration for this channel can be found under the
- * kamon.propagation.channels.http configuration setting.
+ * kamon.propagation.http.default configuration section.
*
* @return The default HTTP propagation.
*/
- def defaultHttpPropagation(): HttpPropagation =
+ def defaultHttpPropagation(): Propagation[HeaderReader, HeaderWriter] =
_defaultHttpPropagation
+ /**
+ * Retrieves the default binary propagation channel. Configuration for this channel can be found under the
+ * kamon.propagation.binary.default configuration section.
+ *
+   * @return The default binary propagation.
+ */
+ def defaultBinaryPropagation(): Propagation[ByteStreamReader, ByteStreamWriter] =
+ _defaultBinaryPropagation
+
private def init(config: Config): Unit = synchronized {
_propagationComponents = ContextPropagation.Components.from(self.config, self)
_defaultHttpPropagation = _propagationComponents.httpChannels(ContextPropagation.DefaultHttpChannel)
+ _defaultBinaryPropagation = _propagationComponents.binaryChannels(ContextPropagation.DefaultBinaryChannel)
}
}
@@ -44,7 +67,8 @@ object ContextPropagation {
val DefaultBinaryChannel = "default"
case class Components(
- httpChannels: Map[String, HttpPropagation]
+ httpChannels: Map[String, Propagation[HeaderReader, HeaderWriter]],
+ binaryChannels: Map[String, Propagation[ByteStreamReader, ByteStreamWriter]]
)
object Components {
@@ -52,11 +76,17 @@ object ContextPropagation {
def from(config: Config, classLoading: ClassLoading): Components = {
val propagationConfig = config.getConfig("kamon.propagation")
val httpChannelsConfig = propagationConfig.getConfig("http").configurations
+ val binaryChannelsConfig = propagationConfig.getConfig("binary").configurations
+
val httpChannels = httpChannelsConfig.map {
case (channelName, channelConfig) => (channelName -> HttpPropagation.from(channelConfig, classLoading))
}
- Components(httpChannels)
+ val binaryChannels = binaryChannelsConfig.map {
+ case (channelName, channelConfig) => (channelName -> BinaryPropagation.from(channelConfig, classLoading))
+ }
+
+ Components(httpChannels, binaryChannels)
}
}
}
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 2825d961..a6cc7bf4 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -19,7 +19,6 @@ import java.time.Duration
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor}
import com.typesafe.config.{Config, ConfigFactory}
-import kamon.context.Codecs
import kamon.metric._
import kamon.trace._
import kamon.util.{Clock, Filters, Matcher, Registration}
@@ -41,7 +40,6 @@ object Kamon extends MetricLookup with ClassLoading with Configuration with Repo
private val _metrics = new MetricRegistry(_config, _scheduler)
private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config, _clock)
private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config, _clock)
- private val _contextCodec = new Codecs(_config)
//private var _onReconfigureHooks = Seq.empty[OnReconfigureHook]
sys.addShutdownHook(() => _scheduler.shutdown())
@@ -56,7 +54,6 @@ object Kamon extends MetricLookup with ClassLoading with Configuration with Repo
_metrics.reconfigure(config)
_reporterRegistry.reconfigure(config)
_tracer.reconfigure(config)
- _contextCodec.reconfigure(config)
_scheduler match {
case stpe: ScheduledThreadPoolExecutor => stpe.setCorePoolSize(schedulerPoolSize(config))
@@ -91,11 +88,6 @@ object Kamon extends MetricLookup with ClassLoading with Configuration with Repo
override def identityProvider: IdentityProvider =
_tracer.identityProvider
- def contextCodec(): Codecs =
- _contextCodec
-
-
-
override def loadReportersFromConfig(): Unit =
_reporterRegistry.loadReportersFromConfig()
diff --git a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
new file mode 100644
index 00000000..847ac452
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
@@ -0,0 +1,277 @@
+/* =========================================================================================
+ * Copyright © 2013-2018 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon
+package context
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
+
+import com.typesafe.config.Config
+import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
+import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry}
+import org.slf4j.LoggerFactory
+
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+import scala.util.{Failure, Success}
+
+
+/**
+ * Context propagation that uses byte stream abstractions as the transport medium. The Binary propagation uses
+ * instances of [[ByteStreamReader]] and [[ByteStreamWriter]] to decode and encode Context instances, respectively.
+ *
+ * Binary propagation uses the [[ByteStreamReader]] and [[ByteStreamWriter]] abstractions, which closely model the APIs
+ * from [[InputStream]] and [[OutputStream]], but without exposing additional functionality that wouldn't have any
+ * well defined behavior for Context propagation, e.g. flush or close functions on OutputStreams.
+ */
+object BinaryPropagation {
+
+ /**
+ * Represents a readable stream of bytes. This interface closely resembles [[InputStream]], minus the functionality
+ * that wouldn't have a clearly defined behavior in the context of Context propagation.
+ */
+ trait ByteStreamReader {
+ /**
+ * Number of available bytes on the ByteStream.
+ */
+ def available(): Int
+
+ /**
+ * Reads as many bytes as possible into the target byte array.
+ *
+ * @param target Target buffer in which the read bytes will be written.
+ * @return The number of bytes written into the target buffer.
+ */
+ def read(target: Array[Byte]): Int
+
+ /**
+ * Reads a specified number of bytes into the target buffer, starting from the offset position.
+ *
+ * @param target Target buffer in which read bytes will be written.
+ * @param offset Offset index in which to start writing bytes on the target buffer.
+ * @param count Number of bytes to be read.
+ * @return The number of bytes written into the target buffer.
+ */
+ def read(target: Array[Byte], offset: Int, count: Int): Int
+
+ /**
+ * Reads all available bytes into a newly created byte array.
+ *
+ * @return All bytes read.
+ */
+ def readAll(): Array[Byte]
+ }
+
+
+ object ByteStreamReader {
+
+ /**
+ * Creates a new [[ByteStreamReader]] that reads data from a byte array.
+ */
+ def of(bytes: Array[Byte]): ByteStreamReader = new ByteArrayInputStream(bytes) with ByteStreamReader {
+ override def readAll(): Array[Byte] = {
+ val target = Array.ofDim[Byte](available())
+ read(target, 0, available())
+ target
+ }
+ }
+ }
+
+
+ /**
+ * Represents a writable stream of bytes. This interface closely resembles [[OutputStream]], minus the functionality
+ * that wouldn't have a clearly defined behavior in the context of Context propagation.
+ */
+ trait ByteStreamWriter {
+
+ /**
+ * Writes all bytes into the stream.
+ */
+ def write(bytes: Array[Byte]): Unit
+
+ /**
+ * Writes a portion of the provided bytes into the stream.
+ *
+ * @param bytes Buffer from which data will be selected.
+ * @param offset Starting index on the buffer.
+ * @param count Number of bytes to write into the stream.
+ */
+ def write(bytes: Array[Byte], offset: Int, count: Int): Unit
+
+ /**
+ * Write a single byte into the stream.
+ */
+ def write(byte: Int): Unit
+
+ }
+
+ object ByteStreamWriter {
+
+ /**
+ * Creates a new [[ByteStreamWriter]] from an OutputStream.
+ */
+ def of(outputStream: OutputStream): ByteStreamWriter = new ByteStreamWriter {
+ override def write(bytes: Array[Byte]): Unit =
+ outputStream.write(bytes)
+
+ override def write(bytes: Array[Byte], offset: Int, count: Int): Unit =
+ outputStream.write(bytes, offset, count)
+
+ override def write(byte: Int): Unit =
+ outputStream.write(byte)
+ }
+ }
+
+
+
+ /**
+ * Create a new default Binary Propagation instance from the provided configuration.
+ *
+ * @param config Binary Propagation channel configuration
+   * @return A newly constructed Binary Propagation instance.
+ */
+ def from(config: Config, classLoading: ClassLoading): Propagation[ByteStreamReader, ByteStreamWriter] = {
+ new BinaryPropagation.Default(Settings.from(config, classLoading))
+ }
+
+ /**
+ * Default Binary propagation in Kamon. This implementation uses Colfer to read and write the context tags and
+   * entries. Entries are represented as simple pairs of entry name and bytes, which are then processed by all the
+ * configured entry readers and writers.
+ */
+ class Default(settings: Settings) extends Propagation[ByteStreamReader, ByteStreamWriter] {
+ private val _log = LoggerFactory.getLogger(classOf[BinaryPropagation.Default])
+ private val _streamPool = new ThreadLocal[Default.ReusableByteStreamWriter] {
+ override def initialValue(): Default.ReusableByteStreamWriter = new Default.ReusableByteStreamWriter(128)
+ }
+
+ override def read(reader: ByteStreamReader): Context = {
+ if(reader.available() > 0) {
+ val contextData = new ColferContext()
+ contextData.unmarshal(reader.readAll(), 0)
+
+ // Context tags
+ var tagSectionsCount = contextData.tags.length
+ if (tagSectionsCount > 0 && tagSectionsCount % 2 != 0) {
+ _log.warn("Malformed context tags found when trying to read a Context from ByteStreamReader")
+ tagSectionsCount -= 1
+ }
+
+ val tags = if (tagSectionsCount > 0) {
+ val tagsBuilder = Map.newBuilder[String, String]
+ var tagIndex = 0
+ while (tagIndex < tagSectionsCount) {
+ tagsBuilder += (contextData.tags(tagIndex) -> contextData.tags(tagIndex + 1))
+ tagIndex += 2
+ }
+ tagsBuilder.result()
+
+ } else Map.empty[String, String]
+
+
+ // Only reads the entries for which there is a registered reader
+ contextData.entries.foldLeft(Context.of(tags)) {
+ case (context, entryData) =>
+ settings.incomingEntries.get(entryData.name).map { entryReader =>
+ var contextWithEntry = context
+ try {
+ contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.content), context)
+ } catch {
+ case NonFatal(t) => _log.warn("Failed to read entry [{}]", entryData.name.asInstanceOf[Any], t)
+ }
+
+ contextWithEntry
+ }.getOrElse(context)
+ }
+ } else Context.Empty
+ }
+
+ override def write(context: Context, writer: ByteStreamWriter): Unit = {
+ if (context.nonEmpty()) {
+ val contextData = new ColferContext()
+ val output = _streamPool.get()
+
+ if (context.tags.nonEmpty) {
+ val tags = Array.ofDim[String](context.tags.size * 2)
+ var tagIndex = 0
+ context.tags.foreach {
+ case (key, value) =>
+ tags.update(tagIndex, key)
+ tags.update(tagIndex + 1, value)
+ tagIndex += 2
+ }
+
+ contextData.tags = tags
+ }
+
+ if (context.entries.nonEmpty) {
+ val entries = settings.outgoingEntries.collect {
+ case (entryName, entryWriter) if context.entries.contains(entryName) =>
+ output.reset()
+ entryWriter.write(context, output)
+
+ val colferEntry = new ColferEntry()
+ colferEntry.name = entryName
+ colferEntry.content = output.toByteArray()
+ colferEntry
+ }
+
+ contextData.entries = entries.toArray
+ }
+
+ output.reset()
+ // TODO: avoid internal allocation of byte[] on the marshal method. Use ReusableByteStreamWriter's underlying buffer.
+ contextData.marshal(output, null)
+ writer.write(output.toByteArray)
+ }
+ }
+ }
+
+ object Default {
+ private class ReusableByteStreamWriter(size: Int) extends ByteArrayOutputStream(size) with ByteStreamWriter {
+ def underlying(): Array[Byte] = this.buf
+ }
+ }
+
+ case class Settings(
+ incomingEntries: Map[String, Propagation.EntryReader[ByteStreamReader]],
+ outgoingEntries: Map[String, Propagation.EntryWriter[ByteStreamWriter]]
+ )
+
+ object Settings {
+ private val log = LoggerFactory.getLogger(classOf[BinaryPropagation.Settings])
+
+ def from(config: Config, classLoading: ClassLoading): BinaryPropagation.Settings = {
+ def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = {
+ val instanceMap = Map.newBuilder[String, ExpectedType]
+
+ mappings.foreach {
+ case (contextKey, componentClass) => classLoading.createInstance[ExpectedType](componentClass, Nil) match {
+ case Success(componentInstance) => instanceMap += (contextKey -> componentInstance)
+ case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to []",
+ implicitly[ClassTag[ExpectedType]].runtimeClass.getName, componentClass, exception)
+ }
+ }
+
+ instanceMap.result()
+ }
+
+ Settings(
+ buildInstances[Propagation.EntryReader[ByteStreamReader]](config.getConfig("entries.incoming").pairs),
+ buildInstances[Propagation.EntryWriter[ByteStreamWriter]](config.getConfig("entries.outgoing").pairs)
+ )
+ }
+ }
+}
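Custom entries plug into the Settings above by implementing Propagation.EntryReader and/or Propagation.EntryWriter against the byte stream media. A hedged sketch mirroring the StringEntryCodec from BinaryPropagationSpec (the correlation key name is illustrative):

```scala
import kamon.context.{Context, Propagation}
import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}

// Stores a single String entry as raw UTF-8 bytes on the binary medium.
class CorrelationCodec extends Propagation.EntryReader[ByteStreamReader]
    with Propagation.EntryWriter[ByteStreamWriter] {

  private val CorrelationKey = Context.key[String]("correlation", null)

  override def read(medium: ByteStreamReader, context: Context): Context = {
    val bytes = medium.readAll()
    if (bytes.nonEmpty) context.withKey(CorrelationKey, new String(bytes, "UTF-8")) else context
  }

  override def write(context: Context, medium: ByteStreamWriter): Unit = {
    val value = context.get(CorrelationKey)
    if (value != null) medium.write(value.getBytes("UTF-8"))
  }
}
```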
diff --git a/kamon-core/src/main/scala/kamon/context/Codecs.scala b/kamon-core/src/main/scala/kamon/context/Codecs.scala
deleted file mode 100644
index 465f53be..00000000
--- a/kamon-core/src/main/scala/kamon/context/Codecs.scala
+++ /dev/null
@@ -1,297 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon
-package context
-
-import java.nio.ByteBuffer
-
-import com.typesafe.config.Config
-import kamon.util.DynamicAccess
-import org.slf4j.LoggerFactory
-import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry}
-
-import scala.collection.mutable
-
-class Codecs(initialConfig: Config) {
- private val log = LoggerFactory.getLogger(classOf[Codecs])
- @volatile private var httpHeaders: Codecs.ForContext[TextMap] = new Codecs.HttpHeaders(Map.empty)
- @volatile private var binary: Codecs.ForContext[ByteBuffer] = new Codecs.Binary(256, Map.empty)
-
- reconfigure(initialConfig)
-
-
- def HttpHeaders: Codecs.ForContext[TextMap] =
- httpHeaders
-
- def Binary: Codecs.ForContext[ByteBuffer] =
- binary
-
- def reconfigure(config: Config): Unit = {
- import scala.collection.JavaConverters._
- try {
- val codecsConfig = config.getConfig("kamon.context.codecs")
- val stringKeys = readStringKeysConfig(codecsConfig.getConfig("string-keys"))
- val knownHttpHeaderCodecs = readEntryCodecs[TextMap]("http-headers-keys", codecsConfig) ++ stringHeaderCodecs(stringKeys)
- val knownBinaryCodecs = readEntryCodecs[ByteBuffer]("binary-keys", codecsConfig) ++ stringBinaryCodecs(stringKeys)
-
- httpHeaders = new Codecs.HttpHeaders(knownHttpHeaderCodecs)
- binary = new Codecs.Binary(codecsConfig.getBytes("binary-buffer-size"), knownBinaryCodecs)
- } catch {
- case t: Throwable => log.error("Failed to initialize Context Codecs", t)
- }
- }
-
- private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codecs.ForEntry[T]] = {
- val rootConfig = config.getConfig(rootKey)
- val dynamic = new DynamicAccess(getClass.getClassLoader)
- val entries = Map.newBuilder[String, Codecs.ForEntry[T]]
-
- rootConfig.topLevelKeys.foreach(key => {
- try {
- val fqcn = rootConfig.getString(key)
- entries += ((key, dynamic.createInstanceFor[Codecs.ForEntry[T]](fqcn, Nil).get))
- } catch {
- case e: Throwable =>
- log.error(s"Failed to initialize codec for key [$key]", e)
- }
- })
-
- entries.result()
- }
-
- private def readStringKeysConfig(config: Config): Map[String, String] =
- config.topLevelKeys.map(key => (key, config.getString(key))).toMap
-
- private def stringHeaderCodecs(keys: Map[String, String]): Map[String, Codecs.ForEntry[TextMap]] =
- keys.map { case (key, header) => (key, new Codecs.StringHeadersCodec(key, header)) }
-
- private def stringBinaryCodecs(keys: Map[String, String]): Map[String, Codecs.ForEntry[ByteBuffer]] =
- keys.map { case (key, _) => (key, new Codecs.StringBinaryCodec(key)) }
-}
-
-object Codecs {
-
- trait ForContext[T] {
- def encode(context: Context): T
- def decode(carrier: T): Context
- }
-
- trait ForEntry[T] {
- def encode(context: Context): T
- def decode(carrier: T, context: Context): Context
- }
-
- final class HttpHeaders(entryCodecs: Map[String, Codecs.ForEntry[TextMap]]) extends Codecs.ForContext[TextMap] {
- private val log = LoggerFactory.getLogger(classOf[HttpHeaders])
-
- override def encode(context: Context): TextMap = {
- val encoded = TextMap.Default()
-
-// context.entries.foreach {
-// case (key, _) if key.broadcast =>
-// entryCodecs.get(key.name) match {
-// case Some(codec) =>
-// try {
-// codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2))
-// } catch {
-// case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e)
-// }
-//
-// case None =>
-// log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name)
-// }
-//
-// case _ => // All non-broadcast keys should be ignored.
-// }
-
- encoded
- }
-
- override def decode(carrier: TextMap): Context = {
- var context: Context = Context.Empty
-
- try {
- context = entryCodecs.foldLeft(context)((ctx, codecEntry) => {
- val (_, codec) = codecEntry
- codec.decode(carrier, ctx)
- })
-
- } catch {
- case e: Throwable =>
- log.error("Failed to decode context from HttpHeaders", e)
- }
-
- context
- }
- }
-
-
- final class Binary(bufferSize: Long, entryCodecs: Map[String, Codecs.ForEntry[ByteBuffer]]) extends Codecs.ForContext[ByteBuffer] {
- private val log = LoggerFactory.getLogger(classOf[Binary])
- private val binaryBuffer = newThreadLocalBuffer(bufferSize)
- private val emptyBuffer = ByteBuffer.allocate(0)
-
- override def encode(context: Context): ByteBuffer = {
- val entries = context.entries
- if(entries.isEmpty)
- emptyBuffer
- else {
- var colferEntries: List[ColferEntry] = Nil
-// entries.foreach {
-// case (key, _) if key.broadcast =>
-// entryCodecs.get(key.name) match {
-// case Some(entryCodec) =>
-// try {
-// val entryData = entryCodec.encode(context)
-// if(entryData.capacity() > 0) {
-// val colferEntry = new ColferEntry()
-// colferEntry.setName(key.name)
-// colferEntry.setContent(entryData.array())
-// colferEntries = colferEntry :: colferEntries
-// }
-// } catch {
-// case throwable: Throwable =>
-// log.error(s"Failed to encode broadcast context key [${key.name}]", throwable)
-// }
-//
-// case None =>
-// log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name)
-// }
-//
-// case _ => // All non-broadcast keys should be ignored.
-// }
-
- if(colferEntries.isEmpty)
- emptyBuffer
- else {
- val buffer = binaryBuffer.get()
- val colferContext = new ColferContext()
- colferContext.setEntries(colferEntries.toArray)
- val marshalledSize = colferContext.marshal(buffer, 0)
-
- val data = Array.ofDim[Byte](marshalledSize)
- System.arraycopy(buffer, 0, data, 0, marshalledSize)
- ByteBuffer.wrap(data)
- }
- }
- }
-
-
- override def decode(carrier: ByteBuffer): Context = {
- if(carrier.capacity() == 0)
- Context.Empty
- else {
- var context: Context = Context.Empty
-
- try {
- val colferContext = new ColferContext()
- colferContext.unmarshal(carrier.array(), 0)
-
- colferContext.entries.foreach(colferEntry => {
- entryCodecs.get(colferEntry.getName()) match {
- case Some(entryCodec) =>
- context = entryCodec.decode(ByteBuffer.wrap(colferEntry.content), context)
-
- case None =>
- log.error("Failed to decode entry [{}] with Binary context codec. No entry found for the key.", colferEntry.getName())
- }
- })
-
- } catch {
- case e: Throwable =>
- log.error("Failed to decode context from Binary", e)
- }
-
- context
- }
- }
-
-
- private def newThreadLocalBuffer(size: Long): ThreadLocal[Array[Byte]] = new ThreadLocal[Array[Byte]] {
- override def initialValue(): Array[Byte] = Array.ofDim[Byte](size.toInt)
- }
- }
-
- private class StringHeadersCodec(key: String, headerName: String) extends Codecs.ForEntry[TextMap] {
- //private val contextKey = Key.broadcast[Option[String]](key, None)
-
- override def encode(context: Context): TextMap = {
- val textMap = TextMap.Default()
-// context.get(contextKey).foreach { value =>
-// textMap.put(headerName, value)
-// }
-
- textMap
- }
-
- override def decode(carrier: TextMap, context: Context): Context = {
- ???
-// carrier.get(headerName) match {
-// case value @ Some(_) => context.withKey(contextKey, value)
-// case None => context
-// }
- }
- }
-
- private class StringBinaryCodec(key: String) extends Codecs.ForEntry[ByteBuffer] {
- val emptyBuffer: ByteBuffer = ByteBuffer.allocate(0)
- //private val contextKey = Key.broadcast[Option[String]](key, None)
-
- override def encode(context: Context): ByteBuffer = {
-// context.get(contextKey) match {
-// case Some(value) => ByteBuffer.wrap(value.getBytes)
-// case None => emptyBuffer
-// }
- ???
- }
-
- override def decode(carrier: ByteBuffer, context: Context): Context = {
- ??? //context.withKey(contextKey, Some(new String(carrier.array())))
- }
- }
-}
-
-
-trait TextMap {
-
- def get(key: String): Option[String]
-
- def put(key: String, value: String): Unit
-
- def values: Iterator[(String, String)]
-}
-
-object TextMap {
-
- class Default extends TextMap {
- private val storage =
- mutable.Map.empty[String, String]
-
- override def get(key: String): Option[String] =
- storage.get(key)
-
- override def put(key: String, value: String): Unit =
- storage.put(key, value)
-
- override def values: Iterator[(String, String)] =
- storage.toIterator
- }
-
- object Default {
- def apply(): Default =
- new Default()
- }
-}
\ No newline at end of file
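Callers of the removed Codecs API migrate roughly as follows (a sketch of the before/after shape, not an exhaustive mapping):

```scala
// Before (removed in this commit): encode/decode against a carrier type.
//   val headers = Kamon.contextCodec().HttpHeaders.encode(context)
//   val decoded = Kamon.contextCodec().HttpHeaders.decode(headers)

// After: propagation instances read/write, and the caller supplies the
// header medium (a HeaderReader/HeaderWriter wrapping the HTTP message).
//   Kamon.defaultHttpPropagation().write(context, headerWriter)
//   val decoded = Kamon.defaultHttpPropagation().read(headerReader)
```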
diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala
index 4d3501db..2a7a382e 100644
--- a/kamon-core/src/main/scala/kamon/context/Context.scala
+++ b/kamon-core/src/main/scala/kamon/context/Context.scala
@@ -1,5 +1,5 @@
/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2018 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
@@ -19,16 +19,16 @@ package context
import java.util.{Map => JavaMap}
import scala.collection.JavaConverters._
-class Context private (private[context] val entries: Map[Context.Key[_], Any], val tags: Map[String, String]) {
+class Context private (val entries: Map[String, Any], val tags: Map[String, String]) {
def get[T](key: Context.Key[T]): T =
- entries.getOrElse(key, key.emptyValue).asInstanceOf[T]
+ entries.getOrElse(key.name, key.emptyValue).asInstanceOf[T]
def getTag(tagKey: String): Option[String] =
tags.get(tagKey)
def withKey[T](key: Context.Key[T], value: T): Context =
- new Context(entries.updated(key, value), tags)
+ new Context(entries.updated(key.name, value), tags)
def withTag(tagKey: String, tagValue: String): Context =
new Context(entries, tags.updated(tagKey, tagValue))
@@ -39,6 +39,11 @@ class Context private (private[context] val entries: Map[Context.Key[_], Any], v
def withTags(tags: JavaMap[String, String]): Context =
new Context(entries, this.tags ++ tags.asScala.toMap)
+ def isEmpty(): Boolean =
+ entries.isEmpty && tags.isEmpty
+
+ def nonEmpty(): Boolean =
+ !isEmpty()
}
@@ -53,22 +58,22 @@ object Context {
new Context(Map.empty, tags)
def of[T](key: Context.Key[T], value: T): Context =
- new Context(Map(key -> value), Map.empty)
+ new Context(Map(key.name -> value), Map.empty)
def of[T](key: Context.Key[T], value: T, tags: JavaMap[String, String]): Context =
- new Context(Map(key -> value), tags.asScala.toMap)
+ new Context(Map(key.name -> value), tags.asScala.toMap)
def of[T](key: Context.Key[T], value: T, tags: Map[String, String]): Context =
- new Context(Map(key -> value), tags)
+ new Context(Map(key.name -> value), tags)
def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U): Context =
- new Context(Map(keyOne -> valueOne, keyTwo -> valueTwo), Map.empty)
+ new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), Map.empty)
def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: JavaMap[String, String]): Context =
- new Context(Map(keyOne -> valueOne, keyTwo -> valueTwo), tags.asScala.toMap)
+ new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), tags.asScala.toMap)
def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: Map[String, String]): Context =
- new Context(Map(keyOne -> valueOne, keyTwo -> valueTwo), tags)
+ new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), tags)
def key[T](name: String, emptyValue: T): Context.Key[T] =
new Context.Key(name, emptyValue)
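
With this change, Context stores entries by key name instead of by key identity, so two Key instances sharing a name address the same entry, and empty-checks become first-class. A minimal usage sketch against the API shown above (the "user-id" key is illustrative):

    import kamon.context.Context

    val UserID = Context.key[String]("user-id", "anonymous") // "anonymous" is the empty value

    val ctx = Context.of(UserID, "1234").withTag("mode", "test")
    assert(ctx.get(UserID) == "1234")                 // entries are looked up by key name
    assert(ctx.getTag("mode").contains("test"))
    assert(ctx.nonEmpty() && Context.Empty.isEmpty())
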
diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
index d53a6250..6a15e2a6 100644
--- a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
+++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
@@ -1,3 +1,18 @@
+/* =========================================================================================
+ * Copyright © 2013-2018 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
package kamon
package context
@@ -9,89 +24,31 @@ import scala.util.control.NonFatal
import scala.util.{Failure, Success}
/**
- * Context Propagation for HTTP transports. When using HTTP transports all the context related information is
- * read from and written to HTTP headers. The context information may be included in the following directions:
- * - Incoming: Used for HTTP requests coming into this service. Implicitly used when using HttpPropagation.read.
- * - Outgoing: Used for HTTP requests leaving this service.
- * - Returning: Used for HTTP responses send back to clients of this service.
+ * Context propagation that uses HTTP headers as the transport medium. HTTP propagation mechanisms read any number of
+ * HTTP headers from incoming HTTP requests to decode a Context instance and write any number of HTTP headers on
+ * outgoing requests to transfer a context to remote services.
*/
-trait HttpPropagation {
-
- /**
- * Uses the provided [[HttpPropagation.HeaderReader]] to read as many HTTP Headers as necessary and create a
- * [[Context]] instance. The way in which context tags and entries are read from and written to HTTP Headers is
- * implementation specific.
- *
- * @param reader Wrapper on the HTTP message from which headers are read.
- * @return The decoded Context instance. If no entries or tags could be read from the HTTP message then an
- * empty context is returned instead.
- */
- def readContext(reader: HttpPropagation.HeaderReader): Context
-
- /**
- * Writes the tags and entries from the supplied context using the supplied [[HttpPropagation.HeaderWriter]]
- * instance. The way in which context tags and entries are read from and written to HTTP Headers is implementation
- * specific.
- *
- * Implementations are expected to produce side effects on the wrapped HTTP Messages.
- *
- * @param context Context instance to be written.
- * @param writer Wrapper on the HTTP message that will carry the context headers.
- * @param direction Write direction. It can be either Outgoing or Returning.
- */
- def writeContext(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit
-
-}
-
object HttpPropagation {
/**
- * Encapsulates logic required to read a single context entry from HTTP headers. Implementations of this trait
- * must be aware of the entry they are able to read and the HTTP headers required to do so.
+ * Wrapper that reads HTTP headers from an HTTP message.
*/
- trait EntryReader {
-
- /**
- * Tries to read a context entry from HTTP headers. If a context entry is successfully read, implementations
- * must return an updated context instance that includes such entry. If no entry could be read simply return
- * context instance that was passed in, untouched.
- *
- * @param reader Wrapper on the HTTP message from which headers are read.
- * @param context Current context.
- * @return Either the original context passed in or a modified version of it, including the read entry.
- */
- def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context
- }
-
- /**
- * Encapsulates logic required to write a single context entry to HTTP headers. Implementations of this trait
- * must be aware of the entry they are able to write and the HTTP headers required to do so.
- */
- trait EntryWriter {
+ trait HeaderReader {
/**
- * Tries to write a context entry into HTTP headers.
+ * Reads a single HTTP header value.
*
- * @param context The context from which entries should be written.
- * @param writer Wrapper on the HTTP message that will carry the context headers.
- * @param direction Write direction. It can be either Outgoing or Returning.
+ * @param header HTTP header name
+ * @return The HTTP header value, if present.
*/
- def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit
- }
-
-
- /**
- * Wrapper that reads HTTP headers from HTTP a message.
- */
- trait HeaderReader {
+ def read(header: String): Option[String]
/**
- * Reads an HTTP header value
+ * Reads all HTTP headers present in the wrapped HTTP message.
*
- * @param header HTTP header name
- * @return The HTTP header value, if present.
+ * @return A map from header name to header value for all headers present in the message.
*/
- def readHeader(header: String): Option[String]
+ def readAll(): Map[String, String]
}
/**
@@ -105,39 +62,40 @@ object HttpPropagation {
* @param header HTTP header name.
* @param value HTTP header value.
*/
- def writeHeader(header: String, value: String): Unit
+ def write(header: String, value: String): Unit
}
+
/**
- * Create a new default HttpPropagation instance from the provided configuration.
+ * Create a new default HTTP propagation instance from the provided configuration.
*
* @param config HTTP propagation channel configuration
* @return A newly constructed HttpPropagation instance.
*/
- def from(config: Config, classLoading: ClassLoading): HttpPropagation = {
- new HttpPropagation.Default(Components.from(config, classLoading))
+ def from(config: Config, classLoading: ClassLoading): Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] = {
+ new HttpPropagation.Default(Settings.from(config, classLoading))
}
/**
* Default HTTP Propagation in Kamon.
*/
- final class Default(components: Components) extends HttpPropagation {
+ class Default(settings: Settings) extends Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] {
private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Default])
/**
* Reads context tags and entries in the following order:
- * - Read all context tags from the context tags header.
- * - Read all context tags with explicit mappings. This overrides any tag from the previous step in case
- * of a tag key clash.
- * - Read all context entries using the incoming entries configuration.
+ * 1. Read all context tags from the context tags header.
+ * 2. Read all context tags with explicit mappings. This overrides any tag
+ * from the previous step in case of a tag key clash.
+ * 3. Read all context entries using the incoming entries configuration.
*/
- override def readContext(reader: HeaderReader): Context = {
+ override def read(reader: HeaderReader): Context = {
val tags = Map.newBuilder[String, String]
// Tags encoded together in the context tags header.
try {
- reader.readHeader(components.tagsHeaderName).foreach { contextTagsHeader =>
+ reader.read(settings.tagsHeaderName).foreach { contextTagsHeader =>
contextTagsHeader.split(";").foreach(tagData => {
val tagPair = tagData.split("=")
if (tagPair.length == 2) {
@@ -150,21 +108,21 @@ object HttpPropagation {
}
// Tags explicitly mapped on the tags.mappings configuration.
- components.tagsMappings.foreach {
+ settings.tagsMappings.foreach {
case (tagName, httpHeader) =>
try {
- reader.readHeader(httpHeader).foreach(tagValue => tags += (tagName -> tagValue))
+ reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue))
} catch {
case NonFatal(t) => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any])
}
}
// Incoming Entries
- components.incomingEntries.foldLeft(Context.of(tags.result())) {
+ settings.incomingEntries.foldLeft(Context.of(tags.result())) {
case (context, (entryName, entryDecoder)) =>
var result = context
try {
- result = entryDecoder.readEntry(reader, context)
+ result = entryDecoder.read(reader, context)
} catch {
case NonFatal(t) => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
}
@@ -176,12 +134,7 @@ object HttpPropagation {
/**
* Writes context tags and entries
*/
- override def writeContext(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = {
- val keys = direction match {
- case Direction.Outgoing => components.outgoingEntries
- case Direction.Returning => components.returningEntries
- }
-
+ override def write(context: Context, writer: HeaderWriter): Unit = {
val contextTagsHeader = new StringBuilder()
def appendTag(key: String, value: String): Unit = {
contextTagsHeader
@@ -193,22 +146,22 @@ object HttpPropagation {
// Write tags with specific mappings or append them to the context tags header.
context.tags.foreach {
- case (tagKey, tagValue) => components.tagsMappings.get(tagKey) match {
- case Some(mappedHeader) => writer.writeHeader(mappedHeader, tagValue)
+ case (tagKey, tagValue) => settings.tagsMappings.get(tagKey) match {
+ case Some(mappedHeader) => writer.write(mappedHeader, tagValue)
case None => appendTag(tagKey, tagValue)
}
}
// Write the context tags header.
if(contextTagsHeader.nonEmpty) {
- writer.writeHeader(components.tagsHeaderName, contextTagsHeader.result())
+ writer.write(settings.tagsHeaderName, contextTagsHeader.result())
}
// Write the outgoing entries.
- keys.foreach {
+ settings.outgoingEntries.foreach {
case (entryName, entryWriter) =>
try {
- entryWriter.writeEntry(context, writer, direction)
+ entryWriter.write(context, writer)
} catch {
case NonFatal(t) => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
}
@@ -216,68 +169,37 @@ object HttpPropagation {
}
}
- /**
- * Propagation direction. Used to decide whether incoming, outgoing or returning keys must be used to
- * propagate context.
- */
- sealed trait Direction
- object Direction {
-
- /**
- * Marker trait for all directions that require write operations.
- */
- sealed trait Write
-
- /**
- * Requests coming into this service.
- */
- case object Incoming extends Direction
-
- /**
- * Requests going from this service to others.
- */
- case object Outgoing extends Direction with Write
-
- /**
- * Responses sent from this service to clients.
- */
- case object Returning extends Direction with Write
- }
-
-
- case class Components(
+ case class Settings(
tagsHeaderName: String,
tagsMappings: Map[String, String],
- incomingEntries: Map[String, HttpPropagation.EntryReader],
- outgoingEntries: Map[String, HttpPropagation.EntryWriter],
- returningEntries: Map[String, HttpPropagation.EntryWriter]
+ incomingEntries: Map[String, Propagation.EntryReader[HeaderReader]],
+ outgoingEntries: Map[String, Propagation.EntryWriter[HeaderWriter]]
)
- object Components {
- private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Components])
+ object Settings {
+ private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Settings])
- def from(config: Config, classLoading: ClassLoading): Components = {
+ def from(config: Config, classLoading: ClassLoading): Settings = {
def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = {
- val entryReaders = Map.newBuilder[String, ExpectedType]
+ val instanceMap = Map.newBuilder[String, ExpectedType]
mappings.foreach {
- case (contextKey, readerClass) => classLoading.createInstance[ExpectedType](readerClass, Nil) match {
- case Success(readerInstance) => entryReaders += (contextKey -> readerInstance)
+ case (contextKey, componentClass) => classLoading.createInstance[ExpectedType](componentClass, Nil) match {
+ case Success(componentInstance) => instanceMap += (contextKey -> componentInstance)
case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to []",
- implicitly[ClassTag[ExpectedType]].runtimeClass.getName, readerClass, exception)
+ implicitly[ClassTag[ExpectedType]].runtimeClass.getName, componentClass, exception)
}
}
- entryReaders.result()
+ instanceMap.result()
}
val tagsHeaderName = config.getString("tags.header-name")
val tagsMappings = config.getConfig("tags.mappings").pairs
- val incomingEntries = buildInstances[HttpPropagation.EntryReader](config.getConfig("entries.incoming").pairs)
- val outgoingEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.outgoing").pairs)
- val returningEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.returning").pairs)
+ val incomingEntries = buildInstances[Propagation.EntryReader[HeaderReader]](config.getConfig("entries.incoming").pairs)
+ val outgoingEntries = buildInstances[Propagation.EntryWriter[HeaderWriter]](config.getConfig("entries.outgoing").pairs)
- Components(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries, returningEntries)
+ Settings(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries)
}
}
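
A round-trip sketch of the reworked HTTP propagation. MapHeaders is a hypothetical in-memory carrier implementing the HeaderReader/HeaderWriter wrappers above, and the Settings instance is constructed directly to avoid configuration loading; only members shown in this diff are used:

    import kamon.context.HttpPropagation
    import scala.collection.mutable

    class MapHeaders(initial: Map[String, String] = Map.empty)
        extends HttpPropagation.HeaderReader with HttpPropagation.HeaderWriter {
      private val storage = mutable.Map(initial.toSeq: _*)
      override def read(header: String): Option[String] = storage.get(header)
      override def readAll(): Map[String, String] = storage.toMap
      override def write(header: String, value: String): Unit = storage.put(header, value)
    }

    val propagation = new HttpPropagation.Default(
      HttpPropagation.Settings("context-tags", Map.empty, Map.empty, Map.empty))

    // Tag pairs travel together in the tags header, ';'-separated, exactly as parsed above.
    val incoming = new MapHeaders(Map("context-tags" -> "tenant=acme;mode=test"))
    val context = propagation.read(incoming)
    assert(context.getTag("tenant").contains("acme"))

    val outgoing = new MapHeaders()
    propagation.write(context, outgoing) // tags without explicit mappings go back into "context-tags"
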
diff --git a/kamon-core/src/main/scala/kamon/context/Propagation.scala b/kamon-core/src/main/scala/kamon/context/Propagation.scala
new file mode 100644
index 00000000..1d973ca9
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/Propagation.scala
@@ -0,0 +1,81 @@
+/* =========================================================================================
+ * Copyright © 2013-2018 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.context
+
+/**
+ * Inter-process Context propagation interface. Implementations are responsible for reading and writing a lossless
+ * representation of a given Context instance to the appropriate mediums. Out of the box, Kamon ships with two
+ * implementations:
+ *
+ * * HttpPropagation: Uses HTTP headers as the medium to transport the Context data.
+ * * BinaryPropagation: Uses byte streams as the medium to transport the Context data.
+ *
+ */
+trait Propagation[ReaderMedium, WriterMedium] {
+
+ /**
+ * Attempts to read a Context from the [[ReaderMedium]].
+ *
+ * @param medium An abstraction that reads data from the medium transporting the Context data.
+ * @return The decoded Context instance or an empty Context if no entries or tags could be read from the medium.
+ */
+ def read(medium: ReaderMedium): Context
+
+ /**
+ * Attempts to write a Context instance to the [[WriterMedium]].
+ *
+ * @param context Context instance to be written.
+ * @param medium An abstraction that writes data into the medium that will transport the Context data.
+ */
+ def write(context: Context, medium: WriterMedium): Unit
+
+}
+
+object Propagation {
+
+ /**
+ * Encapsulates logic required to read a single context entry from a medium. Implementations of this trait
+ * must be aware of the entry they are able to read.
+ */
+ trait EntryReader[Medium] {
+
+ /**
+ * Tries to read a context entry from the medium. If a context entry is successfully read, implementations
+ * must return an updated context instance that includes that entry. If no entry could be read, simply return the
+ * context instance that was passed in, unchanged.
+ *
+ * @param medium An abstraction that reads data from the medium transporting the Context data.
+ * @param context Current context.
+ * @return Either the original context passed in or a modified version of it, including the read entry.
+ */
+ def read(medium: Medium, context: Context): Context
+ }
+
+ /**
+ * Encapsulates logic required to write a single context entry to the medium. Implementations of this trait
+ * must be aware of the entry they are able to write.
+ */
+ trait EntryWriter[Medium] {
+
+ /**
+ * Tries to write a context entry into the medium.
+ *
+ * @param context The context from which entries should be written.
+ * @param medium An abstraction that writes data into the medium that will transport the Context data.
+ */
+ def write(context: Context, medium: Medium): Unit
+ }
+}
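
Since entries are now generic in their medium, a single class can implement both sides of a codec. A sketch mirroring the Headers codec that this commit deletes from kamon-testkit (see the end of this diff); the key name and header name are illustrative:

    import kamon.context.{Context, HttpPropagation, Propagation}

    class StringHeaderEntry
        extends Propagation.EntryReader[HttpPropagation.HeaderReader]
        with Propagation.EntryWriter[HttpPropagation.HeaderWriter] {

      private val StringKey = Context.key[Option[String]]("string-value", None)
      private val HeaderName = "X-String-Value"

      override def read(medium: HttpPropagation.HeaderReader, context: Context): Context =
        medium.read(HeaderName) match {
          case value @ Some(_) => context.withKey(StringKey, value)
          case None            => context
        }

      override def write(context: Context, medium: HttpPropagation.HeaderWriter): Unit =
        context.get(StringKey).foreach(value => medium.write(HeaderName, value))
    }

Instances like this are wired in through the entries.incoming and entries.outgoing mappings that Settings.from loads, as shown in the HttpPropagation diff above; and because entries are now keyed by name, separate reader and writer instances can each build their own Key object and still address the same entry.
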
diff --git a/kamon-core/src/main/scala/kamon/context/Storage.scala b/kamon-core/src/main/scala/kamon/context/Storage.scala
index 4f4e6cbb..7bbca111 100644
--- a/kamon-core/src/main/scala/kamon/context/Storage.scala
+++ b/kamon-core/src/main/scala/kamon/context/Storage.scala
@@ -1,5 +1,5 @@
/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2018 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
index 3cb45619..02e92f25 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
@@ -5,7 +5,6 @@ import java.time.Duration
import com.typesafe.config.Config
import kamon.context.Context
-import kamon.context.HttpPropagation.Direction
import kamon.instrumentation.HttpServer.Settings.TagMode
import kamon.metric.MeasurementUnit.{time, information}
import kamon.trace.{IdentityProvider, Span}
@@ -259,7 +258,7 @@ object HttpServer {
override def receive(request: HttpRequest): RequestHandler = {
val incomingContext = if(settings.enableContextPropagation)
- _propagation.readContext(request)
+ _propagation.read(request)
else Context.Empty
val requestSpan = if(settings.enableTracing)
@@ -296,7 +295,7 @@ object HttpServer {
}
if(settings.enableContextPropagation) {
- _propagation.writeContext(context, response, Direction.Returning)
+ _propagation.write(context, response)
}
_metrics.foreach { httpServerMetrics =>
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
index 7439520c..63f8e1b0 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
@@ -19,35 +19,36 @@ import java.net.{URLDecoder, URLEncoder}
import java.nio.ByteBuffer
import kamon.Kamon
-import kamon.context.{Codecs, Context, HttpPropagation, TextMap}
+import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
+import kamon.context._
import kamon.context.generated.binary.span.{Span => ColferSpan}
-import kamon.context.HttpPropagation.Direction
+import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter}
import kamon.trace.SpanContext.SamplingDecision
object SpanCodec {
- class B3 extends HttpPropagation.EntryReader with HttpPropagation.EntryWriter {
+ class B3 extends Propagation.EntryReader[HeaderReader] with Propagation.EntryWriter[HeaderWriter] {
import B3.Headers
- override def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context = {
+ override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = {
val identityProvider = Kamon.tracer.identityProvider
- val traceID = reader.readHeader(Headers.TraceIdentifier)
+ val traceID = reader.read(Headers.TraceIdentifier)
.map(id => identityProvider.traceIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
- val spanID = reader.readHeader(Headers.SpanIdentifier)
+ val spanID = reader.read(Headers.SpanIdentifier)
.map(id => identityProvider.spanIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) {
- val parentID = reader.readHeader(Headers.ParentSpanIdentifier)
+ val parentID = reader.read(Headers.ParentSpanIdentifier)
.map(id => identityProvider.spanIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
- val flags = reader.readHeader(Headers.Flags)
+ val flags = reader.read(Headers.Flags)
- val samplingDecision = flags.orElse(reader.readHeader(Headers.Sampled)) match {
+ val samplingDecision = flags.orElse(reader.read(Headers.Sampled)) match {
case Some(sampled) if sampled == "1" => SamplingDecision.Sample
case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample
case _ => SamplingDecision.Unknown
@@ -59,19 +60,19 @@ object SpanCodec {
}
- override def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = {
+ override def write(context: Context, writer: HttpPropagation.HeaderWriter): Unit = {
val span = context.get(Span.ContextKey)
if(span.nonEmpty()) {
val spanContext = span.context()
- writer.writeHeader(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string))
- writer.writeHeader(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string))
+ writer.write(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string))
+ writer.write(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string))
if(spanContext.parentID != IdentityProvider.NoIdentifier)
- writer.writeHeader(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string))
+ writer.write(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string))
encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision =>
- writer.writeHeader(Headers.Sampled, samplingDecision)
+ writer.write(Headers.Sampled, samplingDecision)
}
}
}
@@ -101,38 +102,16 @@ object SpanCodec {
}
- class Colfer extends Codecs.ForEntry[ByteBuffer] {
+ class Colfer extends Propagation.EntryReader[ByteStreamReader] with Propagation.EntryWriter[ByteStreamWriter] {
val emptyBuffer = ByteBuffer.allocate(0)
- override def encode(context: Context): ByteBuffer = {
- val span = context.get(Span.ContextKey)
- if(span.nonEmpty()) {
- val marshalBuffer = Colfer.codecBuffer.get()
- val colferSpan = new ColferSpan()
- val spanContext = span.context()
-
- colferSpan.setTraceID(spanContext.traceID.bytes)
- colferSpan.setSpanID(spanContext.spanID.bytes)
- colferSpan.setParentID(spanContext.parentID.bytes)
- colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision))
-
- val marshalledSize = colferSpan.marshal(marshalBuffer, 0)
- val buffer = ByteBuffer.allocate(marshalledSize)
- buffer.put(marshalBuffer, 0, marshalledSize)
- buffer
-
- } else emptyBuffer
- }
-
- override def decode(carrier: ByteBuffer, context: Context): Context = {
- carrier.clear()
-
- if(carrier.capacity() == 0)
+ override def read(medium: ByteStreamReader, context: Context): Context = {
+ if(medium.available() == 0)
context
else {
val identityProvider = Kamon.tracer.identityProvider
val colferSpan = new ColferSpan()
- colferSpan.unmarshal(carrier.array(), 0)
+ colferSpan.unmarshal(medium.readAll(), 0)
val spanContext = SpanContext(
traceID = identityProvider.traceIdGenerator().from(colferSpan.traceID),
@@ -145,6 +124,24 @@ object SpanCodec {
}
}
+ override def write(context: Context, medium: ByteStreamWriter): Unit = {
+ val span = context.get(Span.ContextKey)
+
+ if(span.nonEmpty()) {
+ val marshalBuffer = Colfer.codecBuffer.get()
+ val colferSpan = new ColferSpan()
+ val spanContext = span.context()
+
+ colferSpan.setTraceID(spanContext.traceID.bytes)
+ colferSpan.setSpanID(spanContext.spanID.bytes)
+ colferSpan.setParentID(spanContext.parentID.bytes)
+ colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision))
+
+ val marshalledSize = colferSpan.marshal(marshalBuffer, 0)
+ medium.write(marshalBuffer, 0, marshalledSize)
+
+ }
+ }
private def samplingDecisionToByte(samplingDecision: SamplingDecision): Byte = samplingDecision match {
case SamplingDecision.Sample => 1
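
Exercising the migrated B3 codec through plain header maps (reusing the hypothetical MapHeaders carrier sketched for HttpPropagation above). The X-B3-* strings are the standard B3 header names; the actual constants live in B3.Headers, which this diff does not show:

    import kamon.context.Context
    import kamon.trace.SpanCodec

    val b3 = new SpanCodec.B3()
    val incoming = new MapHeaders(Map(
      "X-B3-TraceId" -> "80f198ee56343ba8",
      "X-B3-SpanId"  -> "e457b5a2e4d86bd1",
      "X-B3-Sampled" -> "1"))

    // Produces a Context carrying the remote span; a missing or invalid trace/span
    // identifier falls through to the context passed in, as read(...) above shows.
    val context = b3.read(incoming, Context.Empty)

    val outgoing = new MapHeaders()
    b3.write(context, outgoing) // re-emits the trace, span and sampling headers
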
diff --git a/kamon-testkit/src/main/scala/kamon/testkit/SimpleStringCodec.scala b/kamon-testkit/src/main/scala/kamon/testkit/SimpleStringCodec.scala
deleted file mode 100644
index c755b0af..00000000
--- a/kamon-testkit/src/main/scala/kamon/testkit/SimpleStringCodec.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.testkit
-
-import java.nio.ByteBuffer
-
-import kamon.context.{Codecs, Context, TextMap}
-
-object SimpleStringCodec {
- final class Headers extends Codecs.ForEntry[TextMap] {
- private val dataKey = "X-String-Value"
-
- override def encode(context: Context): TextMap = {
- val textMap = TextMap.Default()
- context.get(ContextTesting.StringBroadcastKey).foreach { value =>
- textMap.put(dataKey, value)
- }
-
- textMap
- }
-
- override def decode(carrier: TextMap, context: Context): Context = {
- carrier.get(dataKey) match {
- case value @ Some(_) => context.withKey(ContextTesting.StringBroadcastKey, value)
- case None => context
- }
- }
- }
-
- final class Binary extends Codecs.ForEntry[ByteBuffer] {
- val emptyBuffer: ByteBuffer = ByteBuffer.allocate(0)
-
- override def encode(context: Context): ByteBuffer = {
- context.get(ContextTesting.StringBroadcastKey) match {
- case Some(value) => ByteBuffer.wrap(value.getBytes)
- case None => emptyBuffer
- }
- }
-
- override def decode(carrier: ByteBuffer, context: Context): Context = {
- context.withKey(ContextTesting.StringBroadcastKey, Some(new String(carrier.array())))
- }
- }
-}
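
The deleted Binary codec maps naturally onto the new binary propagation API. A sketch, assuming ByteStreamReader and ByteStreamWriter expose the available()/readAll()/write(bytes, offset, count) members that SpanCodec.Colfer exercises above (the real traits may declare more members):

    package kamon.testkit

    import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
    import kamon.context.{Context, Propagation}

    class StringBinaryEntry
        extends Propagation.EntryReader[ByteStreamReader]
        with Propagation.EntryWriter[ByteStreamWriter] {

      // StringBroadcastKey comes from ContextTesting in this same package, as in the deleted file.
      override def read(medium: ByteStreamReader, context: Context): Context =
        if (medium.available() == 0) context
        else context.withKey(ContextTesting.StringBroadcastKey, Some(new String(medium.readAll())))

      override def write(context: Context, medium: ByteStreamWriter): Unit =
        context.get(ContextTesting.StringBroadcastKey).foreach { value =>
          val bytes = value.getBytes
          medium.write(bytes, 0, bytes.length)
        }
    }
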