-rw-r--r--  .travis.yml  2
-rw-r--r--  build.sbt  4
-rw-r--r--  kamon-core-tests/src/test/resources/reference.conf  34
-rw-r--r--  kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala  174
-rw-r--r--  kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala  93
-rw-r--r--  kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala  50
-rw-r--r--  kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala  159
-rw-r--r--  kamon-core-tests/src/test/scala/kamon/context/ThreadLocalStorageSpec.scala  8
-rw-r--r--  kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala  316
-rw-r--r--  kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala  208
-rw-r--r--  kamon-core-tests/src/test/scala/kamon/trace/B3SpanPropagationSpec.scala  232
-rw-r--r--  kamon-core/src/main/colfer/Context.colf  1
-rw-r--r--  kamon-core/src/main/colfer/building.md  2
-rw-r--r--  kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java  132
-rw-r--r--  kamon-core/src/main/resources/reference.conf  227
-rw-r--r--  kamon-core/src/main/scala/kamon/ClassLoading.scala  26
-rw-r--r--  kamon-core/src/main/scala/kamon/Configuration.scala  57
-rw-r--r--  kamon-core/src/main/scala/kamon/ContextPropagation.scala  92
-rw-r--r--  kamon-core/src/main/scala/kamon/ContextStorage.scala  49
-rw-r--r--  kamon-core/src/main/scala/kamon/Environment.scala  5
-rw-r--r--  kamon-core/src/main/scala/kamon/Kamon.scala  75
-rw-r--r--  kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala  301
-rw-r--r--  kamon-core/src/main/scala/kamon/context/Codecs.scala  295
-rw-r--r--  kamon-core/src/main/scala/kamon/context/Context.scala  112
-rw-r--r--  kamon-core/src/main/scala/kamon/context/HttpPropagation.scala  206
-rw-r--r--  kamon-core/src/main/scala/kamon/context/Mixin.scala  45
-rw-r--r--  kamon-core/src/main/scala/kamon/context/Propagation.scala  81
-rw-r--r--  kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala  66
-rw-r--r--  kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala  453
-rw-r--r--  kamon-core/src/main/scala/kamon/instrumentation/Mixin.scala  36
-rw-r--r--  kamon-core/src/main/scala/kamon/package.scala  10
-rw-r--r--  kamon-core/src/main/scala/kamon/trace/Span.scala  4
-rw-r--r--  kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala  4
-rw-r--r--  kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala (renamed from kamon-core/src/main/scala/kamon/trace/SpanCodec.scala)  128
-rw-r--r--  kamon-core/src/main/scala/kamon/trace/Tracer.scala  18
-rw-r--r--  kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala  11
-rw-r--r--  kamon-testkit/src/main/scala/kamon/testkit/SimpleStringCodec.scala  57
-rw-r--r--  kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala  17
-rw-r--r--  version.sbt  2
39 files changed, 2802 insertions, 990 deletions
diff --git a/.travis.yml b/.travis.yml
index 5f85bcd0..a564bb7f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -2,7 +2,7 @@ language: scala
script:
- sbt +test
scala:
- - 2.12.4
+ - 2.12.6
jdk:
- oraclejdk8
before_script:
diff --git a/build.sbt b/build.sbt
index 4790074f..131626d9 100644
--- a/build.sbt
+++ b/build.sbt
@@ -20,10 +20,10 @@ lazy val kamon = (project in file("."))
.aggregate(core, testkit, coreTests, coreBench)
val commonSettings = Seq(
- scalaVersion := "2.12.4",
+ scalaVersion := "2.12.6",
javacOptions += "-XDignore.symbol.file",
resolvers += Resolver.mavenLocal,
- crossScalaVersions := Seq("2.12.4", "2.11.8", "2.10.6"),
+ crossScalaVersions := Seq("2.12.6", "2.11.8", "2.10.6"),
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
scalacOptions ++= Seq(
"-deprecation",
diff --git a/kamon-core-tests/src/test/resources/reference.conf b/kamon-core-tests/src/test/resources/reference.conf
index 0d7ae9e2..c092af08 100644
--- a/kamon-core-tests/src/test/resources/reference.conf
+++ b/kamon-core-tests/src/test/resources/reference.conf
@@ -2,4 +2,38 @@ kamon {
context.codecs.string-keys {
request-id = "X-Request-ID"
}
+}
+
+
+
+kamon {
+
+ trace {
+ sampler = always
+ }
+
+ propagation.http.default {
+ tags.mappings {
+ "correlation-id" = "x-correlation-id"
+ }
+ }
+
+ instrumentation {
+ http-server {
+ default {
+ tracing.preferred-trace-id-tag = "correlation-id"
+ tracing.tags.from-context.peer = span
+ }
+
+ no-span-metrics {
+ tracing.span-metrics = off
+ }
+
+ noop {
+ propagation.enabled = no
+ metrics.enabled = no
+ tracing.enabled = no
+ }
+ }
+ }
} \ No newline at end of file
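The named blocks above (default, no-span-metrics, noop) are selected by name when an HTTP server instrumentation is created. A minimal sketch of how HttpServerInstrumentationSpec (further down in this commit) picks them up; component, interface, and ports are the values that spec uses:

```
import kamon.instrumentation.HttpServer

// Each name resolves to a block under kamon.instrumentation.http-server.
val defaultServer = HttpServer.from("default", component = "http.server", interface = "0.0.0.0", port = 8081)
val noopServer    = HttpServer.from("noop", component = "http.server", interface = "0.0.0.0", port = 8083)
```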
diff --git a/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala
new file mode 100644
index 00000000..5681d300
--- /dev/null
+++ b/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala
@@ -0,0 +1,174 @@
+package kamon.context
+
+import java.io.ByteArrayOutputStream
+
+import com.typesafe.config.ConfigFactory
+import kamon.Kamon
+import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
+import kamon.context.Propagation.{EntryReader, EntryWriter}
+import org.scalatest.{Matchers, OptionValues, WordSpec}
+
+import scala.util.Random
+
+class BinaryPropagationSpec extends WordSpec with Matchers with OptionValues {
+
+ "The Binary Context Propagation" should {
+ "return an empty context if there is no data to read from" in {
+ val context = binaryPropagation.read(ByteStreamReader.of(Array.ofDim[Byte](0)))
+ context.isEmpty() shouldBe true
+ }
+
+ "not write any data to the medium if the context is empty" in {
+ val writer = inspectableByteStreamWriter()
+ binaryPropagation.write(Context.Empty, writer)
+ writer.size() shouldBe 0
+ }
+
+    "handle malformed data when reading a context" in {
+ val randomBytes = Array.ofDim[Byte](42)
+ Random.nextBytes(randomBytes)
+
+ val context = binaryPropagation.read(ByteStreamReader.of(randomBytes))
+ context.isEmpty() shouldBe true
+ }
+
+ "handle read failures in an entry reader" in {
+ val context = Context.of(
+ BinaryPropagationSpec.StringKey, "string-value",
+ BinaryPropagationSpec.FailStringKey, "fail-read"
+ )
+ val writer = inspectableByteStreamWriter()
+ binaryPropagation.write(context, writer)
+
+ val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
+ rtContext.tags shouldBe empty
+ rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value"
+ rtContext.get(BinaryPropagationSpec.FailStringKey) shouldBe null
+ }
+
+ "handle write failures in an entry writer" in {
+ val context = Context.of(
+ BinaryPropagationSpec.StringKey, "string-value",
+ BinaryPropagationSpec.FailStringKey, "fail-write"
+ )
+ val writer = inspectableByteStreamWriter()
+ binaryPropagation.write(context, writer)
+
+ val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
+ rtContext.tags shouldBe empty
+ rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value"
+ rtContext.get(BinaryPropagationSpec.FailStringKey) shouldBe null
+ }
+
+ "handle write failures in an entry writer when the context is too big" in {
+ val context = Context.of(BinaryPropagationSpec.StringKey, "string-value" * 20)
+ val writer = inspectableByteStreamWriter()
+ binaryPropagation.write(context, writer)
+
+ val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
+ rtContext shouldBe empty
+ }
+
+ "round trip a Context that only has tags" in {
+ val context = Context.of(Map("hello" -> "world", "kamon" -> "rulez"))
+ val writer = inspectableByteStreamWriter()
+ binaryPropagation.write(context, writer)
+
+ val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
+ rtContext.entries shouldBe empty
+ rtContext.tags should contain theSameElementsAs (context.tags)
+ }
+
+ "round trip a Context that only has entries" in {
+ val context = Context.of(BinaryPropagationSpec.StringKey, "string-value", BinaryPropagationSpec.IntegerKey, 42)
+ val writer = inspectableByteStreamWriter()
+ binaryPropagation.write(context, writer)
+
+ val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
+ rtContext.tags shouldBe empty
+ rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value"
+ rtContext.get(BinaryPropagationSpec.IntegerKey) shouldBe 0 // there is no entry configuration for the integer key
+ }
+
+    "round trip a Context with tags and entries" in {
+ val context = Context.of(Map("hello" -> "world", "kamon" -> "rulez"))
+ .withKey(BinaryPropagationSpec.StringKey, "string-value")
+ .withKey(BinaryPropagationSpec.IntegerKey, 42)
+
+ val writer = inspectableByteStreamWriter()
+ binaryPropagation.write(context, writer)
+ val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
+
+ rtContext.tags should contain theSameElementsAs (context.tags)
+ rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value"
+ rtContext.get(BinaryPropagationSpec.IntegerKey) shouldBe 0 // there is no entry configuration for the integer key
+ }
+ }
+
+ val binaryPropagation = BinaryPropagation.from(
+ ConfigFactory.parseString(
+ """
+ |max-outgoing-size = 64
+ |entries.incoming.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec"
+ |entries.incoming.failString = "kamon.context.BinaryPropagationSpec$FailStringEntryCodec"
+ |entries.outgoing.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec"
+ |entries.outgoing.failString = "kamon.context.BinaryPropagationSpec$FailStringEntryCodec"
+ |
+ """.stripMargin
+ ).withFallback(ConfigFactory.load().getConfig("kamon.propagation")), Kamon)
+
+
+ def inspectableByteStreamWriter() = new ByteArrayOutputStream(32) with ByteStreamWriter
+
+}
+
+object BinaryPropagationSpec {
+
+ val StringKey = Context.key[String]("string", null)
+ val FailStringKey = Context.key[String]("failString", null)
+ val IntegerKey = Context.key[Int]("integer", 0)
+
+ class StringEntryCodec extends EntryReader[ByteStreamReader] with EntryWriter[ByteStreamWriter] {
+
+ override def read(medium: ByteStreamReader, context: Context): Context = {
+ val valueData = medium.readAll()
+
+ if(valueData.length > 0) {
+ context.withKey(StringKey, new String(valueData))
+ } else context
+ }
+
+ override def write(context: Context, medium: ByteStreamWriter): Unit = {
+ val value = context.get(StringKey)
+ if(value != null) {
+ medium.write(value.getBytes)
+ }
+ }
+ }
+
+ class FailStringEntryCodec extends EntryReader[ByteStreamReader] with EntryWriter[ByteStreamWriter] {
+
+ override def read(medium: ByteStreamReader, context: Context): Context = {
+ val valueData = medium.readAll()
+
+ if(valueData.length > 0) {
+ val stringValue = new String(valueData)
+ if(stringValue == "fail-read") {
+ sys.error("The fail string entry reader has triggered")
+ }
+
+ context.withKey(FailStringKey, stringValue)
+ } else context
+ }
+
+ override def write(context: Context, medium: ByteStreamWriter): Unit = {
+ val value = context.get(FailStringKey)
+ if(value != null && value != "fail-write") {
+ medium.write(value.getBytes)
+ } else {
+ medium.write(42) // malformed data on purpose
+ sys.error("The fail string entry writer has triggered")
+ }
+ }
+ }
+} \ No newline at end of file
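A minimal round-trip sketch with the same APIs the spec exercises, reusing the binaryPropagation instance built above; only entries registered under entries.incoming/entries.outgoing survive the trip:

```
import java.io.ByteArrayOutputStream
import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
import kamon.context.Context

// Write a tags-only Context to a byte stream, then read it back.
val output = new ByteArrayOutputStream(32) with ByteStreamWriter
binaryPropagation.write(Context.of(Map("hello" -> "world")), output)

val restored = binaryPropagation.read(ByteStreamReader.of(output.toByteArray))
// restored.tags contains "hello" -> "world"
```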
diff --git a/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala
deleted file mode 100644
index f7bd7e56..00000000
--- a/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.context
-
-import kamon.Kamon
-import kamon.testkit.ContextTesting
-import org.scalatest.{Matchers, OptionValues, WordSpec}
-
-class ContextCodecSpec extends WordSpec with Matchers with ContextTesting with OptionValues {
- "the Context Codec" when {
- "encoding/decoding to HttpHeaders" should {
- "round trip a empty context" in {
- val textMap = ContextCodec.HttpHeaders.encode(Context.Empty)
- val decodedContext = ContextCodec.HttpHeaders.decode(textMap)
-
- decodedContext shouldBe Context.Empty
- }
-
- "round trip a context with only local keys" in {
- val localOnlyContext = Context.create(StringKey, Some("string-value"))
- val textMap = ContextCodec.HttpHeaders.encode(localOnlyContext)
- val decodedContext = ContextCodec.HttpHeaders.decode(textMap)
-
- decodedContext shouldBe Context.Empty
- }
-
- "round trip a context with local and broadcast keys" in {
- val initialContext = Context.create()
- .withKey(StringKey, Some("string-value"))
- .withKey(StringBroadcastKey, Some("this-should-be-round-tripped"))
-
- val textMap = ContextCodec.HttpHeaders.encode(initialContext)
- val decodedContext = ContextCodec.HttpHeaders.decode(textMap)
-
- decodedContext.get(StringKey) shouldBe empty
- decodedContext.get(StringBroadcastKey).value shouldBe "this-should-be-round-tripped"
- }
-
- "read string broadcast keys using the configured header name" in {
- val textMap = TextMap.Default()
- textMap.put("X-Request-ID", "123456")
- val decodedContext = ContextCodec.HttpHeaders.decode(textMap)
-
- decodedContext.get(Key.broadcastString("request-id")).value shouldBe "123456"
- }
- }
-
- "encoding/decoding to Binary" should {
- "round trip a empty context" in {
- val byteBuffer = ContextCodec.Binary.encode(Context.Empty)
-
- val decodedContext = ContextCodec.Binary.decode(byteBuffer)
-
- decodedContext shouldBe Context.Empty
- }
-
- "round trip a context with only local keys" in {
- val localOnlyContext = Context.create(StringKey, Some("string-value"))
- val byteBuffer = ContextCodec.Binary.encode(localOnlyContext)
- val decodedContext = ContextCodec.Binary.decode(byteBuffer)
-
- decodedContext shouldBe Context.Empty
- }
-
- "round trip a context with local and broadcast keys" in {
- val initialContext = Context.create()
- .withKey(StringKey, Some("string-value"))
- .withKey(StringBroadcastKey, Some("this-should-be-round-tripped"))
-
- val byteBuffer = ContextCodec.Binary.encode(initialContext)
- val decodedContext = ContextCodec.Binary.decode(byteBuffer)
-
- decodedContext.get(StringKey) shouldBe empty
- decodedContext.get(StringBroadcastKey).value shouldBe "this-should-be-round-tripped"
- }
- }
- }
-
- val ContextCodec = new Codecs(Kamon.config())
-} \ No newline at end of file
diff --git a/kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala
deleted file mode 100644
index f7e7599f..00000000
--- a/kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.context
-
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
-
-import kamon.Kamon
-import kamon.testkit.ContextTesting
-import org.scalatest.{Matchers, OptionValues, WordSpec}
-
-class ContextSerializationSpec extends WordSpec with Matchers with ContextTesting with OptionValues {
- "the Context is Serializable" should {
- "empty " in {
- val bos = new ByteArrayOutputStream()
- val oos = new ObjectOutputStream(bos)
- oos.writeObject(Context.Empty)
-
- val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
- val ctx = ois.readObject().asInstanceOf[Context]
- ctx shouldBe Context.Empty
- }
-
- "full" in {
- val sCtx = Context(StringBroadcastKey, Some("disi"))
- val bos = new ByteArrayOutputStream()
- val oos = new ObjectOutputStream(bos)
- oos.writeObject(sCtx)
-
- val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
- val rCtx = ois.readObject().asInstanceOf[Context]
- rCtx shouldBe sCtx
- }
-
- }
-
- val ContextCodec = new Codecs(Kamon.config())
-} \ No newline at end of file
diff --git a/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala
new file mode 100644
index 00000000..fcddfe24
--- /dev/null
+++ b/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala
@@ -0,0 +1,159 @@
+package kamon.context
+
+import com.typesafe.config.ConfigFactory
+import kamon.Kamon
+import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter}
+import kamon.context.Propagation.{EntryReader, EntryWriter}
+import org.scalatest.{Matchers, OptionValues, WordSpec}
+
+import scala.collection.mutable
+
+class HttpPropagationSpec extends WordSpec with Matchers with OptionValues {
+
+ "The HTTP Context Propagation" when {
+ "reading from incoming requests" should {
+      "return an empty context if there are no tags or keys" in {
+ val context = httpPropagation.read(headerReaderFromMap(Map.empty))
+ context.isEmpty() shouldBe true
+ }
+
+ "read tags from an HTTP message when they are available" in {
+ val headers = Map(
+ "x-content-tags" -> "hello=world;correlation=1234",
+ "x-mapped-tag" -> "value"
+ )
+ val context = httpPropagation.read(headerReaderFromMap(headers))
+ context.tags should contain only(
+ "hello" -> "world",
+ "correlation" -> "1234",
+ "mappedTag" -> "value"
+ )
+ }
+
+ "handle errors when reading HTTP headers" in {
+ val headers = Map("fail" -> "")
+ val context = httpPropagation.read(headerReaderFromMap(headers))
+ context.tags shouldBe empty
+ context.entries shouldBe empty
+ }
+
+      "read a context with entries and tags" in {
+ val headers = Map(
+ "x-content-tags" -> "hello=world;correlation=1234",
+ "string-header" -> "hey",
+ "integer-header" -> "123"
+ )
+
+ val context = httpPropagation.read(headerReaderFromMap(headers))
+ context.get(HttpPropagationSpec.StringKey) shouldBe "hey"
+ context.get(HttpPropagationSpec.IntegerKey) shouldBe 123
+ context.get(HttpPropagationSpec.OptionalKey) shouldBe empty
+ context.getTag("hello").value shouldBe "world"
+ context.getTag("correlation").value shouldBe "1234"
+ context.getTag("unknown") shouldBe empty
+ }
+ }
+
+
+ "writing to outgoing requests" should {
+ "not write anything if the context is empty" in {
+ val headers = mutable.Map.empty[String, String]
+ httpPropagation.write(Context.Empty, headerWriterFromMap(headers))
+ headers shouldBe empty
+ }
+
+ "write context tags when available" in {
+ val headers = mutable.Map.empty[String, String]
+ val context = Context.of(Map(
+ "hello" -> "world",
+ "mappedTag" -> "value"
+ ))
+
+ httpPropagation.write(context, headerWriterFromMap(headers))
+ headers should contain only(
+ "x-content-tags" -> "hello=world;",
+ "x-mapped-tag" -> "value"
+ )
+ }
+
+ "write context entries when available" in {
+ val headers = mutable.Map.empty[String, String]
+ val context = Context.of(
+ HttpPropagationSpec.StringKey, "out-we-go",
+ HttpPropagationSpec.IntegerKey, 42
+ )
+
+ httpPropagation.write(context, headerWriterFromMap(headers))
+ headers should contain only(
+ "string-header" -> "out-we-go"
+ )
+ }
+ }
+ }
+
+
+ val httpPropagation = HttpPropagation.from(
+ ConfigFactory.parseString(
+ """
+ |tags {
+ | header-name = "x-content-tags"
+ |
+ | mappings {
+ | mappedTag = "x-mapped-tag"
+ | }
+ |}
+ |
+ |entries.incoming.string = "kamon.context.HttpPropagationSpec$StringEntryCodec"
+ |entries.incoming.integer = "kamon.context.HttpPropagationSpec$IntegerEntryCodec"
+ |entries.outgoing.string = "kamon.context.HttpPropagationSpec$StringEntryCodec"
+ |
+ """.stripMargin
+ ).withFallback(ConfigFactory.load().getConfig("kamon.propagation")), Kamon)
+
+
+ def headerReaderFromMap(map: Map[String, String]): HttpPropagation.HeaderReader = new HttpPropagation.HeaderReader {
+ override def read(header: String): Option[String] = {
+ if(map.get("fail").nonEmpty)
+ sys.error("failing on purpose")
+
+ map.get(header)
+ }
+
+ override def readAll(): Map[String, String] = map
+ }
+
+ def headerWriterFromMap(map: mutable.Map[String, String]): HttpPropagation.HeaderWriter = new HttpPropagation.HeaderWriter {
+ override def write(header: String, value: String): Unit = map.put(header, value)
+ }
+}
+
+object HttpPropagationSpec {
+
+ val StringKey = Context.key[String]("string", null)
+ val IntegerKey = Context.key[Int]("integer", 0)
+ val OptionalKey = Context.key[Option[String]]("optional", None)
+
+
+ class StringEntryCodec extends EntryReader[HeaderReader] with EntryWriter[HeaderWriter] {
+ private val HeaderName = "string-header"
+
+ override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = {
+ reader.read(HeaderName)
+ .map(v => context.withKey(StringKey, v))
+ .getOrElse(context)
+ }
+
+ override def write(context: Context, writer: HttpPropagation.HeaderWriter): Unit = {
+ Option(context.get(StringKey)).foreach(v => writer.write(HeaderName, v))
+ }
+ }
+
+ class IntegerEntryCodec extends EntryReader[HeaderReader] {
+ override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = {
+ reader.read("integer-header")
+ .map(v => context.withKey(IntegerKey, v.toInt))
+ .getOrElse(context)
+
+ }
+ }
+} \ No newline at end of file
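Entry codecs are wired purely through configuration: each entry name maps to a fully-qualified class name that must implement EntryReader[HeaderReader] and/or EntryWriter[HeaderWriter]. A sketch of that wiring, reusing the StringEntryCodec declared above:

```
import com.typesafe.config.ConfigFactory
import kamon.Kamon
import kamon.context.HttpPropagation

// The "string" context key travels through HTTP headers via StringEntryCodec.
val propagation = HttpPropagation.from(
  ConfigFactory.parseString(
    """
      |entries.incoming.string = "kamon.context.HttpPropagationSpec$StringEntryCodec"
      |entries.outgoing.string = "kamon.context.HttpPropagationSpec$StringEntryCodec"
    """.stripMargin
  ).withFallback(ConfigFactory.load().getConfig("kamon.propagation")), Kamon)
```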
diff --git a/kamon-core-tests/src/test/scala/kamon/context/ThreadLocalStorageSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/ThreadLocalStorageSpec.scala
index d2c4e57e..8b94ac0c 100644
--- a/kamon-core-tests/src/test/scala/kamon/context/ThreadLocalStorageSpec.scala
+++ b/kamon-core-tests/src/test/scala/kamon/context/ThreadLocalStorageSpec.scala
@@ -48,8 +48,8 @@ class ThreadLocalStorageSpec extends WordSpec with Matchers {
}
val TLS: Storage = new Storage.ThreadLocal
- val TestKey = Key.local("test-key", 42)
- val AnotherKey = Key.local("another-key", 99)
- val BroadcastKey = Key.broadcast("broadcast", "i travel around")
- val ScopeWithKey = Context.create().withKey(TestKey, 43)
+ val TestKey = Context.key("test-key", 42)
+ val AnotherKey = Context.key("another-key", 99)
+ val BroadcastKey = Context.key("broadcast", "i travel around")
+ val ScopeWithKey = Context.of(TestKey, 43)
}
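This hunk shows the commit's context API change: the Key.local/Key.broadcast distinction is gone in favor of a single Context.key(name, default), and contexts are built with Context.of instead of Context.create().withKey. A before/after sketch:

```
// Before this commit:
//   val TestKey = Key.local("test-key", 42)
//   val ScopeWithKey = Context.create().withKey(TestKey, 43)

// After this commit:
val TestKey = Context.key("test-key", 42)
val ScopeWithKey = Context.of(TestKey, 43)
ScopeWithKey.get(TestKey) // 43; a context without the key yields the default, 42
```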
diff --git a/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala
new file mode 100644
index 00000000..c3c5f131
--- /dev/null
+++ b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala
@@ -0,0 +1,316 @@
+package kamon.instrumentation
+
+import java.time.Duration
+
+import kamon.context.Context
+import kamon.metric.{Counter, Histogram, RangeSampler}
+import kamon.testkit.{MetricInspection, SpanInspection}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.{Matchers, OptionValues, WordSpec}
+
+import scala.collection.mutable
+
+class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInspection with OptionValues with MetricInspection with Eventually {
+
+ "the HTTP server instrumentation" when {
+ "configured for context propagation" should {
+ "read context entries and tags from the incoming request" in {
+ val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map(
+ "context-tags" -> "tag=value;none=0011223344556677;",
+ "custom-trace-id" -> "0011223344556677"
+ )))
+
+ handler.context.tags should contain only(
+ "tag" -> "value",
+ "none" -> "0011223344556677"
+ )
+
+ handler.send(fakeResponse(200, mutable.Map.empty), Context.Empty)
+ handler.doneSending(0L)
+ }
+
+ "use the configured HTTP propagation channel" in {
+ val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map(
+ "context-tags" -> "tag=value;none=0011223344556677;",
+ "custom-trace-id" -> "0011223344556677"
+ )))
+
+ handler.context.tags should contain only(
+ "tag" -> "value",
+ "none" -> "0011223344556677"
+ )
+
+ val span = inspect(handler.span)
+ span.context().traceID.string shouldNot be("0011223344556677")
+ span.tag("http.method").value shouldBe "GET"
+ span.tag("http.url").value shouldBe "http://localhost:8080/"
+
+ val responseHeaders = mutable.Map.empty[String, String]
+ handler.send(fakeResponse(200, responseHeaders), handler.context.withTag("hello", "world"))
+ handler.doneSending(0L)
+ }
+ }
+
+ "configured for HTTP server metrics" should {
+ "track the number of open connections" in {
+ openConnections(8081).distribution()
+
+ httpServer().openConnection()
+ httpServer().openConnection()
+
+ val snapshotWithOpenConnections = openConnections(8081).distribution()
+ snapshotWithOpenConnections.min shouldBe 0
+ snapshotWithOpenConnections.max shouldBe 2
+
+ httpServer().closeConnection(Duration.ofSeconds(20), 10)
+ httpServer().closeConnection(Duration.ofSeconds(30), 15)
+
+ eventually {
+ val snapshotWithoutOpenConnections = openConnections(8081).distribution()
+ snapshotWithoutOpenConnections.min shouldBe 0
+ snapshotWithoutOpenConnections.max shouldBe 0
+ }
+ }
+
+ "track the distribution of number of requests handled per each connection" in {
+ connectionUsage(8081).distribution()
+
+ httpServer().openConnection()
+ httpServer().openConnection()
+ httpServer().closeConnection(Duration.ofSeconds(20), 10)
+ httpServer().closeConnection(Duration.ofSeconds(30), 15)
+
+ val connectionUsageSnapshot = connectionUsage(8081).distribution()
+ connectionUsageSnapshot.buckets.map(_.value) should contain allOf(
+ 10,
+ 15
+ )
+ }
+
+ "track the distribution of connection lifetime across all connections" in {
+ connectionLifetime(8081).distribution()
+
+ httpServer().openConnection()
+ httpServer().openConnection()
+ httpServer().closeConnection(Duration.ofSeconds(20), 10)
+ httpServer().closeConnection(Duration.ofSeconds(30), 15)
+
+ val connectionLifetimeSnapshot = connectionLifetime(8081).distribution()
+ connectionLifetimeSnapshot.buckets.map(_.value) should contain allOf(
+ 19998441472L, // 20 seconds with 1% precision
+ 29930553344L // 30 seconds with 1% precision
+ )
+ }
+
+ "track the number of active requests" in {
+ activeRequests(8081).distribution()
+
+ val handlerOne = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty))
+ val handlerTwo = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty))
+
+ val snapshotWithActiveRequests = activeRequests(8081).distribution()
+ snapshotWithActiveRequests.min shouldBe 0
+ snapshotWithActiveRequests.max shouldBe 2
+
+ handlerOne.send(fakeResponse(200, mutable.Map.empty), Context.Empty)
+ handlerTwo.send(fakeResponse(200, mutable.Map.empty), Context.Empty)
+ handlerOne.doneSending(0L)
+ handlerTwo.doneSending(0L)
+
+ eventually {
+ val snapshotWithoutActiveRequests = activeRequests(8081).distribution()
+ snapshotWithoutActiveRequests.min shouldBe 0
+ snapshotWithoutActiveRequests.max shouldBe 0
+ }
+ }
+
+ "track the distribution of sizes on incoming requests" in {
+ requestSize(8081).distribution()
+
+ httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)).doneReceiving(300)
+ httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)).doneReceiving(400)
+
+ val requestSizeSnapshot = requestSize(8081).distribution()
+ requestSizeSnapshot.buckets.map(_.value) should contain allOf(
+ 300,
+ 400
+ )
+ }
+
+ "track the distribution of sizes on outgoing responses" in {
+ responseSize(8081).distribution()
+
+ httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)).doneSending(300)
+ httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)).doneSending(400)
+
+ val requestSizeSnapshot = responseSize(8081).distribution()
+ requestSizeSnapshot.buckets.map(_.value) should contain allOf(
+ 300,
+ 400
+ )
+ }
+
+ "track the number of responses per status code" in {
+ // resets all counters
+ completedRequests(8081, 100).value()
+ completedRequests(8081, 200).value()
+ completedRequests(8081, 300).value()
+ completedRequests(8081, 400).value()
+ completedRequests(8081, 500).value()
+
+
+ val request = fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)
+ httpServer().receive(request).send(fakeResponse(200, mutable.Map.empty), Context.Empty)
+ httpServer().receive(request).send(fakeResponse(302, mutable.Map.empty), Context.Empty)
+ httpServer().receive(request).send(fakeResponse(404, mutable.Map.empty), Context.Empty)
+ httpServer().receive(request).send(fakeResponse(504, mutable.Map.empty), Context.Empty)
+ httpServer().receive(request).send(fakeResponse(110, mutable.Map.empty), Context.Empty)
+
+ completedRequests(8081, 100).value() shouldBe 1L
+ completedRequests(8081, 200).value() shouldBe 1L
+ completedRequests(8081, 300).value() shouldBe 1L
+ completedRequests(8081, 400).value() shouldBe 1L
+ completedRequests(8081, 500).value() shouldBe 1L
+ }
+ }
+
+ "configured for distributed tracing" should {
+ "create a span representing the current HTTP operation" in {
+ val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty))
+ handler.send(fakeResponse(200, mutable.Map.empty), handler.context)
+
+ val span = inspect(handler.span)
+ span.tag("http.method").value shouldBe "GET"
+ span.tag("http.url").value shouldBe "http://localhost:8080/"
+ span.tag("http.status_code").value shouldBe "200"
+ }
+
+ "adopt a traceID when explicitly provided" in {
+ val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map(
+ "context-tags" -> "tag=value;none=0011223344556677;",
+ "x-correlation-id" -> "0011223344556677"
+ )))
+
+ handler.span.context().traceID.string shouldBe "0011223344556677"
+ }
+
+ "record span metrics when enabled" in {
+ val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty))
+ handler.send(fakeResponse(200, mutable.Map.empty), handler.context)
+
+ val span = inspect(handler.span)
+ span.hasMetricsEnabled() shouldBe true
+ }
+
+ "not record span metrics when disabled" in {
+ val handler = noSpanMetricsHttpServer()
+ .receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty))
+ handler.send(fakeResponse(200, mutable.Map.empty), handler.context)
+
+ val span = inspect(handler.span)
+ span.hasMetricsEnabled() shouldBe false
+ }
+
+ "receive tags from context when available" in {
+ val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map(
+ "context-tags" -> "tag=value;none=0011223344556677;peer=superservice;",
+ "custom-trace-id" -> "0011223344556677"
+ )))
+
+ val span = inspect(handler.span)
+ span.tag("peer").value shouldBe "superservice"
+ }
+ }
+
+ "all capabilities are disabled" should {
+ "not read any context from the incoming requests" in {
+ val httpServer = noopHttpServer()
+ val handler = httpServer.receive(fakeRequest("http://localhost:8080/", "/", "GET", Map(
+ "context-tags" -> "tag=value;none=0011223344556677;",
+ "custom-trace-id" -> "0011223344556677"
+ )))
+
+ handler.context shouldBe Context.Empty
+ }
+
+ "not create any span to represent the server request" in {
+ val httpServer = noopHttpServer()
+ val handler = httpServer.receive(fakeRequest("http://localhost:8080/", "/", "GET", Map(
+ "context-tags" -> "tag=value;none=0011223344556677;",
+ "custom-trace-id" -> "0011223344556677"
+ )))
+
+ handler.span.isEmpty() shouldBe true
+ }
+
+ "not record any HTTP server metrics" in {
+ val request = fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)
+ noopHttpServer().receive(request).send(fakeResponse(200, mutable.Map.empty), Context.Empty)
+ noopHttpServer().receive(request).send(fakeResponse(302, mutable.Map.empty), Context.Empty)
+ noopHttpServer().receive(request).send(fakeResponse(404, mutable.Map.empty), Context.Empty)
+ noopHttpServer().receive(request).send(fakeResponse(504, mutable.Map.empty), Context.Empty)
+ noopHttpServer().receive(request).send(fakeResponse(110, mutable.Map.empty), Context.Empty)
+
+ completedRequests(8083, 100).value() shouldBe 0L
+ completedRequests(8083, 200).value() shouldBe 0L
+ completedRequests(8083, 300).value() shouldBe 0L
+ completedRequests(8083, 400).value() shouldBe 0L
+ completedRequests(8083, 500).value() shouldBe 0L
+ }
+ }
+ }
+
+ val TestComponent = "http.server"
+ val TestInterface = "0.0.0.0"
+
+ def httpServer(): HttpServer = HttpServer.from("default", component = TestComponent, interface = TestInterface, port = 8081)
+ def noSpanMetricsHttpServer(): HttpServer = HttpServer.from("no-span-metrics", component = TestComponent, interface = TestInterface, port = 8082)
+ def noopHttpServer(): HttpServer = HttpServer.from("noop", component = TestComponent, interface = TestInterface, port = 8083)
+
+ def fakeRequest(requestUrl: String, requestPath: String, requestMethod: String, headers: Map[String, String]): HttpMessage.Request =
+ new HttpMessage.Request {
+ override def url: String = requestUrl
+ override def path: String = requestPath
+ override def method: String = requestMethod
+ override def read(header: String): Option[String] = headers.get(header)
+ override def readAll(): Map[String, String] = headers
+ }
+
+ def fakeResponse(responseStatusCode: Int, headers: mutable.Map[String, String]): HttpMessage.ResponseBuilder[HttpMessage.Response] =
+ new HttpMessage.ResponseBuilder[HttpMessage.Response] {
+ override def statusCode: Int = responseStatusCode
+ override def write(header: String, value: String): Unit = headers.put(header, value)
+ override def build(): HttpMessage.Response = this
+ }
+
+ def completedRequests(port: Int, statusCode: Int): Counter = {
+ val metrics = HttpServer.Metrics.of(TestComponent, TestInterface, port)
+
+ statusCode match {
+ case sc if sc >= 100 && sc <= 199 => metrics.requestsInformational
+ case sc if sc >= 200 && sc <= 299 => metrics.requestsSuccessful
+ case sc if sc >= 300 && sc <= 399 => metrics.requestsRedirection
+ case sc if sc >= 400 && sc <= 499 => metrics.requestsClientError
+ case sc if sc >= 500 && sc <= 599 => metrics.requestsServerError
+ }
+ }
+
+ def openConnections(port: Int): RangeSampler =
+ HttpServer.Metrics.of(TestComponent, TestInterface, port).openConnections
+
+ def connectionUsage(port: Int): Histogram =
+ HttpServer.Metrics.of(TestComponent, TestInterface, port).connectionUsage
+
+ def connectionLifetime(port: Int): Histogram =
+ HttpServer.Metrics.of(TestComponent, TestInterface, port).connectionLifetime
+
+ def activeRequests(port: Int): RangeSampler =
+ HttpServer.Metrics.of(TestComponent, TestInterface, port).activeRequests
+
+ def requestSize(port: Int): Histogram =
+ HttpServer.Metrics.of(TestComponent, TestInterface, port).requestSize
+
+ def responseSize(port: Int): Histogram =
+ HttpServer.Metrics.of(TestComponent, TestInterface, port).responseSize
+
+}
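The lifecycle the spec drives is receive -> send -> doneSending: receive applies propagation and (when enabled) opens a span, send writes the response and records the status code, and doneSending closes out the request with the response size in bytes. A condensed sketch reusing the spec's helpers:

```
import kamon.context.Context
import scala.collection.mutable

val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty))

// handler.context carries the propagated tags/entries; handler.span is the server span.
handler.send(fakeResponse(200, mutable.Map.empty), handler.context)
handler.doneSending(0L) // response size in bytes
```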
diff --git a/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala b/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala
deleted file mode 100644
index f718d806..00000000
--- a/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.trace
-
-import kamon.context.{Context, TextMap}
-import kamon.testkit.SpanBuilding
-import kamon.trace.IdentityProvider.Identifier
-import kamon.trace.SpanContext.SamplingDecision
-import org.scalatest.{Matchers, OptionValues, WordSpecLike}
-
-
-class B3SpanCodecSpec extends WordSpecLike with Matchers with OptionValues with SpanBuilding {
- val extendedB3Codec = SpanCodec.B3()
-
- "The ExtendedB3 SpanContextCodec" should {
- "return a TextMap containing the SpanContext data" in {
- val textMap = extendedB3Codec.encode(testContext())
- textMap.get("X-B3-TraceId").value shouldBe "1234"
- textMap.get("X-B3-ParentSpanId").value shouldBe "2222"
- textMap.get("X-B3-SpanId").value shouldBe "4321"
- textMap.get("X-B3-Sampled").value shouldBe "1"
- }
-
- "do not include the X-B3-ParentSpanId if there is no parent" in {
- val textMap = extendedB3Codec.encode(testContextWithoutParent())
- textMap.get("X-B3-TraceId").value shouldBe "1234"
- textMap.get("X-B3-ParentSpanId") shouldBe empty
- textMap.get("X-B3-SpanId").value shouldBe "4321"
- textMap.get("X-B3-Sampled").value shouldBe "1"
- }
-
-
- "not inject anything if there is no Span in the Context" in {
- val textMap = extendedB3Codec.encode(Context.Empty)
- textMap.values shouldBe empty
- }
-
- "extract a RemoteSpan from a TextMap when all fields are set" in {
- val textMap = TextMap.Default()
- textMap.put("X-B3-TraceId", "1234")
- textMap.put("X-B3-ParentSpanId", "2222")
- textMap.put("X-B3-SpanId", "4321")
- textMap.put("X-B3-Sampled", "1")
- textMap.put("X-B3-Extra-Baggage", "some=baggage;more=baggage")
-
- val spanContext = extendedB3Codec.decode(textMap, Context.Empty).get(Span.ContextKey).context()
- spanContext.traceID.string shouldBe "1234"
- spanContext.spanID.string shouldBe "4321"
- spanContext.parentID.string shouldBe "2222"
- spanContext.samplingDecision shouldBe SamplingDecision.Sample
- }
-
- "decode the sampling decision based on the X-B3-Sampled header" in {
- val sampledTextMap = TextMap.Default()
- sampledTextMap.put("X-B3-TraceId", "1234")
- sampledTextMap.put("X-B3-SpanId", "4321")
- sampledTextMap.put("X-B3-Sampled", "1")
-
- val notSampledTextMap = TextMap.Default()
- notSampledTextMap.put("X-B3-TraceId", "1234")
- notSampledTextMap.put("X-B3-SpanId", "4321")
- notSampledTextMap.put("X-B3-Sampled", "0")
-
- val noSamplingTextMap = TextMap.Default()
- noSamplingTextMap.put("X-B3-TraceId", "1234")
- noSamplingTextMap.put("X-B3-SpanId", "4321")
-
- extendedB3Codec.decode(sampledTextMap, Context.Empty)
- .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.Sample
-
- extendedB3Codec.decode(notSampledTextMap, Context.Empty)
- .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.DoNotSample
-
- extendedB3Codec.decode(noSamplingTextMap, Context.Empty)
- .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.Unknown
- }
-
- "not include the X-B3-Sampled header if the sampling decision is unknown" in {
- val context = testContext()
- val sampledSpanContext = context.get(Span.ContextKey).context()
- val notSampledSpanContext = Context.Empty.withKey(Span.ContextKey,
- Span.Remote(sampledSpanContext.copy(samplingDecision = SamplingDecision.DoNotSample)))
- val unknownSamplingSpanContext = Context.Empty.withKey(Span.ContextKey,
- Span.Remote(sampledSpanContext.copy(samplingDecision = SamplingDecision.Unknown)))
-
- extendedB3Codec.encode(context).get("X-B3-Sampled").value shouldBe("1")
- extendedB3Codec.encode(notSampledSpanContext).get("X-B3-Sampled").value shouldBe("0")
- extendedB3Codec.encode(unknownSamplingSpanContext).get("X-B3-Sampled") shouldBe empty
- }
-
- "use the Debug flag to override the sampling decision, if provided." in {
- val textMap = TextMap.Default()
- textMap.put("X-B3-TraceId", "1234")
- textMap.put("X-B3-SpanId", "4321")
- textMap.put("X-B3-Sampled", "0")
- textMap.put("X-B3-Flags", "1")
-
- val spanContext = extendedB3Codec.decode(textMap, Context.Empty).get(Span.ContextKey).context()
- spanContext.samplingDecision shouldBe SamplingDecision.Sample
- }
-
- "use the Debug flag as sampling decision when Sampled is not provided" in {
- val textMap = TextMap.Default()
- textMap.put("X-B3-TraceId", "1234")
- textMap.put("X-B3-SpanId", "4321")
- textMap.put("X-B3-Flags", "1")
-
- val spanContext = extendedB3Codec.decode(textMap, Context.Empty).get(Span.ContextKey).context()
- spanContext.samplingDecision shouldBe SamplingDecision.Sample
- }
-
- "extract a minimal SpanContext from a TextMap containing only the Trace ID and Span ID" in {
- val textMap = TextMap.Default()
- textMap.put("X-B3-TraceId", "1234")
- textMap.put("X-B3-SpanId", "4321")
-
- val spanContext = extendedB3Codec.decode(textMap, Context.Empty).get(Span.ContextKey).context()
- spanContext.traceID.string shouldBe "1234"
- spanContext.spanID.string shouldBe "4321"
- spanContext.parentID shouldBe IdentityProvider.NoIdentifier
- spanContext.samplingDecision shouldBe SamplingDecision.Unknown
- }
-
- "do not extract a SpanContext if Trace ID and Span ID are not provided" in {
- val onlyTraceID = TextMap.Default()
- onlyTraceID.put("X-B3-TraceId", "1234")
- onlyTraceID.put("X-B3-Sampled", "0")
- onlyTraceID.put("X-B3-Flags", "1")
-
- val onlySpanID = TextMap.Default()
- onlySpanID.put("X-B3-SpanId", "4321")
- onlySpanID.put("X-B3-Sampled", "0")
- onlySpanID.put("X-B3-Flags", "1")
-
- val noIds = TextMap.Default()
- noIds.put("X-B3-Sampled", "0")
- noIds.put("X-B3-Flags", "1")
-
- extendedB3Codec.decode(onlyTraceID, Context.Empty).get(Span.ContextKey) shouldBe Span.Empty
- extendedB3Codec.decode(onlySpanID, Context.Empty).get(Span.ContextKey) shouldBe Span.Empty
- extendedB3Codec.decode(noIds, Context.Empty).get(Span.ContextKey) shouldBe Span.Empty
- }
-
- "round trip a Span from TextMap -> Context -> TextMap" in {
- val textMap = TextMap.Default()
- textMap.put("X-B3-TraceId", "1234")
- textMap.put("X-B3-ParentSpanId", "2222")
- textMap.put("X-B3-SpanId", "4321")
- textMap.put("X-B3-Sampled", "1")
-
- val context = extendedB3Codec.decode(textMap, Context.Empty)
- val injectTextMap = extendedB3Codec.encode(context)
-
- textMap.values.toSeq should contain theSameElementsAs(injectTextMap.values.toSeq)
- }
-
- /*
- // TODO: Should we be supporting this use case? maybe even have the concept of Debug requests ourselves?
- "internally carry the X-B3-Flags value so that it can be injected in outgoing requests" in {
- val textMap = TextMap.Default()
- textMap.put("X-B3-TraceId", "1234")
- textMap.put("X-B3-ParentSpanId", "2222")
- textMap.put("X-B3-SpanId", "4321")
- textMap.put("X-B3-Sampled", "1")
- textMap.put("X-B3-Flags", "1")
-
- val spanContext = extendedB3Codec.extract(textMap).value
- val injectTextMap = extendedB3Codec.inject(spanContext)
-
- injectTextMap.get("X-B3-Flags").value shouldBe("1")
- }*/
- }
-
- def testContext(): Context = {
- val spanContext = createSpanContext().copy(
- traceID = Identifier("1234", Array[Byte](1, 2, 3, 4)),
- spanID = Identifier("4321", Array[Byte](4, 3, 2, 1)),
- parentID = Identifier("2222", Array[Byte](2, 2, 2, 2))
- )
-
- Context.create().withKey(Span.ContextKey, Span.Remote(spanContext))
- }
-
- def testContextWithoutParent(): Context = {
- val spanContext = createSpanContext().copy(
- traceID = Identifier("1234", Array[Byte](1, 2, 3, 4)),
- spanID = Identifier("4321", Array[Byte](4, 3, 2, 1)),
- parentID = IdentityProvider.NoIdentifier
- )
-
- Context.create().withKey(Span.ContextKey, Span.Remote(spanContext))
- }
-
-} \ No newline at end of file
diff --git a/kamon-core-tests/src/test/scala/kamon/trace/B3SpanPropagationSpec.scala b/kamon-core-tests/src/test/scala/kamon/trace/B3SpanPropagationSpec.scala
new file mode 100644
index 00000000..5b912b92
--- /dev/null
+++ b/kamon-core-tests/src/test/scala/kamon/trace/B3SpanPropagationSpec.scala
@@ -0,0 +1,232 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2017 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import kamon.context.{Context, HttpPropagation}
+import kamon.testkit.SpanBuilding
+import kamon.trace.IdentityProvider.Identifier
+import kamon.trace.SpanContext.SamplingDecision
+import org.scalatest.{Matchers, OptionValues, WordSpecLike}
+
+import scala.collection.mutable
+
+
+class B3SpanPropagationSpec extends WordSpecLike with Matchers with OptionValues with SpanBuilding {
+ val b3Propagation = SpanPropagation.B3()
+
+ "The B3 Span propagation for HTTP" should {
+ "write the Span data into headers" in {
+ val headersMap = mutable.Map.empty[String, String]
+ b3Propagation.write(testContext(), headerWriterFromMap(headersMap))
+
+ headersMap.get("X-B3-TraceId").value shouldBe "1234"
+ headersMap.get("X-B3-ParentSpanId").value shouldBe "2222"
+ headersMap.get("X-B3-SpanId").value shouldBe "4321"
+ headersMap.get("X-B3-Sampled").value shouldBe "1"
+ }
+
+    "not include the X-B3-ParentSpanId if there is no parent" in {
+ val headersMap = mutable.Map.empty[String, String]
+ b3Propagation.write(testContextWithoutParent(), headerWriterFromMap(headersMap))
+
+ headersMap.get("X-B3-TraceId").value shouldBe "1234"
+ headersMap.get("X-B3-ParentSpanId") shouldBe empty
+ headersMap.get("X-B3-SpanId").value shouldBe "4321"
+ headersMap.get("X-B3-Sampled").value shouldBe "1"
+ }
+
+ "not inject anything if there is no Span in the Context" in {
+ val headersMap = mutable.Map.empty[String, String]
+ b3Propagation.write(Context.Empty, headerWriterFromMap(headersMap))
+ headersMap.values shouldBe empty
+ }
+
+ "extract a RemoteSpan from incoming headers when all fields are set" in {
+ val headersMap = Map(
+ "X-B3-TraceId" -> "1234",
+ "X-B3-ParentSpanId" -> "2222",
+ "X-B3-SpanId" -> "4321",
+ "X-B3-Sampled" -> "1",
+ "X-B3-Extra-Baggage" -> "some=baggage;more=baggage"
+ )
+
+ val spanContext = b3Propagation.read(headerReaderFromMap(headersMap), Context.Empty).get(Span.ContextKey).context()
+ spanContext.traceID.string shouldBe "1234"
+ spanContext.spanID.string shouldBe "4321"
+ spanContext.parentID.string shouldBe "2222"
+ spanContext.samplingDecision shouldBe SamplingDecision.Sample
+ }
+
+ "decode the sampling decision based on the X-B3-Sampled header" in {
+ val sampledHeaders = Map(
+ "X-B3-TraceId" -> "1234",
+ "X-B3-SpanId" -> "4321",
+ "X-B3-Sampled" -> "1"
+ )
+
+ val notSampledHeaders = Map(
+ "X-B3-TraceId" -> "1234",
+ "X-B3-SpanId" -> "4321",
+ "X-B3-Sampled" -> "0"
+ )
+
+ val noSamplingHeaders = Map(
+ "X-B3-TraceId" -> "1234",
+ "X-B3-SpanId" -> "4321"
+ )
+
+ b3Propagation.read(headerReaderFromMap(sampledHeaders), Context.Empty)
+ .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.Sample
+
+ b3Propagation.read(headerReaderFromMap(notSampledHeaders), Context.Empty)
+ .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.DoNotSample
+
+ b3Propagation.read(headerReaderFromMap(noSamplingHeaders), Context.Empty)
+ .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.Unknown
+ }
+
+ "not include the X-B3-Sampled header if the sampling decision is unknown" in {
+ val context = testContext()
+ val sampledSpanContext = context.get(Span.ContextKey).context()
+ val notSampledSpanContext = Context.Empty.withKey(Span.ContextKey,
+ Span.Remote(sampledSpanContext.copy(samplingDecision = SamplingDecision.DoNotSample)))
+ val unknownSamplingSpanContext = Context.Empty.withKey(Span.ContextKey,
+ Span.Remote(sampledSpanContext.copy(samplingDecision = SamplingDecision.Unknown)))
+ val headersMap = mutable.Map.empty[String, String]
+
+ b3Propagation.write(context, headerWriterFromMap(headersMap))
+ headersMap.get("X-B3-Sampled").value shouldBe("1")
+ headersMap.clear()
+
+ b3Propagation.write(notSampledSpanContext, headerWriterFromMap(headersMap))
+ headersMap.get("X-B3-Sampled").value shouldBe("0")
+ headersMap.clear()
+
+ b3Propagation.write(unknownSamplingSpanContext, headerWriterFromMap(headersMap))
+ headersMap.get("X-B3-Sampled") shouldBe empty
+ }
+
+    "use the Debug flag to override the sampling decision, if provided" in {
+ val headers = Map(
+ "X-B3-TraceId" -> "1234",
+ "X-B3-SpanId" -> "4321",
+ "X-B3-Sampled" -> "0",
+ "X-B3-Flags" -> "1"
+ )
+
+ val spanContext = b3Propagation.read(headerReaderFromMap(headers), Context.Empty).get(Span.ContextKey).context()
+ spanContext.samplingDecision shouldBe SamplingDecision.Sample
+ }
+
+ "use the Debug flag as sampling decision when Sampled is not provided" in {
+ val headers = Map(
+ "X-B3-TraceId" -> "1234",
+ "X-B3-SpanId" -> "4321",
+ "X-B3-Flags" -> "1"
+ )
+
+ val spanContext = b3Propagation.read(headerReaderFromMap(headers), Context.Empty).get(Span.ContextKey).context()
+ spanContext.samplingDecision shouldBe SamplingDecision.Sample
+ }
+
+    "extract a minimal SpanContext from incoming headers containing only the Trace ID and Span ID" in {
+ val headers = Map(
+ "X-B3-TraceId" -> "1234",
+ "X-B3-SpanId" -> "4321"
+ )
+
+ val spanContext = b3Propagation.read(headerReaderFromMap(headers), Context.Empty).get(Span.ContextKey).context()
+ spanContext.traceID.string shouldBe "1234"
+ spanContext.spanID.string shouldBe "4321"
+ spanContext.parentID shouldBe IdentityProvider.NoIdentifier
+ spanContext.samplingDecision shouldBe SamplingDecision.Unknown
+ }
+
+    "not extract a SpanContext if Trace ID and Span ID are not provided" in {
+ val onlyTraceID = Map(
+ "X-B3-TraceId" -> "1234",
+ "X-B3-Sampled" -> "0",
+ "X-B3-Flags" -> "1"
+ )
+
+ val onlySpanID = Map(
+ "X-B3-SpanId" -> "1234",
+ "X-B3-Sampled" -> "0",
+ "X-B3-Flags" -> "1"
+ )
+
+ val noIds = Map(
+ "X-B3-Sampled" -> "0",
+ "X-B3-Flags" -> "1"
+ )
+
+ b3Propagation.read(headerReaderFromMap(onlyTraceID), Context.Empty).get(Span.ContextKey) shouldBe Span.Empty
+ b3Propagation.read(headerReaderFromMap(onlySpanID), Context.Empty).get(Span.ContextKey) shouldBe Span.Empty
+ b3Propagation.read(headerReaderFromMap(noIds), Context.Empty).get(Span.ContextKey) shouldBe Span.Empty
+ }
+
+    "round trip a Span from headers -> Context -> headers" in {
+ val headers = Map(
+ "X-B3-TraceId" -> "1234",
+ "X-B3-SpanId" -> "4321",
+ "X-B3-ParentSpanId" -> "2222",
+ "X-B3-Sampled" -> "1"
+ )
+
+      val writtenHeaders = mutable.Map.empty[String, String]
+      val context = b3Propagation.read(headerReaderFromMap(headers), Context.Empty)
+      b3Propagation.write(context, headerWriterFromMap(writtenHeaders))
+      writtenHeaders should contain theSameElementsAs(headers)
+ }
+ }
+
+ def headerReaderFromMap(map: Map[String, String]): HttpPropagation.HeaderReader = new HttpPropagation.HeaderReader {
+ override def read(header: String): Option[String] = {
+ if(map.get("fail").nonEmpty)
+ sys.error("failing on purpose")
+
+ map.get(header)
+ }
+
+ override def readAll(): Map[String, String] = map
+ }
+
+ def headerWriterFromMap(map: mutable.Map[String, String]): HttpPropagation.HeaderWriter = new HttpPropagation.HeaderWriter {
+ override def write(header: String, value: String): Unit = map.put(header, value)
+ }
+
+ def testContext(): Context = {
+ val spanContext = createSpanContext().copy(
+ traceID = Identifier("1234", Array[Byte](1, 2, 3, 4)),
+ spanID = Identifier("4321", Array[Byte](4, 3, 2, 1)),
+ parentID = Identifier("2222", Array[Byte](2, 2, 2, 2))
+ )
+
+ Context.of(Span.ContextKey, Span.Remote(spanContext))
+ }
+
+ def testContextWithoutParent(): Context = {
+ val spanContext = createSpanContext().copy(
+ traceID = Identifier("1234", Array[Byte](1, 2, 3, 4)),
+ spanID = Identifier("4321", Array[Byte](4, 3, 2, 1)),
+ parentID = IdentityProvider.NoIdentifier
+ )
+
+ Context.of(Span.ContextKey, Span.Remote(spanContext))
+ }
+
+} \ No newline at end of file
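A compact sketch of the propagation above over plain header maps, reusing the spec's helpers:

```
import kamon.context.Context
import scala.collection.mutable

val outgoing = mutable.Map.empty[String, String]
b3Propagation.write(testContext(), headerWriterFromMap(outgoing))
// outgoing now holds X-B3-TraceId, X-B3-SpanId, X-B3-ParentSpanId and X-B3-Sampled.

val restored = b3Propagation.read(headerReaderFromMap(outgoing.toMap), Context.Empty)
restored.get(Span.ContextKey).context().traceID.string // "1234"
```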
diff --git a/kamon-core/src/main/colfer/Context.colf b/kamon-core/src/main/colfer/Context.colf
index f84d7a56..f5d9a80b 100644
--- a/kamon-core/src/main/colfer/Context.colf
+++ b/kamon-core/src/main/colfer/Context.colf
@@ -6,5 +6,6 @@ type Entry struct {
}
type Context struct {
+ tags []text
entries []Entry
} \ No newline at end of file
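The regenerated Java class in the next diff exposes the new field through plain accessors. How Kamon packs each tag pair into a string is an implementation detail of BinaryPropagation not shown in this commit, so the key=value form below is only an assumption for illustration:

```
import kamon.context.generated.binary.context.{Context => ColferContext}

// withTags/getTags come straight from the regenerated class below.
val colferContext = new ColferContext().withTags(Array("hello=world")) // "key=value" form assumed
colferContext.getTags.foreach(println)
```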
diff --git a/kamon-core/src/main/colfer/building.md b/kamon-core/src/main/colfer/building.md
index f510a44f..8b663828 100644
--- a/kamon-core/src/main/colfer/building.md
+++ b/kamon-core/src/main/colfer/building.md
@@ -1,4 +1,4 @@
-Just download and install the colver compiler and run this command from the colfer folder:
+Just download and install the colfer compiler and run this command from the colfer folder:
```
colfer -b ../java -p kamon/context/generated/binary java
diff --git a/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java b/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java
index a9917e99..4be6d630 100644
--- a/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java
+++ b/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java
@@ -12,6 +12,7 @@ import java.io.ObjectOutputStream;
import java.io.ObjectStreamException;
import java.io.OutputStream;
import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
import java.util.InputMismatchException;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
@@ -35,6 +36,8 @@ public class Context implements Serializable {
+ public String[] tags;
+
public Entry[] entries;
@@ -43,10 +46,12 @@ public class Context implements Serializable {
init();
}
+ private static final String[] _zeroTags = new String[0];
private static final Entry[] _zeroEntries = new Entry[0];
/** Colfer zero values. */
private void init() {
+ tags = _zeroTags;
entries = _zeroEntries;
}
@@ -142,6 +147,7 @@ public class Context implements Serializable {
/**
* Serializes the object.
+ * All {@code null} elements in {@link #tags} will be replaced with {@code ""}.
* All {@code null} elements in {@link #entries} will be replaced with a {@code new} value.
* @param out the data destination.
* @param buf the initial buffer or {@code null}.
@@ -171,6 +177,7 @@ public class Context implements Serializable {
/**
* Serializes the object.
+ * All {@code null} elements in {@link #tags} will be replaced with {@code ""}.
* All {@code null} elements in {@link #entries} will be replaced with a {@code new} value.
* @param buf the data destination.
* @param offset the initial index for {@code buf}, inclusive.
@@ -182,8 +189,72 @@ public class Context implements Serializable {
int i = offset;
try {
- if (this.entries.length != 0) {
+ if (this.tags.length != 0) {
buf[i++] = (byte) 0;
+ String[] a = this.tags;
+
+ int x = a.length;
+ if (x > Context.colferListMax)
+ throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Context.tags length %d exceeds %d elements", x, Context.colferListMax));
+ while (x > 0x7f) {
+ buf[i++] = (byte) (x | 0x80);
+ x >>>= 7;
+ }
+ buf[i++] = (byte) x;
+
+ for (int ai = 0; ai < a.length; ai++) {
+ String s = a[ai];
+ if (s == null) {
+ s = "";
+ a[ai] = s;
+ }
+
+ int start = ++i;
+
+ for (int sIndex = 0, sLength = s.length(); sIndex < sLength; sIndex++) {
+ char c = s.charAt(sIndex);
+ if (c < '\u0080') {
+ buf[i++] = (byte) c;
+ } else if (c < '\u0800') {
+ buf[i++] = (byte) (192 | c >>> 6);
+ buf[i++] = (byte) (128 | c & 63);
+ } else if (c < '\ud800' || c > '\udfff') {
+ buf[i++] = (byte) (224 | c >>> 12);
+ buf[i++] = (byte) (128 | c >>> 6 & 63);
+ buf[i++] = (byte) (128 | c & 63);
+ } else {
+ int cp = 0;
+ if (++sIndex < sLength) cp = Character.toCodePoint(c, s.charAt(sIndex));
+ if ((cp >= 1 << 16) && (cp < 1 << 21)) {
+ buf[i++] = (byte) (240 | cp >>> 18);
+ buf[i++] = (byte) (128 | cp >>> 12 & 63);
+ buf[i++] = (byte) (128 | cp >>> 6 & 63);
+ buf[i++] = (byte) (128 | cp & 63);
+ } else
+ buf[i++] = (byte) '?';
+ }
+ }
+ int size = i - start;
+ if (size > Context.colferSizeMax)
+ throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Context.tags[%d] size %d exceeds %d UTF-8 bytes", ai, size, Context.colferSizeMax));
+
+ int ii = start - 1;
+ if (size > 0x7f) {
+ i++;
+ for (int y = size; y >= 1 << 14; y >>>= 7) i++;
+ System.arraycopy(buf, start, buf, i - size, size);
+
+ do {
+ buf[ii++] = (byte) (size | 0x80);
+ size >>>= 7;
+ } while (size > 0x7f);
+ }
+ buf[ii] = (byte) size;
+ }
+ }
+
+ if (this.entries.length != 0) {
+ buf[i++] = (byte) 1;
Entry[] a = this.entries;
int x = a.length;
@@ -253,6 +324,35 @@ public class Context implements Serializable {
if (shift == 28 || b >= 0) break;
}
if (length < 0 || length > Context.colferListMax)
+ throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Context.tags length %d exceeds %d elements", length, Context.colferListMax));
+
+ String[] a = new String[length];
+ for (int ai = 0; ai < length; ai++) {
+ int size = 0;
+ for (int shift = 0; true; shift += 7) {
+ byte b = buf[i++];
+ size |= (b & 0x7f) << shift;
+ if (shift == 28 || b >= 0) break;
+ }
+ if (size < 0 || size > Context.colferSizeMax)
+ throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Context.tags[%d] size %d exceeds %d UTF-8 bytes", ai, size, Context.colferSizeMax));
+
+ int start = i;
+ i += size;
+ a[ai] = new String(buf, start, size, StandardCharsets.UTF_8);
+ }
+ this.tags = a;
+ header = buf[i++];
+ }
+
+ if (header == (byte) 1) {
+ int length = 0;
+ for (int shift = 0; true; shift += 7) {
+ byte b = buf[i++];
+ length |= (b & 0x7f) << shift;
+ if (shift == 28 || b >= 0) break;
+ }
+ if (length < 0 || length > Context.colferListMax)
throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Context.entries length %d exceeds %d elements", length, Context.colferListMax));
Entry[] a = new Entry[length];
@@ -278,7 +378,7 @@ public class Context implements Serializable {
}
// {@link Serializable} version number.
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
// {@link Serializable} Colfer extension.
private void writeObject(ObjectOutputStream out) throws IOException {
@@ -312,6 +412,32 @@ public class Context implements Serializable {
}
/**
+ * Gets kamon/context/generated/binary/context.Context.tags.
+ * @return the value.
+ */
+ public String[] getTags() {
+ return this.tags;
+ }
+
+ /**
+ * Sets kamon/context/generated/binary/context.Context.tags.
+ * @param value the replacement.
+ */
+ public void setTags(String[] value) {
+ this.tags = value;
+ }
+
+ /**
+ * Sets kamon/context/generated/binary/context.Context.tags.
+ * @param value the replacement.
+   * @return {@link this}.
+ */
+ public Context withTags(String[] value) {
+ this.tags = value;
+ return this;
+ }
+
+ /**
* Gets kamon/context/generated/binary/context.Context.entries.
* @return the value.
*/
@@ -340,6 +466,7 @@ public class Context implements Serializable {
@Override
public final int hashCode() {
int h = 1;
+ for (String o : this.tags) h = 31 * h + (o == null ? 0 : o.hashCode());
for (Entry o : this.entries) h = 31 * h + (o == null ? 0 : o.hashCode());
return h;
}
@@ -353,6 +480,7 @@ public class Context implements Serializable {
if (o == null) return false;
if (o == this) return true;
return o.getClass() == Context.class
+ && java.util.Arrays.equals(this.tags, o.tags)
&& java.util.Arrays.equals(this.entries, o.entries);
}
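The regenerated class can be exercised with a simple marshal/unmarshal round trip. A sketch in Scala, assuming a buffer large enough for the payload (the 256-byte size is an arbitrary choice for illustration):

```
import kamon.context.generated.binary.context.{Context => ColferContext}

// Serialize a context with one tag pair and read it back.
val outgoing = new ColferContext()
outgoing.setTags(Array("userID", "1234"))

val buffer = new Array[Byte](256)
val size = outgoing.marshal(buffer, 0) // number of bytes written

val incoming = new ColferContext()
incoming.unmarshal(buffer, 0)
assert(incoming.getTags().sameElements(outgoing.getTags()))
```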
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 60fa156d..a35225d4 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -150,45 +150,210 @@ kamon {
}
}
+ propagation {
- context {
+ http {
- # Codecs are used to encode/decode Context keys when a Context must be propagated either through HTTP headers or
- # Binary transports. Only broadcast keys configured bellow will be processed by the context Codec. The FQCN of
- # the appropriate Codecs for each key must be provided, otherwise keys will be ignored.
- #
- codecs {
-
- # Size of the encoding buffer for the Binary Codec.
- binary-buffer-size = 256
-
- # Declarative definition of broadcast context keys with type Option[String]. The setting key represents the actual
- # key name and the value is the HTTP header name to be used to encode/decode the context key. The key name will
- # be used when coding for binary transport. The most common use case for string keys is effortless propagation of
- # correlation keys or request related data (locale, user ID, etc). E.g. if wanting to propagate a "X-Request-ID"
- # header this config should suffice:
+      # Default HTTP propagation. Unless specified otherwise, all instrumentation will use the configuration in
+      # this section for HTTP context propagation.
#
- # kamon.context.codecs.string-keys {
- # request-id = "X-Request-ID"
- # }
- #
- # If the application must read this context key they can define key with a matching name and read the value from
- # the context:
- # val requestIDKey = Key.broadcastString("request-id") // Do this only once, keep a reference.
- # val requestID = Kamon.currentContext().get(requestIDKey)
- #
- string-keys {
+ default {
+
+ # Configures how context tags will be propagated over HTTP headers.
+ #
+ tags {
+
+ # Header name used to encode context tags.
+ header-name = "context-tags"
+
+
+          # Provide explicit mappings between context tags and the HTTP headers that will carry them. When there is
+          # an explicit mapping for a tag, it will not be included in the default context header. For example, if
+          # you wanted to use an HTTP header called `X-Correlation-ID` for a context tag with key `correlationID`
+          # you would need to include the following configuration:
+ #
+ # mappings {
+ # correlationID = "X-Correlation-ID"
+ # }
+ #
+          # The correlationID tag would always be read from and written to the `X-Correlation-ID` header. The context
+          # tag name is represented as the configuration key and the desired header name is represented by the
+          # configuration value.
+ #
+ mappings {
+
+ }
+ }
+        # Configure which entries should be read from incoming HTTP requests and written to outgoing HTTP requests.
+ #
+ entries {
+
+ # Specify mappings between Context keys and the Propagation.EntryReader[HeaderReader] implementation in charge
+ # of reading them from the incoming HTTP request into the Context.
+ incoming {
+ span = "kamon.trace.SpanPropagation$B3"
+ }
+
+          # Specify mappings between Context keys and the Propagation.EntryWriter[HeaderWriter] implementation in charge
+ # of writing them to outgoing HTTP requests.
+ outgoing {
+ span = "kamon.trace.SpanPropagation$B3"
+ }
+ }
}
+ }
+
+ binary {
- # Codecs to be used when propagating a Context through a HTTP Headers transport.
- http-headers-keys {
- span = "kamon.trace.SpanCodec$B3"
+      # Default binary propagation. Unless specified otherwise, all instrumentation will use the configuration in
+      # this section for binary context propagation.
+ #
+ default {
+
+ # Maximum outgoing Context size for binary transports. Contexts that surpass this limit will not be written to
+ # the outgoing medium.
+ max-outgoing-size = 2048
+
+        # Configure which entries should be read from incoming messages and written to outgoing messages.
+ #
+ entries {
+
+ # Specify mappings between Context keys and the Propagation.EntryReader[ByteStreamReader] implementation in
+ # charge of reading them from the incoming messages into the Context.
+ incoming {
+ span = "kamon.trace.SpanPropagation$Colfer"
+ }
+
+          # Specify mappings between Context keys and the Propagation.EntryWriter[ByteStreamWriter] implementation in
+ # charge of writing them on the outgoing messages.
+ outgoing {
+ span = "kamon.trace.SpanPropagation$Colfer"
+ }
+ }
}
+ }
+ }
+
+ instrumentation {
+ http-server {
+ default {
+
+ #
+ # Configuration for HTTP context propagation.
+ #
+ propagation {
+
+ # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if
+          # propagation is disabled then some distributed tracing features will not work as expected (e.g. Spans can
+ # be created and reported but will not be linked across boundaries nor take trace identifiers from tags).
+ enabled = yes
+
+          # HTTP propagation channel to be used by this instrumentation. Take a look at the kamon.propagation.http.default
+          # configuration for more details on how to configure the default HTTP context propagation.
+ channel = "default"
+ }
- # Codecs to be used when propagating a Context through a Binary transport.
- binary-keys {
- span = "kamon.trace.SpanCodec$Colfer"
+
+ #
+ # Configuration for HTTP server metrics collection.
+ #
+ metrics {
+
+ # Enables collection of HTTP server metrics. When enabled the following metrics will be collected, assuming
+ # that the instrumentation is fully compliant:
+ #
+          # - http.server.requests
+ # - http.server.request.active
+ # - http.server.request.size
+ # - http.server.response.size
+ # - http.server.connection.lifetime
+ # - http.server.connection.usage
+ # - http.server.connection.open
+ #
+ # All metrics have at least three tags: component, interface and port. Additionally, the http.server.requests
+ # metric will also have a status_code tag with the status code group (1xx, 2xx and so on).
+ #
+ enabled = yes
+ }
+
+
+ #
+ # Configuration for HTTP request tracing.
+ #
+ tracing {
+
+ # Enables HTTP request tracing. When enabled the instrumentation will create Spans for incoming requests
+ # and finish them when the response is sent back to the clients.
+ enabled = yes
+
+ # Select a context tag that provides a preferred trace identifier. The preferred trace identifier will be used
+ # only if all these conditions are met:
+ # - the context tag is present.
+ # - there is no parent Span on the incoming context (i.e. this is the first service on the trace).
+          #   - the identifier is valid in accordance with the identity provider.
+ preferred-trace-id-tag = "none"
+
+ # Enables collection of span metrics using the `span.processing-time` metric.
+ span-metrics = on
+
+ # Select which tags should be included as span and span metric tags. The possible options are:
+ # - span: the tag is added as a Span tag (i.e. using span.tag(...))
+          #   - metric: the tag is added as a Span metric tag (i.e. using span.tagMetric(...))
+ # - off: the tag is not used.
+ #
+ tags {
+
+ # Use the http.url tag.
+ url = span
+
+ # Use the http.method tag.
+ method = metric
+
+ # Use the http.status_code tag.
+ status-code = metric
+
+            # Copy tags from the context into the Spans with the specified purpose. For example, to copy a customer_type
+ # tag from the context into the HTTP Server Span created by the instrumentation, the following configuration
+ # should be added:
+ #
+ # from-context {
+ # customer_type = span
+ # }
+ #
+ from-context {
+
+ }
+ }
+
+ # Custom mappings between routes and operation names.
+ operations {
+
+ # Operation name for Spans created on requests that could not be handled by any route in the current
+ # application.
+ unhandled = "unhandled"
+
+ # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode
+ # instrumentation is not able to provide a sensible operation name that is free of high cardinality values.
+ # For example, with the following configuration:
+ # mappings {
+ # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile"
+ # "/events/*/rsvps" = "EventRSVPs"
+ # }
+ #
+ # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have
+ # the same operation name "/organization/:orgID/user/:userID/profile".
+ #
+ # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation
+ # name "EventRSVPs".
+ #
+ # The patterns are expressed as globs and the operation names are free form.
+ #
+ mappings {
+
+ }
+ }
+ }
}
}
}
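These settings can be overridden from an application.conf file or programmatically. A sketch of the tag mapping described in the comments above, applied through Kamon.reconfigure (`correlationID` is the example tag from those comments, not a built-in):

```
import com.typesafe.config.ConfigFactory
import kamon.Kamon

// Carry the `correlationID` context tag on its own HTTP header instead
// of the shared context-tags header. The same setting could live in
// application.conf instead of being applied at runtime.
val overrides = ConfigFactory.parseString(
  """kamon.propagation.http.default.tags.mappings {
    |  correlationID = "X-Correlation-ID"
    |}""".stripMargin)

Kamon.reconfigure(overrides.withFallback(Kamon.config()))
```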
diff --git a/kamon-core/src/main/scala/kamon/ClassLoading.scala b/kamon-core/src/main/scala/kamon/ClassLoading.scala
new file mode 100644
index 00000000..5b097af1
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/ClassLoading.scala
@@ -0,0 +1,26 @@
+package kamon
+
+import kamon.util.DynamicAccess
+
+import scala.collection.immutable
+import scala.reflect.ClassTag
+import scala.util.Try
+
+/**
+ * Utilities for creating instances from fully qualified class names.
+ */
+trait ClassLoading {
+ @volatile private var _dynamicAccessClassLoader = this.getClass.getClassLoader
+ @volatile private var _dynamicAccess = new DynamicAccess(_dynamicAccessClassLoader)
+
+ def classLoader(): ClassLoader =
+ _dynamicAccessClassLoader
+
+ def changeClassLoader(classLoader: ClassLoader): Unit = synchronized {
+ _dynamicAccessClassLoader = classLoader
+ _dynamicAccess = new DynamicAccess(_dynamicAccessClassLoader)
+ }
+
+ def createInstance[T: ClassTag](fqcn: String, args: immutable.Seq[(Class[_], AnyRef)]): Try[T] =
+ _dynamicAccess.createInstanceFor(fqcn, args)
+}
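Since the Kamon object mixes in this trait, components can be created by fully qualified class name, which is exactly what the propagation settings later in this commit do. A sketch, where `com.example.CustomReader` is a made-up class name:

```
import kamon.Kamon
import kamon.context.HttpPropagation.HeaderReader
import kamon.context.Propagation

// Instantiate an entry reader by FQCN; createInstance returns a Try,
// so failures surface as values instead of thrown exceptions.
val reader = Kamon.createInstance[Propagation.EntryReader[HeaderReader]](
  "com.example.CustomReader", Nil)

reader.failed.foreach(error => println(s"could not instantiate reader: $error"))
```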
diff --git a/kamon-core/src/main/scala/kamon/Configuration.scala b/kamon-core/src/main/scala/kamon/Configuration.scala
new file mode 100644
index 00000000..bd286792
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/Configuration.scala
@@ -0,0 +1,57 @@
+package kamon
+
+import scala.util.Try
+import com.typesafe.config.{Config, ConfigFactory}
+import org.slf4j.LoggerFactory
+
+trait Configuration { self: ClassLoading =>
+ private val logger = LoggerFactory.getLogger(classOf[Configuration])
+ private var _currentConfig: Config = ConfigFactory.load(self.classLoader())
+ private var _onReconfigureHooks = Seq.empty[Configuration.OnReconfigureHook]
+
+
+ /**
+ * Retrieve Kamon's current configuration.
+ */
+ def config(): Config =
+ _currentConfig
+
+ /**
+ * Supply a new Config instance to rule Kamon's world.
+ */
+ def reconfigure(newConfig: Config): Unit = synchronized {
+ _currentConfig = newConfig
+ _onReconfigureHooks.foreach(hook => {
+ Try(hook.onReconfigure(newConfig)).failed.foreach(error =>
+ logger.error("Exception occurred while trying to run a OnReconfigureHook", error)
+ )
+ })
+ }
+
+ /**
+   * Register a reconfigure hook that will be run when a call to Kamon.reconfigure(config) is performed. All
+ * registered hooks will run sequentially in the same Thread that calls Kamon.reconfigure(config).
+ */
+ def onReconfigure(hook: Configuration.OnReconfigureHook): Unit = synchronized {
+ _onReconfigureHooks = hook +: _onReconfigureHooks
+ }
+
+ /**
+   * Register a reconfigure hook that will be run when a call to Kamon.reconfigure(config) is performed. All
+ * registered hooks will run sequentially in the same Thread that calls Kamon.reconfigure(config).
+ */
+ def onReconfigure(hook: (Config) => Unit): Unit = {
+ onReconfigure(new Configuration.OnReconfigureHook {
+ override def onReconfigure(newConfig: Config): Unit = hook.apply(newConfig)
+ })
+ }
+
+}
+
+object Configuration {
+
+ trait OnReconfigureHook {
+ def onReconfigure(newConfig: Config): Unit
+ }
+
+}
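A sketch of registering a hook; it runs synchronously, on whatever thread calls Kamon.reconfigure:

```
import kamon.Kamon

// React to configuration changes. The setting read here is the
// scheduler pool size already defined in Kamon's reference.conf.
Kamon.onReconfigure(newConfig => {
  val poolSize = newConfig.getInt("kamon.scheduler-pool-size")
  println(s"Kamon reconfigured, scheduler pool size is now $poolSize")
})
```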
diff --git a/kamon-core/src/main/scala/kamon/ContextPropagation.scala b/kamon-core/src/main/scala/kamon/ContextPropagation.scala
new file mode 100644
index 00000000..8f2ed8e4
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/ContextPropagation.scala
@@ -0,0 +1,92 @@
+package kamon
+
+import com.typesafe.config.Config
+import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
+import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter}
+import kamon.context.{BinaryPropagation, HttpPropagation, Propagation}
+
+trait ContextPropagation { self: Configuration with ClassLoading =>
+ @volatile private var _propagationComponents: ContextPropagation.Components = _
+ @volatile private var _defaultHttpPropagation: Propagation[HeaderReader, HeaderWriter] = _
+ @volatile private var _defaultBinaryPropagation: Propagation[ByteStreamReader, ByteStreamWriter] = _
+
+ // Initial configuration and reconfigures
+ init(self.config)
+ self.onReconfigure(newConfig => self.init(newConfig))
+
+
+ /**
+ * Retrieves the HTTP propagation channel with the supplied name. Propagation channels are configured on the
+ * kamon.propagation.http configuration section.
+ *
+ * @param channelName Channel name to retrieve.
+ * @return The HTTP propagation, if available.
+ */
+ def httpPropagation(channelName: String): Option[Propagation[HeaderReader, HeaderWriter]] =
+ _propagationComponents.httpChannels.get(channelName)
+
+ /**
+ * Retrieves the binary propagation channel with the supplied name. Propagation channels are configured on the
+ * kamon.propagation.binary configuration section.
+ *
+ * @param channelName Channel name to retrieve.
+ * @return The binary propagation, if available.
+ */
+ def binaryPropagation(channelName: String): Option[Propagation[ByteStreamReader, ByteStreamWriter]] =
+ _propagationComponents.binaryChannels.get(channelName)
+
+ /**
+ * Retrieves the default HTTP propagation channel. Configuration for this channel can be found under the
+ * kamon.propagation.http.default configuration section.
+ *
+ * @return The default HTTP propagation.
+ */
+ def defaultHttpPropagation(): Propagation[HeaderReader, HeaderWriter] =
+ _defaultHttpPropagation
+
+ /**
+ * Retrieves the default binary propagation channel. Configuration for this channel can be found under the
+ * kamon.propagation.binary.default configuration section.
+ *
+   * @return The default binary propagation.
+ */
+ def defaultBinaryPropagation(): Propagation[ByteStreamReader, ByteStreamWriter] =
+ _defaultBinaryPropagation
+
+
+
+ private def init(config: Config): Unit = synchronized {
+ _propagationComponents = ContextPropagation.Components.from(self.config, self)
+ _defaultHttpPropagation = _propagationComponents.httpChannels(ContextPropagation.DefaultHttpChannel)
+ _defaultBinaryPropagation = _propagationComponents.binaryChannels(ContextPropagation.DefaultBinaryChannel)
+ }
+}
+
+object ContextPropagation {
+ val DefaultHttpChannel = "default"
+ val DefaultBinaryChannel = "default"
+
+ case class Components(
+ httpChannels: Map[String, Propagation[HeaderReader, HeaderWriter]],
+ binaryChannels: Map[String, Propagation[ByteStreamReader, ByteStreamWriter]]
+ )
+
+ object Components {
+
+ def from(config: Config, classLoading: ClassLoading): Components = {
+ val propagationConfig = config.getConfig("kamon.propagation")
+ val httpChannelsConfig = propagationConfig.getConfig("http").configurations
+ val binaryChannelsConfig = propagationConfig.getConfig("binary").configurations
+
+ val httpChannels = httpChannelsConfig.map {
+ case (channelName, channelConfig) => (channelName -> HttpPropagation.from(channelConfig, classLoading))
+ }
+
+ val binaryChannels = binaryChannelsConfig.map {
+ case (channelName, channelConfig) => (channelName -> BinaryPropagation.from(channelConfig, classLoading))
+ }
+
+ Components(httpChannels, binaryChannels)
+ }
+ }
+}
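A sketch of resolving channels; `custom` is a hypothetical channel name that would need its own section under kamon.propagation.http to return anything:

```
import kamon.Kamon

// The default channels always exist; named channels are optional.
val defaultHttp   = Kamon.defaultHttpPropagation()
val defaultBinary = Kamon.defaultBinaryPropagation()
val customHttp    = Kamon.httpPropagation("custom") // None unless configured
```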
diff --git a/kamon-core/src/main/scala/kamon/ContextStorage.scala b/kamon-core/src/main/scala/kamon/ContextStorage.scala
new file mode 100644
index 00000000..47d0b567
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/ContextStorage.scala
@@ -0,0 +1,49 @@
+package kamon
+
+import kamon.context.{Context, Storage}
+import kamon.trace.Span
+
+import scala.util.control.NonFatal
+
+trait ContextStorage {
+ private val _contextStorage = Storage.ThreadLocal()
+
+ def currentContext(): Context =
+ _contextStorage.current()
+
+ def currentSpan(): Span =
+ _contextStorage.current().get(Span.ContextKey)
+
+ def storeContext(context: Context): Storage.Scope =
+ _contextStorage.store(context)
+
+ def withContext[T](context: Context)(f: => T): T = {
+ val scope = _contextStorage.store(context)
+ try {
+ f
+ } finally {
+ scope.close()
+ }
+ }
+
+ def withContextKey[T, K](key: Context.Key[K], value: K)(f: => T): T =
+ withContext(currentContext().withKey(key, value))(f)
+
+ def withSpan[T](span: Span)(f: => T): T =
+ withSpan(span, true)(f)
+
+ def withSpan[T](span: Span, finishSpan: Boolean)(f: => T): T = {
+ try {
+ withContextKey(Span.ContextKey, span)(f)
+ } catch {
+ case NonFatal(t) =>
+ span.addError(t.getMessage, t)
+ throw t
+
+ } finally {
+ if(finishSpan)
+ span.finish()
+ }
+ }
+
+}
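A sketch of the scoping behavior these helpers provide; the stored context is only visible within the block and the previous context is restored afterwards:

```
import kamon.Kamon
import kamon.context.Context

// Illustrative key; any name and empty value would do.
val userKey = Context.key[Option[String]]("userID", None)

Kamon.withContextKey(userKey, Some("1234")) {
  assert(Kamon.currentContext().get(userKey).contains("1234"))
}
// Here the previous context is back in place.
```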
diff --git a/kamon-core/src/main/scala/kamon/Environment.scala b/kamon-core/src/main/scala/kamon/Environment.scala
index 1c00679d..2e07d18d 100644
--- a/kamon-core/src/main/scala/kamon/Environment.scala
+++ b/kamon-core/src/main/scala/kamon/Environment.scala
@@ -21,6 +21,9 @@ import java.util.concurrent.ThreadLocalRandom
import com.typesafe.config.Config
import kamon.util.HexCodec
+
+
+
case class Environment(host: String, service: String, instance: String, incarnation: String, tags: Map[String, String])
object Environment {
@@ -47,6 +50,4 @@ object Environment {
private def readValueOrGenerate(configuredValue: String, generator: => String): String =
if(configuredValue == "auto") generator else configuredValue
-
-
}
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 8f69ab41..a6cc7bf4 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -19,7 +19,6 @@ import java.time.Duration
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor}
import com.typesafe.config.{Config, ConfigFactory}
-import kamon.context.{Codecs, Context, Key, Storage}
import kamon.metric._
import kamon.trace._
import kamon.util.{Clock, Filters, Matcher, Registration}
@@ -29,7 +28,7 @@ import scala.concurrent.Future
import scala.util.Try
-object Kamon extends MetricLookup with ReporterRegistry with Tracer {
+object Kamon extends MetricLookup with ClassLoading with Configuration with ReporterRegistry with Tracer with ContextPropagation with ContextStorage {
private val logger = LoggerFactory.getLogger("kamon.Kamon")
@volatile private var _config = ConfigFactory.load()
@@ -41,39 +40,26 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
private val _metrics = new MetricRegistry(_config, _scheduler)
private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config, _clock)
private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config, _clock)
- private val _contextStorage = Storage.ThreadLocal()
- private val _contextCodec = new Codecs(_config)
- private var _onReconfigureHooks = Seq.empty[OnReconfigureHook]
+ //private var _onReconfigureHooks = Seq.empty[OnReconfigureHook]
sys.addShutdownHook(() => _scheduler.shutdown())
def environment: Environment =
_environment
- def config(): Config =
- _config
-
- def reconfigure(config: Config): Unit = synchronized {
+ onReconfigure(newConfig => {
_config = config
_environment = Environment.fromConfig(config)
_filters = Filters.fromConfig(config)
_metrics.reconfigure(config)
_reporterRegistry.reconfigure(config)
_tracer.reconfigure(config)
- _contextCodec.reconfigure(config)
-
- _onReconfigureHooks.foreach(hook => {
- Try(hook.onReconfigure(config)).failed.foreach(error =>
- logger.error("Exception occurred while trying to run a OnReconfigureHook", error)
- )
- })
_scheduler match {
case stpe: ScheduledThreadPoolExecutor => stpe.setCorePoolSize(schedulerPoolSize(config))
case other => logger.error("Unexpected scheduler [{}] found when reconfiguring Kamon.", other)
}
- }
-
+ })
override def histogram(name: String, unit: MeasurementUnit, dynamicRange: Option[DynamicRange]): HistogramMetric =
_metrics.histogram(name, unit, dynamicRange)
@@ -102,47 +88,6 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
override def identityProvider: IdentityProvider =
_tracer.identityProvider
- def contextCodec(): Codecs =
- _contextCodec
-
- def currentContext(): Context =
- _contextStorage.current()
-
- def currentSpan(): Span =
- _contextStorage.current().get(Span.ContextKey)
-
- def storeContext(context: Context): Storage.Scope =
- _contextStorage.store(context)
-
- def withContext[T](context: Context)(f: => T): T = {
- val scope = _contextStorage.store(context)
- try {
- f
- } finally {
- scope.close()
- }
- }
-
- def withContextKey[T, K](key: Key[K], value: K)(f: => T): T =
- withContext(currentContext().withKey(key, value))(f)
-
- def withSpan[T](span: Span)(f: => T): T =
- withSpan(span, true)(f)
-
- def withSpan[T](span: Span, finishSpan: Boolean)(f: => T): T = {
- try {
- withContextKey(Span.ContextKey, span)(f)
- } catch {
- case t: Throwable =>
- span.addError(t.getMessage, t)
- throw t
-
- } finally {
- if(finishSpan)
- span.finish()
- }
- }
-
override def loadReportersFromConfig(): Unit =
_reporterRegistry.loadReportersFromConfig()
@@ -173,14 +118,6 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
def clock(): Clock =
_clock
- /**
- * Register a reconfigure hook that will be run when the a call to Kamon.reconfigure(config) is performed. All
- * registered hooks will run sequentially in the same Thread that calls Kamon.reconfigure(config).
- */
- def onReconfigure(hook: OnReconfigureHook): Unit = synchronized {
- _onReconfigureHooks = hook +: _onReconfigureHooks
- }
-
def scheduler(): ScheduledExecutorService =
_scheduler
@@ -188,7 +125,3 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
config.getInt("kamon.scheduler-pool-size")
}
-
-trait OnReconfigureHook {
- def onReconfigure(newConfig: Config): Unit
-}
diff --git a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
new file mode 100644
index 00000000..75e65c44
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
@@ -0,0 +1,301 @@
+/* =========================================================================================
+ * Copyright © 2013-2018 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon
+package context
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
+
+import com.typesafe.config.Config
+import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
+import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry}
+import org.slf4j.LoggerFactory
+
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+import scala.util.{Failure, Success, Try}
+
+
+/**
+ * Context propagation that uses byte stream abstractions as the transport medium. The Binary propagation uses
+ * instances of [[ByteStreamReader]] and [[ByteStreamWriter]] to decode and encode Context instances, respectively.
+ *
+ * Binary propagation uses the [[ByteStreamReader]] and [[ByteStreamWriter]] abstractions, which closely model the APIs
+ * from [[InputStream]] and [[OutputStream]], but without exposing additional functionality that wouldn't have any
+ * well defined behavior for Context propagation, e.g. flush or close functions on OutputStreams.
+ */
+object BinaryPropagation {
+
+ /**
+ * Represents a readable stream of bytes. This interface closely resembles [[InputStream]], minus the functionality
+ * that wouldn't have a clearly defined behavior in the context of Context propagation.
+ */
+ trait ByteStreamReader {
+ /**
+ * Number of available bytes on the ByteStream.
+ */
+ def available(): Int
+
+ /**
+ * Reads as many bytes as possible into the target byte array.
+ *
+ * @param target Target buffer in which the read bytes will be written.
+ * @return The number of bytes written into the target buffer.
+ */
+ def read(target: Array[Byte]): Int
+
+ /**
+ * Reads a specified number of bytes into the target buffer, starting from the offset position.
+ *
+ * @param target Target buffer in which read bytes will be written.
+ * @param offset Offset index in which to start writing bytes on the target buffer.
+ * @param count Number of bytes to be read.
+ * @return The number of bytes written into the target buffer.
+ */
+ def read(target: Array[Byte], offset: Int, count: Int): Int
+
+ /**
+ * Reads all available bytes into a newly created byte array.
+ *
+ * @return All bytes read.
+ */
+ def readAll(): Array[Byte]
+ }
+
+
+ object ByteStreamReader {
+
+ /**
+ * Creates a new [[ByteStreamReader]] that reads data from a byte array.
+ */
+ def of(bytes: Array[Byte]): ByteStreamReader = new ByteArrayInputStream(bytes) with ByteStreamReader {
+ override def readAll(): Array[Byte] = {
+ val target = Array.ofDim[Byte](available())
+ read(target, 0, available())
+ target
+ }
+ }
+ }
+
+
+ /**
+ * Represents a writable stream of bytes. This interface closely resembles [[OutputStream]], minus the functionality
+ * that wouldn't have a clearly defined behavior in the context of Context propagation.
+ */
+ trait ByteStreamWriter {
+
+ /**
+ * Writes all bytes into the stream.
+ */
+ def write(bytes: Array[Byte]): Unit
+
+ /**
+ * Writes a portion of the provided bytes into the stream.
+ *
+ * @param bytes Buffer from which data will be selected.
+ * @param offset Starting index on the buffer.
+ * @param count Number of bytes to write into the stream.
+ */
+ def write(bytes: Array[Byte], offset: Int, count: Int): Unit
+
+ /**
+ * Write a single byte into the stream.
+ */
+ def write(byte: Int): Unit
+
+ }
+
+ object ByteStreamWriter {
+
+ /**
+ * Creates a new [[ByteStreamWriter]] from an OutputStream.
+ */
+ def of(outputStream: OutputStream): ByteStreamWriter = new ByteStreamWriter {
+ override def write(bytes: Array[Byte]): Unit =
+ outputStream.write(bytes)
+
+ override def write(bytes: Array[Byte], offset: Int, count: Int): Unit =
+ outputStream.write(bytes, offset, count)
+
+ override def write(byte: Int): Unit =
+ outputStream.write(byte)
+ }
+ }
+
+
+
+ /**
+ * Create a new default Binary Propagation instance from the provided configuration.
+ *
+ * @param config Binary Propagation channel configuration
+   * @return A newly constructed BinaryPropagation instance.
+ */
+ def from(config: Config, classLoading: ClassLoading): Propagation[ByteStreamReader, ByteStreamWriter] = {
+ new BinaryPropagation.Default(Settings.from(config, classLoading))
+ }
+
+ /**
+ * Default Binary propagation in Kamon. This implementation uses Colfer to read and write the context tags and
+ * entries. Entries are represented as simple pairs of entry name and bytes, which are then processed by all
+ * configured entry readers and writers.
+ */
+ class Default(settings: Settings) extends Propagation[ByteStreamReader, ByteStreamWriter] {
+ private val _log = LoggerFactory.getLogger(classOf[BinaryPropagation.Default])
+ private val _streamPool = new ThreadLocal[Default.ReusableByteStreamWriter] {
+ override def initialValue(): Default.ReusableByteStreamWriter = new Default.ReusableByteStreamWriter(128)
+ }
+
+ private val _contextBufferPool = new ThreadLocal[Array[Byte]] {
+ override def initialValue(): Array[Byte] = Array.ofDim[Byte](settings.maxOutgoingSize)
+ }
+
+ override def read(reader: ByteStreamReader): Context = {
+ if(reader.available() > 0) {
+ val contextData = Try {
+ val cContext = new ColferContext()
+ cContext.unmarshal(reader.readAll(), 0)
+ cContext
+ }
+
+ contextData.failed.foreach {
+ case NonFatal(t) => _log.warn("Failed to read Context from ByteStreamReader", t)
+ }
+
+ contextData.map { colferContext =>
+
+ // Context tags
+ var tagSectionsCount = colferContext.tags.length
+ if (tagSectionsCount > 0 && tagSectionsCount % 2 != 0) {
+ _log.warn("Malformed Context tags found, tags consistency might be compromised")
+ tagSectionsCount -= 1
+ }
+
+ val tags = if (tagSectionsCount > 0) {
+ val tagsBuilder = Map.newBuilder[String, String]
+ var tagIndex = 0
+ while (tagIndex < tagSectionsCount) {
+ tagsBuilder += (colferContext.tags(tagIndex) -> colferContext.tags(tagIndex + 1))
+ tagIndex += 2
+ }
+ tagsBuilder.result()
+
+ } else Map.empty[String, String]
+
+
+ // Only reads the entries for which there is a registered reader
+ colferContext.entries.foldLeft(Context.of(tags)) {
+ case (context, entryData) =>
+ settings.incomingEntries.get(entryData.name).map { entryReader =>
+ var contextWithEntry = context
+ try {
+ contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.content), context)
+ } catch {
+ case NonFatal(t) => _log.warn("Failed to read entry [{}]", entryData.name.asInstanceOf[Any], t)
+ }
+
+ contextWithEntry
+ }.getOrElse(context)
+ }
+ } getOrElse(Context.Empty)
+ } else Context.Empty
+ }
+
+ override def write(context: Context, writer: ByteStreamWriter): Unit = {
+ if (context.nonEmpty()) {
+ val contextData = new ColferContext()
+ val output = _streamPool.get()
+ val contextOutgoingBuffer = _contextBufferPool.get()
+
+ if (context.tags.nonEmpty) {
+ val tags = Array.ofDim[String](context.tags.size * 2)
+ var tagIndex = 0
+ context.tags.foreach {
+ case (key, value) =>
+ tags.update(tagIndex, key)
+ tags.update(tagIndex + 1, value)
+ tagIndex += 2
+ }
+
+ contextData.tags = tags
+ }
+
+ if (context.entries.nonEmpty) {
+ val entries = settings.outgoingEntries.collect {
+ case (entryName, entryWriter) if context.entries.contains(entryName) =>
+ val colferEntry = new ColferEntry()
+ try {
+ output.reset()
+ entryWriter.write(context, output)
+
+ colferEntry.name = entryName
+ colferEntry.content = output.toByteArray()
+ } catch {
+ case NonFatal(t) => _log.warn("Failed to write entry [{}]", entryName.asInstanceOf[Any], t)
+ }
+
+ colferEntry
+ }
+
+ contextData.entries = entries.toArray
+ }
+
+ try {
+ val contextSize = contextData.marshal(contextOutgoingBuffer, 0)
+ writer.write(contextOutgoingBuffer, 0, contextSize)
+ } catch {
+ case NonFatal(t) => _log.warn("Failed to write Context to ByteStreamWriter", t)
+ }
+ }
+ }
+ }
+
+ object Default {
+ private class ReusableByteStreamWriter(size: Int) extends ByteArrayOutputStream(size) with ByteStreamWriter {
+ def underlying(): Array[Byte] = this.buf
+ }
+ }
+
+ case class Settings(
+ maxOutgoingSize: Int,
+ incomingEntries: Map[String, Propagation.EntryReader[ByteStreamReader]],
+ outgoingEntries: Map[String, Propagation.EntryWriter[ByteStreamWriter]]
+ )
+
+ object Settings {
+ private val log = LoggerFactory.getLogger(classOf[BinaryPropagation.Settings])
+
+ def from(config: Config, classLoading: ClassLoading): BinaryPropagation.Settings = {
+ def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = {
+ val instanceMap = Map.newBuilder[String, ExpectedType]
+
+ mappings.foreach {
+ case (contextKey, componentClass) => classLoading.createInstance[ExpectedType](componentClass, Nil) match {
+ case Success(componentInstance) => instanceMap += (contextKey -> componentInstance)
+          case Failure(exception) => log.warn("Failed to instantiate {} [{}]",
+ implicitly[ClassTag[ExpectedType]].runtimeClass.getName, componentClass, exception)
+ }
+ }
+
+ instanceMap.result()
+ }
+
+ Settings(
+ config.getBytes("max-outgoing-size").toInt,
+ buildInstances[Propagation.EntryReader[ByteStreamReader]](config.getConfig("entries.incoming").pairs),
+ buildInstances[Propagation.EntryWriter[ByteStreamWriter]](config.getConfig("entries.outgoing").pairs)
+ )
+ }
+ }
+}
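A round-trip sketch through the default binary channel, using a plain byte array as the transport medium (the tag value is illustrative):

```
import java.io.ByteArrayOutputStream
import kamon.Kamon
import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
import kamon.context.Context

// Write a context with a single tag and read it back.
val binary = Kamon.defaultBinaryPropagation()
val transport = new ByteArrayOutputStream()

binary.write(Context.of(Map("userID" -> "1234")), ByteStreamWriter.of(transport))
val restored = binary.read(ByteStreamReader.of(transport.toByteArray))

assert(restored.getTag("userID").contains("1234"))
```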
diff --git a/kamon-core/src/main/scala/kamon/context/Codecs.scala b/kamon-core/src/main/scala/kamon/context/Codecs.scala
deleted file mode 100644
index c5d237a9..00000000
--- a/kamon-core/src/main/scala/kamon/context/Codecs.scala
+++ /dev/null
@@ -1,295 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon
-package context
-
-import java.nio.ByteBuffer
-
-import com.typesafe.config.Config
-import kamon.util.DynamicAccess
-import org.slf4j.LoggerFactory
-import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry}
-
-import scala.collection.mutable
-
-class Codecs(initialConfig: Config) {
- private val log = LoggerFactory.getLogger(classOf[Codecs])
- @volatile private var httpHeaders: Codecs.ForContext[TextMap] = new Codecs.HttpHeaders(Map.empty)
- @volatile private var binary: Codecs.ForContext[ByteBuffer] = new Codecs.Binary(256, Map.empty)
-
- reconfigure(initialConfig)
-
-
- def HttpHeaders: Codecs.ForContext[TextMap] =
- httpHeaders
-
- def Binary: Codecs.ForContext[ByteBuffer] =
- binary
-
- def reconfigure(config: Config): Unit = {
- import scala.collection.JavaConverters._
- try {
- val codecsConfig = config.getConfig("kamon.context.codecs")
- val stringKeys = readStringKeysConfig(codecsConfig.getConfig("string-keys"))
- val knownHttpHeaderCodecs = readEntryCodecs[TextMap]("http-headers-keys", codecsConfig) ++ stringHeaderCodecs(stringKeys)
- val knownBinaryCodecs = readEntryCodecs[ByteBuffer]("binary-keys", codecsConfig) ++ stringBinaryCodecs(stringKeys)
-
- httpHeaders = new Codecs.HttpHeaders(knownHttpHeaderCodecs)
- binary = new Codecs.Binary(codecsConfig.getBytes("binary-buffer-size"), knownBinaryCodecs)
- } catch {
- case t: Throwable => log.error("Failed to initialize Context Codecs", t)
- }
- }
-
- private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codecs.ForEntry[T]] = {
- val rootConfig = config.getConfig(rootKey)
- val dynamic = new DynamicAccess(getClass.getClassLoader)
- val entries = Map.newBuilder[String, Codecs.ForEntry[T]]
-
- rootConfig.topLevelKeys.foreach(key => {
- try {
- val fqcn = rootConfig.getString(key)
- entries += ((key, dynamic.createInstanceFor[Codecs.ForEntry[T]](fqcn, Nil).get))
- } catch {
- case e: Throwable =>
- log.error(s"Failed to initialize codec for key [$key]", e)
- }
- })
-
- entries.result()
- }
-
- private def readStringKeysConfig(config: Config): Map[String, String] =
- config.topLevelKeys.map(key => (key, config.getString(key))).toMap
-
- private def stringHeaderCodecs(keys: Map[String, String]): Map[String, Codecs.ForEntry[TextMap]] =
- keys.map { case (key, header) => (key, new Codecs.StringHeadersCodec(key, header)) }
-
- private def stringBinaryCodecs(keys: Map[String, String]): Map[String, Codecs.ForEntry[ByteBuffer]] =
- keys.map { case (key, _) => (key, new Codecs.StringBinaryCodec(key)) }
-}
-
-object Codecs {
-
- trait ForContext[T] {
- def encode(context: Context): T
- def decode(carrier: T): Context
- }
-
- trait ForEntry[T] {
- def encode(context: Context): T
- def decode(carrier: T, context: Context): Context
- }
-
- final class HttpHeaders(entryCodecs: Map[String, Codecs.ForEntry[TextMap]]) extends Codecs.ForContext[TextMap] {
- private val log = LoggerFactory.getLogger(classOf[HttpHeaders])
-
- override def encode(context: Context): TextMap = {
- val encoded = TextMap.Default()
-
- context.entries.foreach {
- case (key, _) if key.broadcast =>
- entryCodecs.get(key.name) match {
- case Some(codec) =>
- try {
- codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2))
- } catch {
- case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e)
- }
-
- case None =>
- log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name)
- }
-
- case _ => // All non-broadcast keys should be ignored.
- }
-
- encoded
- }
-
- override def decode(carrier: TextMap): Context = {
- var context: Context = Context.Empty
-
- try {
- context = entryCodecs.foldLeft(context)((ctx, codecEntry) => {
- val (_, codec) = codecEntry
- codec.decode(carrier, ctx)
- })
-
- } catch {
- case e: Throwable =>
- log.error("Failed to decode context from HttpHeaders", e)
- }
-
- context
- }
- }
-
-
- final class Binary(bufferSize: Long, entryCodecs: Map[String, Codecs.ForEntry[ByteBuffer]]) extends Codecs.ForContext[ByteBuffer] {
- private val log = LoggerFactory.getLogger(classOf[Binary])
- private val binaryBuffer = newThreadLocalBuffer(bufferSize)
- private val emptyBuffer = ByteBuffer.allocate(0)
-
- override def encode(context: Context): ByteBuffer = {
- val entries = context.entries
- if(entries.isEmpty)
- emptyBuffer
- else {
- var colferEntries: List[ColferEntry] = Nil
- entries.foreach {
- case (key, _) if key.broadcast =>
- entryCodecs.get(key.name) match {
- case Some(entryCodec) =>
- try {
- val entryData = entryCodec.encode(context)
- if(entryData.capacity() > 0) {
- val colferEntry = new ColferEntry()
- colferEntry.setName(key.name)
- colferEntry.setContent(entryData.array())
- colferEntries = colferEntry :: colferEntries
- }
- } catch {
- case throwable: Throwable =>
- log.error(s"Failed to encode broadcast context key [${key.name}]", throwable)
- }
-
- case None =>
- log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name)
- }
-
- case _ => // All non-broadcast keys should be ignored.
- }
-
- if(colferEntries.isEmpty)
- emptyBuffer
- else {
- val buffer = binaryBuffer.get()
- val colferContext = new ColferContext()
- colferContext.setEntries(colferEntries.toArray)
- val marshalledSize = colferContext.marshal(buffer, 0)
-
- val data = Array.ofDim[Byte](marshalledSize)
- System.arraycopy(buffer, 0, data, 0, marshalledSize)
- ByteBuffer.wrap(data)
- }
- }
- }
-
-
- override def decode(carrier: ByteBuffer): Context = {
- if(carrier.capacity() == 0)
- Context.Empty
- else {
- var context: Context = Context.Empty
-
- try {
- val colferContext = new ColferContext()
- colferContext.unmarshal(carrier.array(), 0)
-
- colferContext.entries.foreach(colferEntry => {
- entryCodecs.get(colferEntry.getName()) match {
- case Some(entryCodec) =>
- context = entryCodec.decode(ByteBuffer.wrap(colferEntry.content), context)
-
- case None =>
- log.error("Failed to decode entry [{}] with Binary context codec. No entry found for the key.", colferEntry.getName())
- }
- })
-
- } catch {
- case e: Throwable =>
- log.error("Failed to decode context from Binary", e)
- }
-
- context
- }
- }
-
-
- private def newThreadLocalBuffer(size: Long): ThreadLocal[Array[Byte]] = new ThreadLocal[Array[Byte]] {
- override def initialValue(): Array[Byte] = Array.ofDim[Byte](size.toInt)
- }
- }
-
- private class StringHeadersCodec(key: String, headerName: String) extends Codecs.ForEntry[TextMap] {
- private val contextKey = Key.broadcast[Option[String]](key, None)
-
- override def encode(context: Context): TextMap = {
- val textMap = TextMap.Default()
- context.get(contextKey).foreach { value =>
- textMap.put(headerName, value)
- }
-
- textMap
- }
-
- override def decode(carrier: TextMap, context: Context): Context = {
- carrier.get(headerName) match {
- case value @ Some(_) => context.withKey(contextKey, value)
- case None => context
- }
- }
- }
-
- private class StringBinaryCodec(key: String) extends Codecs.ForEntry[ByteBuffer] {
- val emptyBuffer: ByteBuffer = ByteBuffer.allocate(0)
- private val contextKey = Key.broadcast[Option[String]](key, None)
-
- override def encode(context: Context): ByteBuffer = {
- context.get(contextKey) match {
- case Some(value) => ByteBuffer.wrap(value.getBytes)
- case None => emptyBuffer
- }
- }
-
- override def decode(carrier: ByteBuffer, context: Context): Context = {
- context.withKey(contextKey, Some(new String(carrier.array())))
- }
- }
-}
-
-
-trait TextMap {
-
- def get(key: String): Option[String]
-
- def put(key: String, value: String): Unit
-
- def values: Iterator[(String, String)]
-}
-
-object TextMap {
-
- class Default extends TextMap {
- private val storage =
- mutable.Map.empty[String, String]
-
- override def get(key: String): Option[String] =
- storage.get(key)
-
- override def put(key: String, value: String): Unit =
- storage.put(key, value)
-
- override def values: Iterator[(String, String)] =
- storage.toIterator
- }
-
- object Default {
- def apply(): Default =
- new Default()
- }
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala
index e0b084cb..2a7a382e 100644
--- a/kamon-core/src/main/scala/kamon/context/Context.scala
+++ b/kamon-core/src/main/scala/kamon/context/Context.scala
@@ -1,5 +1,5 @@
/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2018 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
@@ -13,90 +13,88 @@
* =========================================================================================
*/
-package kamon.context
+package kamon
+package context
-import java.io._
-import java.nio.ByteBuffer
+import java.util.{Map => JavaMap}
+import scala.collection.JavaConverters._
-import kamon.Kamon
+class Context private (val entries: Map[String, Any], val tags: Map[String, String]) {
-class Context private (private[context] val entries: Map[Key[_], Any]) extends scala.Serializable {
- def get[T](key: Key[T]): T =
- entries.getOrElse(key, key.emptyValue).asInstanceOf[T]
+ def get[T](key: Context.Key[T]): T =
+ entries.getOrElse(key.name, key.emptyValue).asInstanceOf[T]
- def withKey[T](key: Key[T], value: T): Context =
- new Context(entries.updated(key, value))
+ def getTag(tagKey: String): Option[String] =
+ tags.get(tagKey)
- var _deserializedEntries: Map[Key[_], Any] = Map.empty
+ def withKey[T](key: Context.Key[T], value: T): Context =
+ new Context(entries.updated(key.name, value), tags)
- @throws[IOException]
- private def writeObject(out: ObjectOutputStream): Unit = out.write(
- Kamon.contextCodec().Binary.encode(this).array()
- )
+ def withTag(tagKey: String, tagValue: String): Context =
+ new Context(entries, tags.updated(tagKey, tagValue))
- @throws[IOException]
- @throws[ClassNotFoundException]
- private def readObject(in: ObjectInputStream): Unit = {
- val buf = new Array[Byte](in.available())
- in.readFully(buf)
- _deserializedEntries = Kamon.contextCodec().Binary.decode(ByteBuffer.wrap(buf)).entries
- }
+ def withTags(tags: Map[String, String]): Context =
+ new Context(entries, this.tags ++ tags)
- def readResolve(): AnyRef = new Context(_deserializedEntries)
+ def withTags(tags: JavaMap[String, String]): Context =
+ new Context(entries, this.tags ++ tags.asScala.toMap)
- override def equals(obj: scala.Any): Boolean = {
- obj != null &&
- obj.isInstanceOf[Context] &&
- obj.asInstanceOf[Context].entries != null &&
- obj.asInstanceOf[Context].entries == this.entries
- }
+ def isEmpty(): Boolean =
+ entries.isEmpty && tags.isEmpty
- override def hashCode(): Int = entries.hashCode()
+ def nonEmpty(): Boolean =
+ !isEmpty()
}
object Context {
- val Empty = new Context(Map.empty)
+ val Empty = new Context(Map.empty, Map.empty)
- def apply(): Context =
- Empty
+ def of(tags: JavaMap[String, String]): Context =
+ new Context(Map.empty, tags.asScala.toMap)
- def create(): Context =
- Empty
+ def of(tags: Map[String, String]): Context =
+ new Context(Map.empty, tags)
- def apply[T](key: Key[T], value: T): Context =
- new Context(Map(key -> value))
+ def of[T](key: Context.Key[T], value: T): Context =
+ new Context(Map(key.name -> value), Map.empty)
- def create[T](key: Key[T], value: T): Context =
- apply(key, value)
-
-}
-
-
-sealed abstract class Key[T] {
- def name: String
- def emptyValue: T
- def broadcast: Boolean
-}
+ def of[T](key: Context.Key[T], value: T, tags: JavaMap[String, String]): Context =
+ new Context(Map(key.name -> value), tags.asScala.toMap)
-object Key {
+ def of[T](key: Context.Key[T], value: T, tags: Map[String, String]): Context =
+ new Context(Map(key.name -> value), tags)
- def local[T](name: String, emptyValue: T): Key[T] =
- new Default[T](name, emptyValue, false)
+ def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U): Context =
+ new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), Map.empty)
- def broadcast[T](name: String, emptyValue: T): Key[T] =
- new Default[T](name, emptyValue, true)
+ def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: JavaMap[String, String]): Context =
+ new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), tags.asScala.toMap)
- def broadcastString(name: String): Key[Option[String]] =
- new Default[Option[String]](name, None, true)
+ def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: Map[String, String]): Context =
+ new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), tags)
+ def key[T](name: String, emptyValue: T): Context.Key[T] =
+ new Context.Key(name, emptyValue)
- private class Default[T](val name: String, val emptyValue: T, val broadcast: Boolean) extends Key[T] {
+ /**
+ * Encapsulates the type, name and empty value for a context entry. All reads and writes from a context instance
+ * must be done using a context key, which will ensure the right type is used on both operations. The key's name
+ * is used when configuring mappings and incoming/outgoing/returning codecs for context propagation across channels.
+ *
+ * If you try to read an entry from a context and such entry is not present, the empty value for the key is returned
+ * instead.
+ *
+ * @param name Key name. Must be unique.
+ * @param emptyValue Value to be returned when reading from a context that doesn't have an entry with this key.
+ * @tparam ValueType Type of the value to be held on the context with this key.
+ */
+ final class Key[ValueType](val name: String, val emptyValue: ValueType) {
override def hashCode(): Int =
name.hashCode
override def equals(that: Any): Boolean =
- that.isInstanceOf[Default[_]] && that.asInstanceOf[Default[_]].name == this.name
+ that.isInstanceOf[Context.Key[_]] && that.asInstanceOf[Context.Key[_]].name == this.name
}
} \ No newline at end of file
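A sketch of the reworked API: typed entries are addressed through keys while tags are plain strings, replacing the old broadcast/local key distinction:

```
import kamon.context.Context

// Entries keep their type through the key; tags are free-form strings.
val userKey = Context.key[Option[String]]("userID", None)

val context = Context.of(userKey, Some("1234"))
  .withTag("locale", "en_US")

context.get(userKey)       // Some("1234")
context.getTag("locale")   // Some("en_US")
context.getTag("missing")  // None
```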
diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
new file mode 100644
index 00000000..6a15e2a6
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
@@ -0,0 +1,206 @@
+/* =========================================================================================
+ * Copyright © 2013-2018 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon
+package context
+
+import com.typesafe.config.Config
+import org.slf4j.LoggerFactory
+
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+import scala.util.{Failure, Success}
+
+/**
+ * Context propagation that uses HTTP headers as the transport medium. HTTP propagation mechanisms read any number of
+ * HTTP headers from incoming HTTP requests to decode a Context instance and write any number of HTTP headers on
+ * outgoing requests to transfer a context to remote services.
+ */
+object HttpPropagation {
+
+ /**
+ * Wrapper that reads HTTP headers from a HTTP message.
+ */
+ trait HeaderReader {
+
+ /**
+ * Reads a single HTTP header value.
+ *
+ * @param header HTTP header name
+ * @return The HTTP header value, if present.
+ */
+ def read(header: String): Option[String]
+
+ /**
+ * Reads all HTTP headers present in the wrapped HTTP message.
+ *
+     * @return A map from header names to header values.
+ */
+ def readAll(): Map[String, String]
+ }
+
+ /**
+ * Wrapper that writes HTTP headers to a HTTP message.
+ */
+ trait HeaderWriter {
+
+ /**
+ * Writes a HTTP header into a HTTP message.
+ *
+ * @param header HTTP header name.
+ * @param value HTTP header value.
+ */
+ def write(header: String, value: String): Unit
+ }
+
+
+
+ /**
+ * Create a new default HTTP propagation instance from the provided configuration.
+ *
+ * @param config HTTP propagation channel configuration
+ * @return A newly constructed HttpPropagation instance.
+ */
+ def from(config: Config, classLoading: ClassLoading): Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] = {
+ new HttpPropagation.Default(Settings.from(config, classLoading))
+ }
+
+ /**
+ * Default HTTP Propagation in Kamon.
+ */
+ class Default(settings: Settings) extends Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] {
+ private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Default])
+
+ /**
+     * Reads context tags and entries in the following order:
+ * 1. Read all context tags from the context tags header.
+ * 2. Read all context tags with explicit mappings. This overrides any tag
+ * from the previous step in case of a tag key clash.
+ * 3. Read all context entries using the incoming entries configuration.
+ */
+ override def read(reader: HeaderReader): Context = {
+ val tags = Map.newBuilder[String, String]
+
+ // Tags encoded together in the context tags header.
+ try {
+ reader.read(settings.tagsHeaderName).foreach { contextTagsHeader =>
+ contextTagsHeader.split(";").foreach(tagData => {
+ val tagPair = tagData.split("=")
+ if (tagPair.length == 2) {
+ tags += (tagPair(0) -> tagPair(1))
+ }
+ })
+ }
+ } catch {
+ case NonFatal(t) => log.warn("Failed to read the context tags header", t.asInstanceOf[Any])
+ }
+
+ // Tags explicitly mapped on the tags.mappings configuration.
+ settings.tagsMappings.foreach {
+ case (tagName, httpHeader) =>
+ try {
+ reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue))
+ } catch {
+ case NonFatal(t) => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any])
+ }
+ }
+
+ // Incoming Entries
+ settings.incomingEntries.foldLeft(Context.of(tags.result())) {
+ case (context, (entryName, entryDecoder)) =>
+ var result = context
+ try {
+ result = entryDecoder.read(reader, context)
+ } catch {
+ case NonFatal(t) => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
+ }
+
+ result
+ }
+ }
+
+ /**
+ * Writes context tags and entries
+ */
+ override def write(context: Context, writer: HeaderWriter): Unit = {
+ val contextTagsHeader = new StringBuilder()
+ def appendTag(key: String, value: String): Unit = {
+ contextTagsHeader
+ .append(key)
+ .append('=')
+ .append(value)
+ .append(';')
+ }
+
+ // Write tags with specific mappings or append them to the context tags header.
+ context.tags.foreach {
+ case (tagKey, tagValue) => settings.tagsMappings.get(tagKey) match {
+ case Some(mappedHeader) => writer.write(mappedHeader, tagValue)
+ case None => appendTag(tagKey, tagValue)
+ }
+ }
+
+ // Write the context tags header.
+ if(contextTagsHeader.nonEmpty) {
+ writer.write(settings.tagsHeaderName, contextTagsHeader.result())
+ }
+
+ // Write entries for the specified direction.
+ settings.outgoingEntries.foreach {
+ case (entryName, entryWriter) =>
+ try {
+ entryWriter.write(context, writer)
+ } catch {
+ case NonFatal(t) => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
+ }
+ }
+ }
+ }
+
+ case class Settings(
+ tagsHeaderName: String,
+ tagsMappings: Map[String, String],
+ incomingEntries: Map[String, Propagation.EntryReader[HeaderReader]],
+ outgoingEntries: Map[String, Propagation.EntryWriter[HeaderWriter]]
+ )
+
+ object Settings {
+ private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Settings])
+
+ def from(config: Config, classLoading: ClassLoading): Settings = {
+ def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = {
+ val instanceMap = Map.newBuilder[String, ExpectedType]
+
+ mappings.foreach {
+ case (contextKey, componentClass) => classLoading.createInstance[ExpectedType](componentClass, Nil) match {
+ case Success(componentInstance) => instanceMap += (contextKey -> componentInstance)
+          case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to [{}]",
+ implicitly[ClassTag[ExpectedType]].runtimeClass.getName, componentClass, exception)
+ }
+ }
+
+ instanceMap.result()
+ }
+
+ val tagsHeaderName = config.getString("tags.header-name")
+ val tagsMappings = config.getConfig("tags.mappings").pairs
+ val incomingEntries = buildInstances[Propagation.EntryReader[HeaderReader]](config.getConfig("entries.incoming").pairs)
+ val outgoingEntries = buildInstances[Propagation.EntryWriter[HeaderWriter]](config.getConfig("entries.outgoing").pairs)
+
+ Settings(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries)
+ }
+ }
+
+}
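A minimal sketch of how an instrumented framework might plug into this API. The Map-backed reader and writer are illustrative only, and `context-tags` is assumed to be the configured `tags.header-name` for the default channel (both the channel and the header name are configurable):

```scala
import kamon.Kamon
import kamon.context.HttpPropagation
import scala.collection.mutable

object HttpPropagationExample extends App {

  // Illustrative, Map-backed implementations of the header wrappers.
  val incomingHeaders = Map("context-tags" -> "tenant=acme;plan=free;")

  val headerReader = new HttpPropagation.HeaderReader {
    override def read(header: String): Option[String] = incomingHeaders.get(header)
    override def readAll(): Map[String, String] = incomingHeaders
  }

  val outgoingHeaders = mutable.Map.empty[String, String]
  val headerWriter = new HttpPropagation.HeaderWriter {
    override def write(header: String, value: String): Unit = outgoingHeaders += (header -> value)
  }

  val propagation = Kamon.defaultHttpPropagation()
  val incomingContext = propagation.read(headerReader)  // "tenant" and "plan" become context tags
  propagation.write(incomingContext, headerWriter)      // tags are written back as "key=value;" pairs
}
```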
diff --git a/kamon-core/src/main/scala/kamon/context/Mixin.scala b/kamon-core/src/main/scala/kamon/context/Mixin.scala
deleted file mode 100644
index 3445cc31..00000000
--- a/kamon-core/src/main/scala/kamon/context/Mixin.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.context
-
-import kamon.Kamon
-
-
-/**
- * Utility trait that marks objects carrying a reference to a Context instance.
- *
- */
-trait HasContext {
- def context: Context
-}
-
-object HasContext {
- private case class Default(context: Context) extends HasContext
-
- /**
- * Construct a HasSpan instance that references the provided Context.
- *
- */
- def from(context: Context): HasContext =
- Default(context)
-
- /**
- * Construct a HasContext instance with the current Kamon from Kamon's default context storage.
- *
- */
- def fromCurrentContext(): HasContext =
- Default(Kamon.currentContext())
-}
diff --git a/kamon-core/src/main/scala/kamon/context/Propagation.scala b/kamon-core/src/main/scala/kamon/context/Propagation.scala
new file mode 100644
index 00000000..1d973ca9
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/Propagation.scala
@@ -0,0 +1,81 @@
+/* =========================================================================================
+ * Copyright © 2013-2018 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.context
+
+/**
+ * Inter-process Context propagation interface. Implementations are responsible for reading and writing a lossless
+ * representation of a given Context instance to the appropriate mediums. Out of the box, Kamon ships with two
+ * implementations:
+ *
+ * * HttpPropagation: Uses HTTP headers as the medium to transport the Context data.
+ * * BinaryPropagation: Uses byte streams as the medium to transport the Context data.
+ *
+ */
+trait Propagation[ReaderMedium, WriterMedium] {
+
+ /**
+ * Attempts to read a Context from the [[ReaderMedium]].
+ *
+   * @param medium An abstraction that reads data from the medium transporting the Context data.
+ * @return The decoded Context instance or an empty Context if no entries or tags could be read from the medium.
+ */
+ def read(medium: ReaderMedium): Context
+
+ /**
+ * Attempts to write a Context instance to the [[WriterMedium]].
+ *
+ * @param context Context instance to be written.
+ * @param medium An abstraction that writes data into the medium that will transport the Context data.
+ */
+ def write(context: Context, medium: WriterMedium): Unit
+
+}
+
+object Propagation {
+
+ /**
+ * Encapsulates logic required to read a single context entry from a medium. Implementations of this trait
+ * must be aware of the entry they are able to read.
+ */
+ trait EntryReader[Medium] {
+
+ /**
+     * Tries to read a context entry from the medium. If a context entry is successfully read, implementations
+     * must return an updated context instance that includes that entry. If no entry could be read, simply return
+     * the context instance that was passed in, unchanged.
+ *
+     * @param medium An abstraction that reads data from the medium transporting the Context data.
+ * @param context Current context.
+ * @return Either the original context passed in or a modified version of it, including the read entry.
+ */
+ def read(medium: Medium, context: Context): Context
+ }
+
+ /**
+ * Encapsulates logic required to write a single context entry to the medium. Implementations of this trait
+ * must be aware of the entry they are able to write.
+ */
+ trait EntryWriter[Medium] {
+
+ /**
+ * Tries to write a context entry into the medium.
+ *
+ * @param context The context from which entries should be written.
+ * @param medium An abstraction that writes data into the medium that will transport the Context data.
+ */
+ def write(context: Context, medium: Medium): Unit
+ }
+}
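A sketch of a custom entry implementing both interfaces over the HTTP medium. The context key name and header name are hypothetical, chosen only to illustrate the read/write contract:

```scala
import kamon.context.{Context, HttpPropagation, Propagation}

// Hypothetical context key and header name, for illustration only.
object CorrelationId {
  val Key = Context.key[Option[String]]("correlation-id", None)
  val Header = "x-correlation-id"
}

class CorrelationIdEntry extends Propagation.EntryReader[HttpPropagation.HeaderReader]
    with Propagation.EntryWriter[HttpPropagation.HeaderWriter] {

  // Returns the incoming context unchanged when the header is absent, as the contract requires.
  override def read(medium: HttpPropagation.HeaderReader, context: Context): Context =
    medium.read(CorrelationId.Header) match {
      case some @ Some(_) => context.withKey(CorrelationId.Key, some)
      case None           => context
    }

  override def write(context: Context, medium: HttpPropagation.HeaderWriter): Unit =
    context.get(CorrelationId.Key).foreach(id => medium.write(CorrelationId.Header, id))
}
```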
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala
new file mode 100644
index 00000000..b141331b
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala
@@ -0,0 +1,66 @@
+package kamon.instrumentation
+
+import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter}
+
+/**
+ * Base abstractions over HTTP messages.
+ */
+object HttpMessage {
+
+ /**
+ * Wrapper for HTTP Request messages.
+ */
+ trait Request extends HeaderReader {
+
+ /**
+ * Request URL.
+ */
+ def url: String
+
+ /**
+ * Full request path. Does not include the query.
+ */
+ def path: String
+
+ /**
+ * HTTP Method.
+ */
+ def method: String
+ }
+
+ /**
+ * Wrapper for HTTP response messages.
+ */
+ trait Response {
+
+ /**
+ * Status code on the response message.
+ */
+ def statusCode: Int
+ }
+
+ /**
+   * An HTTP message builder on which header values can be written and from which a complete HTTP message can be
+   * built. Implementations will typically wrap an HTTP message model from an instrumented framework and either
+   * accumulate all header writes until a call to .build() is made, constructing a new HTTP message that merges the
+   * previous and accumulated headers (on immutable HTTP models), or directly write the headers on the underlying
+   * HTTP message (on mutable HTTP models).
+ */
+ trait Builder[Message] extends HeaderWriter {
+
+ /**
+     * Returns a version of the HTTP message containing all headers that have been written to the builder.
+ */
+ def build(): Message
+ }
+
+ /**
+ * Builder for HTTP Request messages.
+ */
+ trait RequestBuilder[Message] extends Request with Builder[Message]
+
+ /**
+ * Builder for HTTP Response messages.
+ */
+ trait ResponseBuilder[Message] extends Response with Builder[Message]
+}
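A sketch of the accumulate-then-build strategy described above, wrapping a toy immutable request model. All model names here are illustrative:

```scala
import kamon.instrumentation.HttpMessage

// Toy immutable request model, for illustration only.
case class SimpleRequest(url: String, path: String, method: String, headers: Map[String, String])

class SimpleRequestBuilder(initial: SimpleRequest) extends HttpMessage.RequestBuilder[SimpleRequest] {

  // Header writes are accumulated and merged into a new request on build(),
  // the strategy described above for immutable HTTP models.
  private var extraHeaders = Map.empty[String, String]

  override def url: String = initial.url
  override def path: String = initial.path
  override def method: String = initial.method

  override def read(header: String): Option[String] = initial.headers.get(header)
  override def readAll(): Map[String, String] = initial.headers

  override def write(header: String, value: String): Unit =
    extraHeaders += (header -> value)

  override def build(): SimpleRequest =
    initial.copy(headers = initial.headers ++ extraHeaders)
}
```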
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
new file mode 100644
index 00000000..72828424
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
@@ -0,0 +1,453 @@
+package kamon
+package instrumentation
+
+import java.time.Duration
+
+import com.typesafe.config.Config
+import kamon.context.Context
+import kamon.instrumentation.HttpServer.Settings.TagMode
+import kamon.metric.MeasurementUnit.{time, information}
+import kamon.trace.{IdentityProvider, Span}
+import kamon.util.GlobPathFilter
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+/**
+ * HTTP Server instrumentation handler that takes care of context propagation, distributed tracing and HTTP server
+ * metrics. Instances can be created by using the [[HttpServer.from]] method with the desired configuration name. All
+ * configuration for the default HTTP instrumentation is at "kamon.instrumentation.http-server.default".
+ *
+ * The default implementation shipping with Kamon provides:
+ *
+ * - Context Propagation: Incoming and Returning context propagation as well as incoming context tags. Context
+ * propagation is further used to enable distributed tracing on top of any instrumented HTTP Server.
+ * - Distributed Tracing: Automatically join traces initiated by the callers of this service and apply span and metric
+ *   tags from the incoming requests as well as from the incoming context tags.
+ * - Server Metrics: Basic request processing metrics to understand connection usage, throughput and response code
+ * counts in the HTTP server.
+ *
+ */
+trait HttpServer {
+
+ /**
+   * Initiate handling of an HTTP request received by this server. The returned RequestHandler contains the Span that
+ * represents the processing of the incoming HTTP request (if tracing is enabled) and the Context extracted from
+ * HTTP headers (if context propagation is enabled).
+ *
+ * Callers of this method **must** always ensure that the doneReceiving, send and doneSending callbacks are invoked
+ * for all incoming requests.
+ *
+   * @param request An HttpRequest wrapper around the original incoming HTTP request.
+ * @return The RequestHandler that will follow the lifecycle of the incoming request.
+ */
+ def receive(request: HttpMessage.Request): HttpServer.RequestHandler
+
+
+ /**
+ * Signals that a new HTTP connection has been opened.
+ */
+ def openConnection(): Unit
+
+
+ /**
+   * Signals that an HTTP connection has been closed. If the connection lifetime or the number of handled requests
+   * cannot be determined, the values [[Duration.ZERO]] and zero can be provided, respectively. No metrics will
+   * be updated when the values are zero.
+ *
+ * @param lifetime For how long did the connection remain open.
+   * @param handledRequests How many requests were handled by the closed connection.
+ */
+ def closeConnection(lifetime: Duration, handledRequests: Long): Unit
+
+ /**
+ * Frees resources that might have been acquired to provide the instrumentation. Behavior on HttpServer instances
+ * after calling this function is undefined.
+ */
+ def shutdown(): Unit
+
+}
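A sketch of the callback sequence that instrumentation code is expected to drive for each request. All parameter values are assumed to come from the instrumented framework:

```scala
import kamon.instrumentation.{HttpMessage, HttpServer}

object ServerInstrumentationSketch {

  def handleRequest[Res](
      server: HttpServer,
      request: HttpMessage.Request,
      response: HttpMessage.ResponseBuilder[Res],
      requestBytes: Long,
      responseBytes: Long): Res = {

    val handler = server.receive(request)       // reads the incoming context and opens the server span
    handler.doneReceiving(requestBytes)         // entire request (headers and body) received

    val result = handler.send(response, handler.context) // may write propagation headers on the response
    handler.doneSending(responseBytes)          // records the response size and finishes the span
    result
  }
}
```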
+
+object HttpServer {
+
+ /**
+   * Handler associated with the processing of a single request. The instrumentation code using this class is
+   * responsible for creating a dedicated [[HttpServer.RequestHandler]] instance for each received request and for
+   * invoking the doneReceiving, send and doneSending callbacks when appropriate.
+ */
+ trait RequestHandler {
+
+ /**
+     * If context propagation is enabled this function returns the incoming context associated with this request,
+ * otherwise [[Context.Empty]] is returned.
+ */
+ def context: Context
+
+ /**
+ * Span representing the current HTTP server operation. If tracing is disabled this will return an empty span.
+ */
+ def span: Span
+
+ /**
+ * Signals that the entire request (headers and body) has been received.
+ *
+ * @param receivedBytes Size of the entire HTTP request.
+ */
+ def doneReceiving(receivedBytes: Long): Unit
+
+ /**
+     * Process a response to be sent back to the client. Since returning keys might need to be included in the response
+ * headers, users of this class must ensure that the returned HttpResponse is used instead of the original one
+ * passed into this function.
+ *
+ * @param response Wraps the HTTP response to be sent back to the client.
+ * @param context Context that should be used for writing returning keys into the response.
+ * @return The modified HTTP response that should be sent to clients.
+ */
+ def send[HttpResponse](response: HttpMessage.ResponseBuilder[HttpResponse], context: Context): HttpResponse
+
+ /**
+ * Signals that the entire response (headers and body) has been sent to the client.
+ *
+ * @param sentBytes Size of the entire HTTP response.
+ */
+ def doneSending(sentBytes: Long): Unit
+
+ }
+
+
+ /**
+ * Holds all metric instruments required to record metrics from an HTTP server.
+ *
+   * @param component Name of the instrumented component, used as the "component" tag on all server metrics.
+   * @param interface Interface name or address where the HTTP server is listening.
+ * @param port Port number where the HTTP server is listening.
+ */
+ class Metrics(component: String, interface: String, port: Int) {
+ import Metrics._
+ private val _log = LoggerFactory.getLogger(classOf[HttpServer.Metrics])
+ private val _statusCodeTag = "status_code"
+ private val _serverTags = Map(
+ "component" -> component,
+ "interface" -> interface,
+ "port" -> port.toString
+ )
+
+ val requestsInformational = CompletedRequests.refine(statusCodeTag("1xx"))
+ val requestsSuccessful = CompletedRequests.refine(statusCodeTag("2xx"))
+ val requestsRedirection = CompletedRequests.refine(statusCodeTag("3xx"))
+ val requestsClientError = CompletedRequests.refine(statusCodeTag("4xx"))
+ val requestsServerError = CompletedRequests.refine(statusCodeTag("5xx"))
+
+ val activeRequests = ActiveRequests.refine(_serverTags)
+ val requestSize = RequestSize.refine(_serverTags)
+ val responseSize = ResponseSize.refine(_serverTags)
+ val connectionLifetime = ConnectionLifetime.refine(_serverTags)
+ val connectionUsage = ConnectionUsage.refine(_serverTags)
+ val openConnections = OpenConnections.refine(_serverTags)
+
+
+ def countCompletedRequest(statusCode: Int): Unit = {
+ if(statusCode >= 200 && statusCode <= 299)
+ requestsSuccessful.increment()
+ else if(statusCode >= 500 && statusCode <=599)
+ requestsServerError.increment()
+ else if(statusCode >= 400 && statusCode <=499)
+ requestsClientError.increment()
+ else if(statusCode >= 300 && statusCode <=399)
+ requestsRedirection.increment()
+ else if(statusCode >= 100 && statusCode <=199)
+ requestsInformational.increment()
+ else {
+ _log.warn("Unknown HTTP status code {} found when recording HTTP server metrics", statusCode.toString)
+ }
+ }
+
+ /**
+ * Removes all registered metrics from Kamon.
+ */
+ def cleanup(): Unit = {
+ CompletedRequests.remove(statusCodeTag("1xx"))
+ CompletedRequests.remove(statusCodeTag("2xx"))
+ CompletedRequests.remove(statusCodeTag("3xx"))
+ CompletedRequests.remove(statusCodeTag("4xx"))
+ CompletedRequests.remove(statusCodeTag("5xx"))
+
+ ActiveRequests.remove(_serverTags)
+ RequestSize.remove(_serverTags)
+ ResponseSize.remove(_serverTags)
+ ConnectionLifetime.remove(_serverTags)
+ ConnectionUsage.remove(_serverTags)
+ OpenConnections.remove(_serverTags)
+ }
+
+ private def statusCodeTag(group: String): Map[String, String] =
+ _serverTags + (_statusCodeTag -> group)
+
+ }
+
+
+ object Metrics {
+
+ def of(component: String, interface: String, port: Int): Metrics =
+ new HttpServer.Metrics(component, interface, port)
+
+ /**
+ * Number of completed requests per status code.
+ */
+ val CompletedRequests = Kamon.counter("http.server.requests")
+
+ /**
+ * Number of requests being processed simultaneously at any point in time.
+ */
+ val ActiveRequests = Kamon.rangeSampler("http.server.request.active")
+
+ /**
+ * Request size distribution (including headers and body) for all requests received by the server.
+ */
+ val RequestSize = Kamon.histogram("http.server.request.size", information.bytes)
+
+ /**
+ * Response size distribution (including headers and body) for all responses served by the server.
+ */
+ val ResponseSize = Kamon.histogram("http.server.response.size", information.bytes)
+
+ /**
+ * Tracks the time elapsed between connection creation and connection close.
+ */
+ val ConnectionLifetime = Kamon.histogram("http.server.connection.lifetime", time.nanoseconds)
+
+ /**
+ * Distribution of number of requests handled per connection during their entire lifetime.
+ */
+ val ConnectionUsage = Kamon.histogram("http.server.connection.usage")
+
+ /**
+ * Number of open connections.
+ */
+ val OpenConnections = Kamon.rangeSampler("http.server.connection.open")
+ }
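A short sketch of how these instruments are used: status codes are bucketed by their first digit when counting completed requests. The component, interface and port values are illustrative:

```scala
import kamon.instrumentation.HttpServer

object MetricsExample extends App {
  val metrics = HttpServer.Metrics.of(component = "my-server", interface = "0.0.0.0", port = 8080)

  metrics.countCompletedRequest(204)  // increments the "2xx" counter
  metrics.countCompletedRequest(503)  // increments the "5xx" counter
  metrics.cleanup()                   // removes the refined instruments on server shutdown
}
```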
+
+
+ def from(name: String, component: String, interface: String, port: Int): HttpServer = {
+ from(name, component, interface, port, Kamon, Kamon)
+ }
+
+ def from(name: String, component: String, interface: String, port: Int, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = {
+ val defaultConfiguration = configuration.config().getConfig(DefaultHttpServerConfiguration)
+ val configWithFallback = if(name == DefaultHttpServer) defaultConfiguration else {
+ configuration.config().getConfig(HttpServerConfigurationPrefix + "." + name).withFallback(defaultConfiguration)
+ }
+
+ new HttpServer.Default(Settings.from(configWithFallback), contextPropagation, component, interface, port)
+ }
+
+ val HttpServerConfigurationPrefix = "kamon.instrumentation.http-server"
+ val DefaultHttpServer = "default"
+ val DefaultHttpServerConfiguration = s"$HttpServerConfigurationPrefix.default"
+
+
+ private class Default(settings: Settings, contextPropagation: ContextPropagation, component: String, interface: String, port: Int) extends HttpServer {
+ private val _metrics = if(settings.enableServerMetrics) Some(HttpServer.Metrics.of(component, interface, port)) else None
+ private val _log = LoggerFactory.getLogger(classOf[Default])
+ private val _propagation = contextPropagation.httpPropagation(settings.propagationChannel)
+ .getOrElse {
+ _log.warn(s"Could not find HTTP propagation [${settings.propagationChannel}], falling back to the default HTTP propagation")
+ contextPropagation.defaultHttpPropagation()
+ }
+
+ override def receive(request: HttpMessage.Request): RequestHandler = {
+
+ val incomingContext = if(settings.enableContextPropagation)
+ _propagation.read(request)
+ else Context.Empty
+
+ val requestSpan = if(settings.enableTracing)
+ buildServerSpan(incomingContext, request)
+ else Span.Empty
+
+ val handlerContext = if(requestSpan.nonEmpty())
+ incomingContext.withKey(Span.ContextKey, requestSpan)
+ else incomingContext
+
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.activeRequests.increment()
+ }
+
+
+ new HttpServer.RequestHandler {
+ override def context: Context =
+ handlerContext
+
+ override def span: Span =
+ requestSpan
+
+ override def doneReceiving(receivedBytes: Long): Unit = {
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.requestSize.record(receivedBytes)
+ }
+ }
+
+ override def send[HttpResponse](response: HttpMessage.ResponseBuilder[HttpResponse], context: Context): HttpResponse = {
+ def addResponseTag(tag: String, value: String, mode: TagMode): Unit = mode match {
+ case TagMode.Metric => span.tagMetric(tag, value)
+ case TagMode.Span => span.tag(tag, value)
+ case TagMode.Off =>
+ }
+
+ if(settings.enableContextPropagation) {
+ _propagation.write(context, response)
+ }
+
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.countCompletedRequest(response.statusCode)
+ }
+
+ addResponseTag("http.status_code", response.statusCode.toString, settings.statusCodeTagMode)
+ response.build()
+ }
+
+ override def doneSending(sentBytes: Long): Unit = {
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.activeRequests.decrement()
+ httpServerMetrics.responseSize.record(sentBytes)
+ }
+
+ span.finish()
+ }
+ }
+ }
+
+ override def openConnection(): Unit = {
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.openConnections.increment()
+ }
+ }
+
+ override def closeConnection(lifetime: Duration, handledRequests: Long): Unit = {
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.openConnections.decrement()
+ httpServerMetrics.connectionLifetime.record(lifetime.toNanos)
+ httpServerMetrics.connectionUsage.record(handledRequests)
+ }
+ }
+
+ override def shutdown(): Unit = {
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.cleanup()
+ }
+ }
+
+
+ private def buildServerSpan(context: Context, request: HttpMessage.Request): Span = {
+ val span = Kamon.buildSpan(operationName(request))
+ .withMetricTag("span.kind", "server")
+ .withMetricTag("component", component)
+
+ if(!settings.enableSpanMetrics)
+ span.disableMetrics()
+
+
+ for { traceIdTag <- settings.traceIDTag; customTraceID <- context.getTag(traceIdTag) } {
+ val identifier = Kamon.identityProvider.traceIdGenerator().from(customTraceID)
+ if(identifier != IdentityProvider.NoIdentifier)
+ span.withTraceID(identifier)
+ }
+
+ def addRequestTag(tag: String, value: String, mode: TagMode): Unit = mode match {
+ case TagMode.Metric => span.withMetricTag(tag, value)
+ case TagMode.Span => span.withTag(tag, value)
+ case TagMode.Off =>
+ }
+
+ addRequestTag("http.url", request.url, settings.urlTagMode)
+      addRequestTag("http.method", request.method, settings.methodTagMode)
+ settings.contextTags.foreach {
+ case (tagName, mode) => context.getTag(tagName).foreach(tagValue => addRequestTag(tagName, tagValue, mode))
+ }
+
+ span.start()
+ }
+
+ private def operationName(request: HttpMessage.Request): String = {
+ val requestPath = request.path
+ val customMapping = settings.operationMappings.collectFirst {
+ case (pattern, operationName) if pattern.accept(requestPath) => operationName
+ }
+
+      customMapping.getOrElse(settings.unhandledOperationName)
+ }
+ }
+
+
+ case class Settings(
+ enableContextPropagation: Boolean,
+ propagationChannel: String,
+ enableServerMetrics: Boolean,
+ enableTracing: Boolean,
+ traceIDTag: Option[String],
+ enableSpanMetrics: Boolean,
+ urlTagMode: TagMode,
+ methodTagMode: TagMode,
+ statusCodeTagMode: TagMode,
+ contextTags: Map[String, TagMode],
+ unhandledOperationName: String,
+ operationMappings: Map[GlobPathFilter, String]
+ )
+
+ object Settings {
+
+ sealed trait TagMode
+ object TagMode {
+ case object Metric extends TagMode
+ case object Span extends TagMode
+ case object Off extends TagMode
+
+ def from(value: String): TagMode = value.toLowerCase match {
+ case "metric" => TagMode.Metric
+ case "span" => TagMode.Span
+ case _ => TagMode.Off
+ }
+ }
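A quick sketch of the parsing behavior: matching is case-insensitive and anything other than "metric" or "span" falls back to Off:

```scala
import kamon.instrumentation.HttpServer.Settings.TagMode

object TagModeExample extends App {
  assert(TagMode.from("Metric") == TagMode.Metric)
  assert(TagMode.from("span") == TagMode.Span)
  assert(TagMode.from("disabled") == TagMode.Off)  // unknown values map to Off
}
```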
+
+ def from(config: Config): Settings = {
+
+ // Context propagation settings
+ val enablePropagation = config.getBoolean("propagation.enabled")
+ val propagationChannel = config.getString("propagation.channel")
+
+ // HTTP Server metrics settings
+ val enableServerMetrics = config.getBoolean("metrics.enabled")
+
+ // Tracing settings
+ val enableTracing = config.getBoolean("tracing.enabled")
+ val traceIdTag = Option(config.getString("tracing.preferred-trace-id-tag")).filterNot(_ == "none")
+ val enableSpanMetrics = config.getBoolean("tracing.span-metrics")
+ val urlTagMode = TagMode.from(config.getString("tracing.tags.url"))
+ val methodTagMode = TagMode.from(config.getString("tracing.tags.method"))
+ val statusCodeTagMode = TagMode.from(config.getString("tracing.tags.status-code"))
+ val contextTags = config.getConfig("tracing.tags.from-context").pairs.map {
+ case (tagName, mode) => (tagName, TagMode.from(mode))
+ }
+
+ val unhandledOperationName = config.getString("tracing.operations.unhandled")
+ val operationMappings = config.getConfig("tracing.operations.mappings").pairs.map {
+ case (pattern, operationName) => (new GlobPathFilter(pattern), operationName)
+ }
+
+ Settings(
+ enablePropagation,
+ propagationChannel,
+ enableServerMetrics,
+ enableTracing,
+ traceIdTag,
+ enableSpanMetrics,
+ urlTagMode,
+ methodTagMode,
+ statusCodeTagMode,
+ contextTags,
+ unhandledOperationName,
+ operationMappings
+ )
+ }
+ }
+}
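A sketch of configuring a named server on top of the default settings. A named configuration only needs to override the keys that differ from "kamon.instrumentation.http-server.default"; everything else falls back to it. The "edge" name and the overridden key are illustrative, and this assumes Kamon.reconfigure from the Configuration API:

```scala
import com.typesafe.config.ConfigFactory
import kamon.Kamon
import kamon.instrumentation.HttpServer

object NamedServerExample extends App {
  // Only the differing key is declared; the rest falls back to the default block.
  val edgeConfig = ConfigFactory.parseString(
    """
      |kamon.instrumentation.http-server.edge {
      |  tracing.tags.url = metric
      |}
    """.stripMargin)

  Kamon.reconfigure(edgeConfig.withFallback(Kamon.config()))
  val edgeServer = HttpServer.from("edge", component = "edge-proxy", interface = "0.0.0.0", port = 443)
}
```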
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/Mixin.scala b/kamon-core/src/main/scala/kamon/instrumentation/Mixin.scala
new file mode 100644
index 00000000..f5a5e63b
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/Mixin.scala
@@ -0,0 +1,36 @@
+package kamon.instrumentation
+
+import kamon.Kamon
+import kamon.context.Context
+
+/**
+ * Common mixins used across multiple instrumentation modules.
+ */
+object Mixin {
+
+ /**
+ * Utility trait that marks objects carrying a reference to a Context instance.
+ *
+ */
+ trait HasContext {
+ def context: Context
+ }
+
+ object HasContext {
+ private case class Default(context: Context) extends HasContext
+
+ /**
+     * Construct a HasContext instance that references the provided Context.
+ *
+ */
+ def from(context: Context): HasContext =
+ Default(context)
+
+ /**
+     * Construct a HasContext instance using the current Context from Kamon's default context storage.
+ *
+ */
+ def fromCurrentContext(): HasContext =
+ Default(Kamon.currentContext())
+ }
+}
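A minimal sketch of the intended use: capture the current Context when a task is created so it can be restored when the task runs on another thread. This assumes Kamon.withContext from the default context storage API:

```scala
import kamon.Kamon
import kamon.instrumentation.Mixin

// Illustrative context-aware task; the class name is hypothetical.
class ContextAwareTask(body: Runnable) extends Runnable {
  private val captured = Mixin.HasContext.fromCurrentContext()

  override def run(): Unit =
    Kamon.withContext(captured.context)(body.run())
}
```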
diff --git a/kamon-core/src/main/scala/kamon/package.scala b/kamon-core/src/main/scala/kamon/package.scala
index d3b25500..d694206c 100644
--- a/kamon-core/src/main/scala/kamon/package.scala
+++ b/kamon-core/src/main/scala/kamon/package.scala
@@ -92,8 +92,14 @@ package object kamon {
def configurations: Map[String, Config] = {
topLevelKeys
- .map(entry => (entry, config.getConfig(ConfigUtil.joinPath(entry))))
- .toMap
+ .map(entry => (entry, config.getConfig(ConfigUtil.joinPath(entry))))
+ .toMap
+ }
+
+ def pairs: Map[String, String] = {
+ topLevelKeys
+ .map(key => (key, config.getString(key)))
+ .toMap
}
}
}
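A sketch of the new `pairs` helper, which flattens the top-level keys of a Config into a String-to-String map; this is how the tags.mappings and entries.* blocks are consumed. The import assumes the syntax lives in kamon's package object:

```scala
import com.typesafe.config.ConfigFactory
import kamon._  // assumed to bring the Config.pairs syntax into scope

object PairsExample extends App {
  val mappings = ConfigFactory.parseString(
    """
      |tenant = x-tenant-id
      |request-id = x-request-id
    """.stripMargin).pairs

  // mappings == Map("tenant" -> "x-tenant-id", "request-id" -> "x-request-id")
}
```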
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala
index 43391af7..6015e350 100644
--- a/kamon-core/src/main/scala/kamon/trace/Span.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Span.scala
@@ -19,7 +19,7 @@ package trace
import java.time.Instant
import kamon.ReporterRegistry.SpanSink
-import kamon.context.Key
+import kamon.context.Context
import kamon.metric.MeasurementUnit
import kamon.trace.SpanContext.SamplingDecision
import kamon.util.Clock
@@ -66,7 +66,7 @@ sealed abstract class Span {
object Span {
- val ContextKey = Key.broadcast[Span]("span", Span.Empty)
+ val ContextKey = Context.key[Span]("span", Span.Empty)
object Empty extends Span {
override val context: SpanContext = SpanContext.EmptySpanContext
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala b/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala
index 2a8e2271..ed329ef7 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala
@@ -15,7 +15,7 @@
package kamon.trace
-import kamon.context.Key
+import kamon.context.Context
import kamon.trace.Tracer.SpanBuilder
/**
@@ -39,7 +39,7 @@ object SpanCustomizer {
override def customize(spanBuilder: SpanBuilder): SpanBuilder = spanBuilder
}
- val ContextKey = Key.local[SpanCustomizer]("span-customizer", Noop)
+ val ContextKey = Context.key[SpanCustomizer]("span-customizer", Noop)
def forOperationName(operationName: String): SpanCustomizer = new SpanCustomizer {
override def customize(spanBuilder: SpanBuilder): SpanBuilder =
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala
index 14b28d54..dc168347 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala
@@ -19,54 +19,43 @@ import java.net.{URLDecoder, URLEncoder}
import java.nio.ByteBuffer
import kamon.Kamon
-import kamon.context.{Codecs, Context, TextMap}
+import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
+import kamon.context._
import kamon.context.generated.binary.span.{Span => ColferSpan}
+import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter}
import kamon.trace.SpanContext.SamplingDecision
-object SpanCodec {
+/**
+ * Propagation mechanisms for Kamon's Span data to and from HTTP and Binary mediums.
+ */
+object SpanPropagation {
- class B3 extends Codecs.ForEntry[TextMap] {
+ /**
+ * Reads and Writes a Span instance using the B3 propagation format. The specification and semantics of the B3
+ * Propagation protocol can be found here: https://github.com/openzipkin/b3-propagation
+ */
+ class B3 extends Propagation.EntryReader[HeaderReader] with Propagation.EntryWriter[HeaderWriter] {
import B3.Headers
- override def encode(context: Context): TextMap = {
- val span = context.get(Span.ContextKey)
- val carrier = TextMap.Default()
-
- if(span.nonEmpty()) {
- val spanContext = span.context()
- carrier.put(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string))
- carrier.put(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string))
-
- if(spanContext.parentID != IdentityProvider.NoIdentifier)
- carrier.put(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string))
-
- encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision =>
- carrier.put(Headers.Sampled, samplingDecision)
- }
- }
-
- carrier
- }
-
- override def decode(carrier: TextMap, context: Context): Context = {
+ override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = {
val identityProvider = Kamon.tracer.identityProvider
- val traceID = carrier.get(Headers.TraceIdentifier)
+ val traceID = reader.read(Headers.TraceIdentifier)
.map(id => identityProvider.traceIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
- val spanID = carrier.get(Headers.SpanIdentifier)
+ val spanID = reader.read(Headers.SpanIdentifier)
.map(id => identityProvider.spanIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) {
- val parentID = carrier.get(Headers.ParentSpanIdentifier)
+ val parentID = reader.read(Headers.ParentSpanIdentifier)
.map(id => identityProvider.spanIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
- val flags = carrier.get(Headers.Flags)
+ val flags = reader.read(Headers.Flags)
- val samplingDecision = flags.orElse(carrier.get(Headers.Sampled)) match {
+ val samplingDecision = flags.orElse(reader.read(Headers.Sampled)) match {
case Some(sampled) if sampled == "1" => SamplingDecision.Sample
case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample
case _ => SamplingDecision.Unknown
@@ -77,6 +66,24 @@ object SpanCodec {
} else context
}
+
+ override def write(context: Context, writer: HttpPropagation.HeaderWriter): Unit = {
+ val span = context.get(Span.ContextKey)
+
+ if(span.nonEmpty()) {
+ val spanContext = span.context()
+ writer.write(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string))
+ writer.write(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string))
+
+ if(spanContext.parentID != IdentityProvider.NoIdentifier)
+ writer.write(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string))
+
+ encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision =>
+ writer.write(Headers.Sampled, samplingDecision)
+ }
+ }
+ }
+
private def encodeSamplingDecision(samplingDecision: SamplingDecision): Option[String] = samplingDecision match {
case SamplingDecision.Sample => Some("1")
case SamplingDecision.DoNotSample => Some("0")
@@ -102,38 +109,27 @@ object SpanCodec {
}
- class Colfer extends Codecs.ForEntry[ByteBuffer] {
- val emptyBuffer = ByteBuffer.allocate(0)
-
- override def encode(context: Context): ByteBuffer = {
- val span = context.get(Span.ContextKey)
- if(span.nonEmpty()) {
- val marshalBuffer = Colfer.codecBuffer.get()
- val colferSpan = new ColferSpan()
- val spanContext = span.context()
-
- colferSpan.setTraceID(spanContext.traceID.bytes)
- colferSpan.setSpanID(spanContext.spanID.bytes)
- colferSpan.setParentID(spanContext.parentID.bytes)
- colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision))
-
- val marshalledSize = colferSpan.marshal(marshalBuffer, 0)
- val buffer = ByteBuffer.allocate(marshalledSize)
- buffer.put(marshalBuffer, 0, marshalledSize)
- buffer
-
- } else emptyBuffer
- }
-
- override def decode(carrier: ByteBuffer, context: Context): Context = {
- carrier.clear()
-
- if(carrier.capacity() == 0)
+ /**
+ * Defines a bare bones binary context propagation that uses Colfer [1] as the serialization library. The Schema
+ * for the Span data is simply defined as:
+ *
+ * type Span struct {
+ * traceID binary
+ * spanID binary
+ * parentID binary
+ * samplingDecision uint8
+ * }
+ *
+ */
+ class Colfer extends Propagation.EntryReader[ByteStreamReader] with Propagation.EntryWriter[ByteStreamWriter] {
+
+ override def read(medium: ByteStreamReader, context: Context): Context = {
+ if(medium.available() == 0)
context
else {
val identityProvider = Kamon.tracer.identityProvider
val colferSpan = new ColferSpan()
- colferSpan.unmarshal(carrier.array(), 0)
+ colferSpan.unmarshal(medium.readAll(), 0)
val spanContext = SpanContext(
traceID = identityProvider.traceIdGenerator().from(colferSpan.traceID),
@@ -146,6 +142,24 @@ object SpanCodec {
}
}
+ override def write(context: Context, medium: ByteStreamWriter): Unit = {
+ val span = context.get(Span.ContextKey)
+
+ if(span.nonEmpty()) {
+ val marshalBuffer = Colfer.codecBuffer.get()
+ val colferSpan = new ColferSpan()
+ val spanContext = span.context()
+
+ colferSpan.setTraceID(spanContext.traceID.bytes)
+ colferSpan.setSpanID(spanContext.spanID.bytes)
+ colferSpan.setParentID(spanContext.parentID.bytes)
+ colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision))
+
+ val marshalledSize = colferSpan.marshal(marshalBuffer, 0)
+ medium.write(marshalBuffer, 0, marshalledSize)
+
+ }
+ }
private def samplingDecisionToByte(samplingDecision: SamplingDecision): Byte = samplingDecision match {
case SamplingDecision.Sample => 1
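A sketch of decoding an incoming B3 trace through the new reader interface. The header names follow the public B3 specification linked above (they are not shown in this diff), and the Map-backed reader is illustrative only:

```scala
import kamon.context.{Context, HttpPropagation}
import kamon.trace.{Span, SpanPropagation}

object B3Example extends App {
  // Header names per the B3 specification; values are illustrative.
  val b3Headers = Map(
    "X-B3-TraceId" -> "463ac35c9f6413ad",
    "X-B3-SpanId"  -> "72485a3953bb6124",
    "X-B3-Sampled" -> "1"
  )

  val reader = new HttpPropagation.HeaderReader {
    override def read(header: String): Option[String] = b3Headers.get(header)
    override def readAll(): Map[String, String] = b3Headers
  }

  val context = new SpanPropagation.B3().read(reader, Context.Empty)
  val span = context.get(Span.ContextKey)  // carries the decoded trace and span identifiers
}
```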
diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
index 3e857f00..ad7ffbed 100644
--- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
@@ -99,6 +99,7 @@ object Tracer {
private var initialMetricTags = Map.empty[String, String]
private var useParentFromContext = true
private var trackMetrics = true
+ private var providedTraceID = IdentityProvider.NoIdentifier
def asChildOf(parent: Span): SpanBuilder = {
if(parent != Span.Empty) this.parentSpan = parent
@@ -158,6 +159,11 @@ object Tracer {
this
}
+ def withTraceID(identifier: IdentityProvider.Identifier): SpanBuilder = {
+ this.providedTraceID = identifier
+ this
+ }
+
def start(): Span = {
val spanFrom = if(from == Instant.EPOCH) clock.instant() else from
@@ -199,13 +205,21 @@ object Tracer {
else
parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision)
- private def newSpanContext(samplingDecision: SamplingDecision): SpanContext =
+ private def newSpanContext(samplingDecision: SamplingDecision): SpanContext = {
+ val traceID =
+ if(providedTraceID != IdentityProvider.NoIdentifier)
+ providedTraceID
+ else
+ tracer._identityProvider.traceIdGenerator().generate()
+
+
SpanContext(
- traceID = tracer._identityProvider.traceIdGenerator().generate(),
+ traceID,
spanID = tracer._identityProvider.spanIdGenerator().generate(),
parentID = IdentityProvider.NoIdentifier,
samplingDecision = samplingDecision
)
+ }
}
private final class TracerMetrics(metricLookup: MetricLookup) {
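A sketch of the new withTraceID hook, mirroring how the HTTP server instrumentation honours a caller-provided trace identifier. The identifier value is illustrative; from() is expected to return IdentityProvider.NoIdentifier when the input cannot be decoded:

```scala
import kamon.Kamon
import kamon.trace.IdentityProvider

object ProvidedTraceIdExample extends App {
  val identifier = Kamon.identityProvider.traceIdGenerator().from("8285f8e4bb4a4e11")

  val builder = Kamon.buildSpan("list-orders")
  if (identifier != IdentityProvider.NoIdentifier)
    builder.withTraceID(identifier)   // reuse the caller-provided trace identifier

  val span = builder.start()
}
```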
diff --git a/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala b/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala
index 9f17dff0..d36d0df2 100644
--- a/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala
+++ b/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala
@@ -15,14 +15,17 @@
package kamon.testkit
-import kamon.context.{Context, Key}
+import kamon.context.Context
trait ContextTesting {
- val StringKey = Key.local[Option[String]]("string-key", None)
- val StringBroadcastKey = Key.broadcastString("string-broadcast-key")
+ val StringKey = Context.key[Option[String]]("string-key", None)
+ val StringBroadcastKey = Context.key[Option[String]]("string-broadcast-key", None)
def contextWithLocal(value: String): Context =
- Context.create(StringKey, Some(value))
+ Context.of(StringKey, Some(value))
}
object ContextTesting extends ContextTesting
diff --git a/kamon-testkit/src/main/scala/kamon/testkit/SimpleStringCodec.scala b/kamon-testkit/src/main/scala/kamon/testkit/SimpleStringCodec.scala
deleted file mode 100644
index c755b0af..00000000
--- a/kamon-testkit/src/main/scala/kamon/testkit/SimpleStringCodec.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.testkit
-
-import java.nio.ByteBuffer
-
-import kamon.context.{Codecs, Context, TextMap}
-
-object SimpleStringCodec {
- final class Headers extends Codecs.ForEntry[TextMap] {
- private val dataKey = "X-String-Value"
-
- override def encode(context: Context): TextMap = {
- val textMap = TextMap.Default()
- context.get(ContextTesting.StringBroadcastKey).foreach { value =>
- textMap.put(dataKey, value)
- }
-
- textMap
- }
-
- override def decode(carrier: TextMap, context: Context): Context = {
- carrier.get(dataKey) match {
- case value @ Some(_) => context.withKey(ContextTesting.StringBroadcastKey, value)
- case None => context
- }
- }
- }
-
- final class Binary extends Codecs.ForEntry[ByteBuffer] {
- val emptyBuffer: ByteBuffer = ByteBuffer.allocate(0)
-
- override def encode(context: Context): ByteBuffer = {
- context.get(ContextTesting.StringBroadcastKey) match {
- case Some(value) => ByteBuffer.wrap(value.getBytes)
- case None => emptyBuffer
- }
- }
-
- override def decode(carrier: ByteBuffer, context: Context): Context = {
- context.withKey(ContextTesting.StringBroadcastKey, Some(new String(carrier.array())))
- }
- }
-}
diff --git a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala
index fbfdc7c3..c28ace64 100644
--- a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala
+++ b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala
@@ -19,7 +19,7 @@ import java.time.Instant
import kamon.Kamon
import kamon.trace.{Span, SpanContext}
-import kamon.trace.Span.FinishedSpan
+import kamon.trace.Span.{FinishedSpan, TagValue}
import scala.reflect.ClassTag
import scala.util.Try
@@ -52,9 +52,21 @@ object SpanInspection {
def spanTags(): Map[String, Span.TagValue] =
spanData.tags
+ def tag(key: String): Option[String] = {
+ spanTag(key).map {
+ case TagValue.String(string) => string
+ case TagValue.Number(number) => number.toString
+ case TagValue.True => "true"
+ case TagValue.False => "false"
+ } orElse(metricTag(key))
+ }
+
def metricTags(): Map[String, String] =
getField[Span.Local, Map[String, String]](realSpan, "customMetricTags")
+ def metricTag(key: String): Option[String] =
+ metricTags().get(key)
+
def from(): Instant =
getField[Span.Local, Instant](realSpan, "from")
@@ -64,6 +76,9 @@ object SpanInspection {
def operationName(): String =
spanData.operationName
+ def hasMetricsEnabled(): Boolean =
+ getField[Span.Local, Boolean](realSpan, "collectMetrics")
+
private def getField[T, R](target: Any, fieldName: String)(implicit classTag: ClassTag[T]): R = {
val toFinishedSpanMethod = classTag.runtimeClass.getDeclaredFields.find(_.getName.contains(fieldName)).get
diff --git a/version.sbt b/version.sbt
index 918f99a8..a8bd390a 100644
--- a/version.sbt
+++ b/version.sbt
@@ -1 +1 @@
-version in ThisBuild := "1.1.4-SNAPSHOT"
+version in ThisBuild := "1.2.0-SNAPSHOT"