diff options
39 files changed, 2802 insertions, 990 deletions
diff --git a/.travis.yml b/.travis.yml index 5f85bcd0..a564bb7f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,7 +2,7 @@ language: scala script: - sbt +test scala: - - 2.12.4 + - 2.12.6 jdk: - oraclejdk8 before_script: @@ -20,10 +20,10 @@ lazy val kamon = (project in file(".")) .aggregate(core, testkit, coreTests, coreBench) val commonSettings = Seq( - scalaVersion := "2.12.4", + scalaVersion := "2.12.6", javacOptions += "-XDignore.symbol.file", resolvers += Resolver.mavenLocal, - crossScalaVersions := Seq("2.12.4", "2.11.8", "2.10.6"), + crossScalaVersions := Seq("2.12.6", "2.11.8", "2.10.6"), concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), scalacOptions ++= Seq( "-deprecation", diff --git a/kamon-core-tests/src/test/resources/reference.conf b/kamon-core-tests/src/test/resources/reference.conf index 0d7ae9e2..c092af08 100644 --- a/kamon-core-tests/src/test/resources/reference.conf +++ b/kamon-core-tests/src/test/resources/reference.conf @@ -2,4 +2,38 @@ kamon { context.codecs.string-keys { request-id = "X-Request-ID" } +} + + + +kamon { + + trace { + sampler = always + } + + propagation.http.default { + tags.mappings { + "correlation-id" = "x-correlation-id" + } + } + + instrumentation { + http-server { + default { + tracing.preferred-trace-id-tag = "correlation-id" + tracing.tags.from-context.peer = span + } + + no-span-metrics { + tracing.span-metrics = off + } + + noop { + propagation.enabled = no + metrics.enabled = no + tracing.enabled = no + } + } + } }
\ No newline at end of file diff --git a/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala new file mode 100644 index 00000000..5681d300 --- /dev/null +++ b/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala @@ -0,0 +1,174 @@ +package kamon.context + +import java.io.ByteArrayOutputStream + +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter} +import kamon.context.Propagation.{EntryReader, EntryWriter} +import org.scalatest.{Matchers, OptionValues, WordSpec} + +import scala.util.Random + +class BinaryPropagationSpec extends WordSpec with Matchers with OptionValues { + + "The Binary Context Propagation" should { + "return an empty context if there is no data to read from" in { + val context = binaryPropagation.read(ByteStreamReader.of(Array.ofDim[Byte](0))) + context.isEmpty() shouldBe true + } + + "not write any data to the medium if the context is empty" in { + val writer = inspectableByteStreamWriter() + binaryPropagation.write(Context.Empty, writer) + writer.size() shouldBe 0 + } + + "handle malformed data in when reading a context" in { + val randomBytes = Array.ofDim[Byte](42) + Random.nextBytes(randomBytes) + + val context = binaryPropagation.read(ByteStreamReader.of(randomBytes)) + context.isEmpty() shouldBe true + } + + "handle read failures in an entry reader" in { + val context = Context.of( + BinaryPropagationSpec.StringKey, "string-value", + BinaryPropagationSpec.FailStringKey, "fail-read" + ) + val writer = inspectableByteStreamWriter() + binaryPropagation.write(context, writer) + + val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) + rtContext.tags shouldBe empty + rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value" + rtContext.get(BinaryPropagationSpec.FailStringKey) shouldBe null + } + + "handle write failures in an entry writer" in { + val context = Context.of( + BinaryPropagationSpec.StringKey, "string-value", + BinaryPropagationSpec.FailStringKey, "fail-write" + ) + val writer = inspectableByteStreamWriter() + binaryPropagation.write(context, writer) + + val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) + rtContext.tags shouldBe empty + rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value" + rtContext.get(BinaryPropagationSpec.FailStringKey) shouldBe null + } + + "handle write failures in an entry writer when the context is too big" in { + val context = Context.of(BinaryPropagationSpec.StringKey, "string-value" * 20) + val writer = inspectableByteStreamWriter() + binaryPropagation.write(context, writer) + + val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) + rtContext shouldBe empty + } + + "round trip a Context that only has tags" in { + val context = Context.of(Map("hello" -> "world", "kamon" -> "rulez")) + val writer = inspectableByteStreamWriter() + binaryPropagation.write(context, writer) + + val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) + rtContext.entries shouldBe empty + rtContext.tags should contain theSameElementsAs (context.tags) + } + + "round trip a Context that only has entries" in { + val context = Context.of(BinaryPropagationSpec.StringKey, "string-value", BinaryPropagationSpec.IntegerKey, 42) + val writer = inspectableByteStreamWriter() + binaryPropagation.write(context, writer) + + val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) + rtContext.tags shouldBe empty + rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value" + rtContext.get(BinaryPropagationSpec.IntegerKey) shouldBe 0 // there is no entry configuration for the integer key + } + + "round trip a Context that with tags and entries" in { + val context = Context.of(Map("hello" -> "world", "kamon" -> "rulez")) + .withKey(BinaryPropagationSpec.StringKey, "string-value") + .withKey(BinaryPropagationSpec.IntegerKey, 42) + + val writer = inspectableByteStreamWriter() + binaryPropagation.write(context, writer) + val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) + + rtContext.tags should contain theSameElementsAs (context.tags) + rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value" + rtContext.get(BinaryPropagationSpec.IntegerKey) shouldBe 0 // there is no entry configuration for the integer key + } + } + + val binaryPropagation = BinaryPropagation.from( + ConfigFactory.parseString( + """ + |max-outgoing-size = 64 + |entries.incoming.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec" + |entries.incoming.failString = "kamon.context.BinaryPropagationSpec$FailStringEntryCodec" + |entries.outgoing.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec" + |entries.outgoing.failString = "kamon.context.BinaryPropagationSpec$FailStringEntryCodec" + | + """.stripMargin + ).withFallback(ConfigFactory.load().getConfig("kamon.propagation")), Kamon) + + + def inspectableByteStreamWriter() = new ByteArrayOutputStream(32) with ByteStreamWriter + +} + +object BinaryPropagationSpec { + + val StringKey = Context.key[String]("string", null) + val FailStringKey = Context.key[String]("failString", null) + val IntegerKey = Context.key[Int]("integer", 0) + + class StringEntryCodec extends EntryReader[ByteStreamReader] with EntryWriter[ByteStreamWriter] { + + override def read(medium: ByteStreamReader, context: Context): Context = { + val valueData = medium.readAll() + + if(valueData.length > 0) { + context.withKey(StringKey, new String(valueData)) + } else context + } + + override def write(context: Context, medium: ByteStreamWriter): Unit = { + val value = context.get(StringKey) + if(value != null) { + medium.write(value.getBytes) + } + } + } + + class FailStringEntryCodec extends EntryReader[ByteStreamReader] with EntryWriter[ByteStreamWriter] { + + override def read(medium: ByteStreamReader, context: Context): Context = { + val valueData = medium.readAll() + + if(valueData.length > 0) { + val stringValue = new String(valueData) + if(stringValue == "fail-read") { + sys.error("The fail string entry reader has triggered") + } + + context.withKey(FailStringKey, stringValue) + } else context + } + + override def write(context: Context, medium: ByteStreamWriter): Unit = { + val value = context.get(FailStringKey) + if(value != null && value != "fail-write") { + medium.write(value.getBytes) + } else { + medium.write(42) // malformed data on purpose + sys.error("The fail string entry writer has triggered") + } + } + } +}
\ No newline at end of file diff --git a/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala deleted file mode 100644 index f7bd7e56..00000000 --- a/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.context - -import kamon.Kamon -import kamon.testkit.ContextTesting -import org.scalatest.{Matchers, OptionValues, WordSpec} - -class ContextCodecSpec extends WordSpec with Matchers with ContextTesting with OptionValues { - "the Context Codec" when { - "encoding/decoding to HttpHeaders" should { - "round trip a empty context" in { - val textMap = ContextCodec.HttpHeaders.encode(Context.Empty) - val decodedContext = ContextCodec.HttpHeaders.decode(textMap) - - decodedContext shouldBe Context.Empty - } - - "round trip a context with only local keys" in { - val localOnlyContext = Context.create(StringKey, Some("string-value")) - val textMap = ContextCodec.HttpHeaders.encode(localOnlyContext) - val decodedContext = ContextCodec.HttpHeaders.decode(textMap) - - decodedContext shouldBe Context.Empty - } - - "round trip a context with local and broadcast keys" in { - val initialContext = Context.create() - .withKey(StringKey, Some("string-value")) - .withKey(StringBroadcastKey, Some("this-should-be-round-tripped")) - - val textMap = ContextCodec.HttpHeaders.encode(initialContext) - val decodedContext = ContextCodec.HttpHeaders.decode(textMap) - - decodedContext.get(StringKey) shouldBe empty - decodedContext.get(StringBroadcastKey).value shouldBe "this-should-be-round-tripped" - } - - "read string broadcast keys using the configured header name" in { - val textMap = TextMap.Default() - textMap.put("X-Request-ID", "123456") - val decodedContext = ContextCodec.HttpHeaders.decode(textMap) - - decodedContext.get(Key.broadcastString("request-id")).value shouldBe "123456" - } - } - - "encoding/decoding to Binary" should { - "round trip a empty context" in { - val byteBuffer = ContextCodec.Binary.encode(Context.Empty) - - val decodedContext = ContextCodec.Binary.decode(byteBuffer) - - decodedContext shouldBe Context.Empty - } - - "round trip a context with only local keys" in { - val localOnlyContext = Context.create(StringKey, Some("string-value")) - val byteBuffer = ContextCodec.Binary.encode(localOnlyContext) - val decodedContext = ContextCodec.Binary.decode(byteBuffer) - - decodedContext shouldBe Context.Empty - } - - "round trip a context with local and broadcast keys" in { - val initialContext = Context.create() - .withKey(StringKey, Some("string-value")) - .withKey(StringBroadcastKey, Some("this-should-be-round-tripped")) - - val byteBuffer = ContextCodec.Binary.encode(initialContext) - val decodedContext = ContextCodec.Binary.decode(byteBuffer) - - decodedContext.get(StringKey) shouldBe empty - decodedContext.get(StringBroadcastKey).value shouldBe "this-should-be-round-tripped" - } - } - } - - val ContextCodec = new Codecs(Kamon.config()) -}
\ No newline at end of file diff --git a/kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala deleted file mode 100644 index f7e7599f..00000000 --- a/kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.context - -import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} - -import kamon.Kamon -import kamon.testkit.ContextTesting -import org.scalatest.{Matchers, OptionValues, WordSpec} - -class ContextSerializationSpec extends WordSpec with Matchers with ContextTesting with OptionValues { - "the Context is Serializable" should { - "empty " in { - val bos = new ByteArrayOutputStream() - val oos = new ObjectOutputStream(bos) - oos.writeObject(Context.Empty) - - val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray)) - val ctx = ois.readObject().asInstanceOf[Context] - ctx shouldBe Context.Empty - } - - "full" in { - val sCtx = Context(StringBroadcastKey, Some("disi")) - val bos = new ByteArrayOutputStream() - val oos = new ObjectOutputStream(bos) - oos.writeObject(sCtx) - - val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray)) - val rCtx = ois.readObject().asInstanceOf[Context] - rCtx shouldBe sCtx - } - - } - - val ContextCodec = new Codecs(Kamon.config()) -}
\ No newline at end of file diff --git a/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala new file mode 100644 index 00000000..fcddfe24 --- /dev/null +++ b/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala @@ -0,0 +1,159 @@ +package kamon.context + +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter} +import kamon.context.Propagation.{EntryReader, EntryWriter} +import org.scalatest.{Matchers, OptionValues, WordSpec} + +import scala.collection.mutable + +class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { + + "The HTTP Context Propagation" when { + "reading from incoming requests" should { + "return an empty context if there are no tags nor keys" in { + val context = httpPropagation.read(headerReaderFromMap(Map.empty)) + context.isEmpty() shouldBe true + } + + "read tags from an HTTP message when they are available" in { + val headers = Map( + "x-content-tags" -> "hello=world;correlation=1234", + "x-mapped-tag" -> "value" + ) + val context = httpPropagation.read(headerReaderFromMap(headers)) + context.tags should contain only( + "hello" -> "world", + "correlation" -> "1234", + "mappedTag" -> "value" + ) + } + + "handle errors when reading HTTP headers" in { + val headers = Map("fail" -> "") + val context = httpPropagation.read(headerReaderFromMap(headers)) + context.tags shouldBe empty + context.entries shouldBe empty + } + + "read context with entries and tags" in { + val headers = Map( + "x-content-tags" -> "hello=world;correlation=1234", + "string-header" -> "hey", + "integer-header" -> "123" + ) + + val context = httpPropagation.read(headerReaderFromMap(headers)) + context.get(HttpPropagationSpec.StringKey) shouldBe "hey" + context.get(HttpPropagationSpec.IntegerKey) shouldBe 123 + context.get(HttpPropagationSpec.OptionalKey) shouldBe empty + context.getTag("hello").value shouldBe "world" + context.getTag("correlation").value shouldBe "1234" + context.getTag("unknown") shouldBe empty + } + } + + + "writing to outgoing requests" should { + "not write anything if the context is empty" in { + val headers = mutable.Map.empty[String, String] + httpPropagation.write(Context.Empty, headerWriterFromMap(headers)) + headers shouldBe empty + } + + "write context tags when available" in { + val headers = mutable.Map.empty[String, String] + val context = Context.of(Map( + "hello" -> "world", + "mappedTag" -> "value" + )) + + httpPropagation.write(context, headerWriterFromMap(headers)) + headers should contain only( + "x-content-tags" -> "hello=world;", + "x-mapped-tag" -> "value" + ) + } + + "write context entries when available" in { + val headers = mutable.Map.empty[String, String] + val context = Context.of( + HttpPropagationSpec.StringKey, "out-we-go", + HttpPropagationSpec.IntegerKey, 42 + ) + + httpPropagation.write(context, headerWriterFromMap(headers)) + headers should contain only( + "string-header" -> "out-we-go" + ) + } + } + } + + + val httpPropagation = HttpPropagation.from( + ConfigFactory.parseString( + """ + |tags { + | header-name = "x-content-tags" + | + | mappings { + | mappedTag = "x-mapped-tag" + | } + |} + | + |entries.incoming.string = "kamon.context.HttpPropagationSpec$StringEntryCodec" + |entries.incoming.integer = "kamon.context.HttpPropagationSpec$IntegerEntryCodec" + |entries.outgoing.string = "kamon.context.HttpPropagationSpec$StringEntryCodec" + | + """.stripMargin + ).withFallback(ConfigFactory.load().getConfig("kamon.propagation")), Kamon) + + + def headerReaderFromMap(map: Map[String, String]): HttpPropagation.HeaderReader = new HttpPropagation.HeaderReader { + override def read(header: String): Option[String] = { + if(map.get("fail").nonEmpty) + sys.error("failing on purpose") + + map.get(header) + } + + override def readAll(): Map[String, String] = map + } + + def headerWriterFromMap(map: mutable.Map[String, String]): HttpPropagation.HeaderWriter = new HttpPropagation.HeaderWriter { + override def write(header: String, value: String): Unit = map.put(header, value) + } +} + +object HttpPropagationSpec { + + val StringKey = Context.key[String]("string", null) + val IntegerKey = Context.key[Int]("integer", 0) + val OptionalKey = Context.key[Option[String]]("optional", None) + + + class StringEntryCodec extends EntryReader[HeaderReader] with EntryWriter[HeaderWriter] { + private val HeaderName = "string-header" + + override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { + reader.read(HeaderName) + .map(v => context.withKey(StringKey, v)) + .getOrElse(context) + } + + override def write(context: Context, writer: HttpPropagation.HeaderWriter): Unit = { + Option(context.get(StringKey)).foreach(v => writer.write(HeaderName, v)) + } + } + + class IntegerEntryCodec extends EntryReader[HeaderReader] { + override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { + reader.read("integer-header") + .map(v => context.withKey(IntegerKey, v.toInt)) + .getOrElse(context) + + } + } +}
\ No newline at end of file diff --git a/kamon-core-tests/src/test/scala/kamon/context/ThreadLocalStorageSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/ThreadLocalStorageSpec.scala index d2c4e57e..8b94ac0c 100644 --- a/kamon-core-tests/src/test/scala/kamon/context/ThreadLocalStorageSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/context/ThreadLocalStorageSpec.scala @@ -48,8 +48,8 @@ class ThreadLocalStorageSpec extends WordSpec with Matchers { } val TLS: Storage = new Storage.ThreadLocal - val TestKey = Key.local("test-key", 42) - val AnotherKey = Key.local("another-key", 99) - val BroadcastKey = Key.broadcast("broadcast", "i travel around") - val ScopeWithKey = Context.create().withKey(TestKey, 43) + val TestKey = Context.key("test-key", 42) + val AnotherKey = Context.key("another-key", 99) + val BroadcastKey = Context.key("broadcast", "i travel around") + val ScopeWithKey = Context.of(TestKey, 43) } diff --git a/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala new file mode 100644 index 00000000..c3c5f131 --- /dev/null +++ b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala @@ -0,0 +1,316 @@ +package kamon.instrumentation + +import java.time.Duration + +import kamon.context.Context +import kamon.metric.{Counter, Histogram, RangeSampler} +import kamon.testkit.{MetricInspection, SpanInspection} +import org.scalatest.concurrent.Eventually +import org.scalatest.{Matchers, OptionValues, WordSpec} + +import scala.collection.mutable + +class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInspection with OptionValues with MetricInspection with Eventually { + + "the HTTP server instrumentation" when { + "configured for context propagation" should { + "read context entries and tags from the incoming request" in { + val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( + "context-tags" -> "tag=value;none=0011223344556677;", + "custom-trace-id" -> "0011223344556677" + ))) + + handler.context.tags should contain only( + "tag" -> "value", + "none" -> "0011223344556677" + ) + + handler.send(fakeResponse(200, mutable.Map.empty), Context.Empty) + handler.doneSending(0L) + } + + "use the configured HTTP propagation channel" in { + val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( + "context-tags" -> "tag=value;none=0011223344556677;", + "custom-trace-id" -> "0011223344556677" + ))) + + handler.context.tags should contain only( + "tag" -> "value", + "none" -> "0011223344556677" + ) + + val span = inspect(handler.span) + span.context().traceID.string shouldNot be("0011223344556677") + span.tag("http.method").value shouldBe "GET" + span.tag("http.url").value shouldBe "http://localhost:8080/" + + val responseHeaders = mutable.Map.empty[String, String] + handler.send(fakeResponse(200, responseHeaders), handler.context.withTag("hello", "world")) + handler.doneSending(0L) + } + } + + "configured for HTTP server metrics" should { + "track the number of open connections" in { + openConnections(8081).distribution() + + httpServer().openConnection() + httpServer().openConnection() + + val snapshotWithOpenConnections = openConnections(8081).distribution() + snapshotWithOpenConnections.min shouldBe 0 + snapshotWithOpenConnections.max shouldBe 2 + + httpServer().closeConnection(Duration.ofSeconds(20), 10) + httpServer().closeConnection(Duration.ofSeconds(30), 15) + + eventually { + val snapshotWithoutOpenConnections = openConnections(8081).distribution() + snapshotWithoutOpenConnections.min shouldBe 0 + snapshotWithoutOpenConnections.max shouldBe 0 + } + } + + "track the distribution of number of requests handled per each connection" in { + connectionUsage(8081).distribution() + + httpServer().openConnection() + httpServer().openConnection() + httpServer().closeConnection(Duration.ofSeconds(20), 10) + httpServer().closeConnection(Duration.ofSeconds(30), 15) + + val connectionUsageSnapshot = connectionUsage(8081).distribution() + connectionUsageSnapshot.buckets.map(_.value) should contain allOf( + 10, + 15 + ) + } + + "track the distribution of connection lifetime across all connections" in { + connectionLifetime(8081).distribution() + + httpServer().openConnection() + httpServer().openConnection() + httpServer().closeConnection(Duration.ofSeconds(20), 10) + httpServer().closeConnection(Duration.ofSeconds(30), 15) + + val connectionLifetimeSnapshot = connectionLifetime(8081).distribution() + connectionLifetimeSnapshot.buckets.map(_.value) should contain allOf( + 19998441472L, // 20 seconds with 1% precision + 29930553344L // 30 seconds with 1% precision + ) + } + + "track the number of active requests" in { + activeRequests(8081).distribution() + + val handlerOne = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)) + val handlerTwo = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)) + + val snapshotWithActiveRequests = activeRequests(8081).distribution() + snapshotWithActiveRequests.min shouldBe 0 + snapshotWithActiveRequests.max shouldBe 2 + + handlerOne.send(fakeResponse(200, mutable.Map.empty), Context.Empty) + handlerTwo.send(fakeResponse(200, mutable.Map.empty), Context.Empty) + handlerOne.doneSending(0L) + handlerTwo.doneSending(0L) + + eventually { + val snapshotWithoutActiveRequests = activeRequests(8081).distribution() + snapshotWithoutActiveRequests.min shouldBe 0 + snapshotWithoutActiveRequests.max shouldBe 0 + } + } + + "track the distribution of sizes on incoming requests" in { + requestSize(8081).distribution() + + httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)).doneReceiving(300) + httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)).doneReceiving(400) + + val requestSizeSnapshot = requestSize(8081).distribution() + requestSizeSnapshot.buckets.map(_.value) should contain allOf( + 300, + 400 + ) + } + + "track the distribution of sizes on outgoing responses" in { + responseSize(8081).distribution() + + httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)).doneSending(300) + httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)).doneSending(400) + + val requestSizeSnapshot = responseSize(8081).distribution() + requestSizeSnapshot.buckets.map(_.value) should contain allOf( + 300, + 400 + ) + } + + "track the number of responses per status code" in { + // resets all counters + completedRequests(8081, 100).value() + completedRequests(8081, 200).value() + completedRequests(8081, 300).value() + completedRequests(8081, 400).value() + completedRequests(8081, 500).value() + + + val request = fakeRequest("http://localhost:8080/", "/", "GET", Map.empty) + httpServer().receive(request).send(fakeResponse(200, mutable.Map.empty), Context.Empty) + httpServer().receive(request).send(fakeResponse(302, mutable.Map.empty), Context.Empty) + httpServer().receive(request).send(fakeResponse(404, mutable.Map.empty), Context.Empty) + httpServer().receive(request).send(fakeResponse(504, mutable.Map.empty), Context.Empty) + httpServer().receive(request).send(fakeResponse(110, mutable.Map.empty), Context.Empty) + + completedRequests(8081, 100).value() shouldBe 1L + completedRequests(8081, 200).value() shouldBe 1L + completedRequests(8081, 300).value() shouldBe 1L + completedRequests(8081, 400).value() shouldBe 1L + completedRequests(8081, 500).value() shouldBe 1L + } + } + + "configured for distributed tracing" should { + "create a span representing the current HTTP operation" in { + val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)) + handler.send(fakeResponse(200, mutable.Map.empty), handler.context) + + val span = inspect(handler.span) + span.tag("http.method").value shouldBe "GET" + span.tag("http.url").value shouldBe "http://localhost:8080/" + span.tag("http.status_code").value shouldBe "200" + } + + "adopt a traceID when explicitly provided" in { + val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( + "context-tags" -> "tag=value;none=0011223344556677;", + "x-correlation-id" -> "0011223344556677" + ))) + + handler.span.context().traceID.string shouldBe "0011223344556677" + } + + "record span metrics when enabled" in { + val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)) + handler.send(fakeResponse(200, mutable.Map.empty), handler.context) + + val span = inspect(handler.span) + span.hasMetricsEnabled() shouldBe true + } + + "not record span metrics when disabled" in { + val handler = noSpanMetricsHttpServer() + .receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)) + handler.send(fakeResponse(200, mutable.Map.empty), handler.context) + + val span = inspect(handler.span) + span.hasMetricsEnabled() shouldBe false + } + + "receive tags from context when available" in { + val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( + "context-tags" -> "tag=value;none=0011223344556677;peer=superservice;", + "custom-trace-id" -> "0011223344556677" + ))) + + val span = inspect(handler.span) + span.tag("peer").value shouldBe "superservice" + } + } + + "all capabilities are disabled" should { + "not read any context from the incoming requests" in { + val httpServer = noopHttpServer() + val handler = httpServer.receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( + "context-tags" -> "tag=value;none=0011223344556677;", + "custom-trace-id" -> "0011223344556677" + ))) + + handler.context shouldBe Context.Empty + } + + "not create any span to represent the server request" in { + val httpServer = noopHttpServer() + val handler = httpServer.receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( + "context-tags" -> "tag=value;none=0011223344556677;", + "custom-trace-id" -> "0011223344556677" + ))) + + handler.span.isEmpty() shouldBe true + } + + "not record any HTTP server metrics" in { + val request = fakeRequest("http://localhost:8080/", "/", "GET", Map.empty) + noopHttpServer().receive(request).send(fakeResponse(200, mutable.Map.empty), Context.Empty) + noopHttpServer().receive(request).send(fakeResponse(302, mutable.Map.empty), Context.Empty) + noopHttpServer().receive(request).send(fakeResponse(404, mutable.Map.empty), Context.Empty) + noopHttpServer().receive(request).send(fakeResponse(504, mutable.Map.empty), Context.Empty) + noopHttpServer().receive(request).send(fakeResponse(110, mutable.Map.empty), Context.Empty) + + completedRequests(8083, 100).value() shouldBe 0L + completedRequests(8083, 200).value() shouldBe 0L + completedRequests(8083, 300).value() shouldBe 0L + completedRequests(8083, 400).value() shouldBe 0L + completedRequests(8083, 500).value() shouldBe 0L + } + } + } + + val TestComponent = "http.server" + val TestInterface = "0.0.0.0" + + def httpServer(): HttpServer = HttpServer.from("default", component = TestComponent, interface = TestInterface, port = 8081) + def noSpanMetricsHttpServer(): HttpServer = HttpServer.from("no-span-metrics", component = TestComponent, interface = TestInterface, port = 8082) + def noopHttpServer(): HttpServer = HttpServer.from("noop", component = TestComponent, interface = TestInterface, port = 8083) + + def fakeRequest(requestUrl: String, requestPath: String, requestMethod: String, headers: Map[String, String]): HttpMessage.Request = + new HttpMessage.Request { + override def url: String = requestUrl + override def path: String = requestPath + override def method: String = requestMethod + override def read(header: String): Option[String] = headers.get(header) + override def readAll(): Map[String, String] = headers + } + + def fakeResponse(responseStatusCode: Int, headers: mutable.Map[String, String]): HttpMessage.ResponseBuilder[HttpMessage.Response] = + new HttpMessage.ResponseBuilder[HttpMessage.Response] { + override def statusCode: Int = responseStatusCode + override def write(header: String, value: String): Unit = headers.put(header, value) + override def build(): HttpMessage.Response = this + } + + def completedRequests(port: Int, statusCode: Int): Counter = { + val metrics = HttpServer.Metrics.of(TestComponent, TestInterface, port) + + statusCode match { + case sc if sc >= 100 && sc <= 199 => metrics.requestsInformational + case sc if sc >= 200 && sc <= 299 => metrics.requestsSuccessful + case sc if sc >= 300 && sc <= 399 => metrics.requestsRedirection + case sc if sc >= 400 && sc <= 499 => metrics.requestsClientError + case sc if sc >= 500 && sc <= 599 => metrics.requestsServerError + } + } + + def openConnections(port: Int): RangeSampler = + HttpServer.Metrics.of(TestComponent, TestInterface, port).openConnections + + def connectionUsage(port: Int): Histogram = + HttpServer.Metrics.of(TestComponent, TestInterface, port).connectionUsage + + def connectionLifetime(port: Int): Histogram = + HttpServer.Metrics.of(TestComponent, TestInterface, port).connectionLifetime + + def activeRequests(port: Int): RangeSampler = + HttpServer.Metrics.of(TestComponent, TestInterface, port).activeRequests + + def requestSize(port: Int): Histogram = + HttpServer.Metrics.of(TestComponent, TestInterface, port).requestSize + + def responseSize(port: Int): Histogram = + HttpServer.Metrics.of(TestComponent, TestInterface, port).responseSize + +} diff --git a/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala b/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala deleted file mode 100644 index f718d806..00000000 --- a/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala +++ /dev/null @@ -1,208 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2017 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.trace - -import kamon.context.{Context, TextMap} -import kamon.testkit.SpanBuilding -import kamon.trace.IdentityProvider.Identifier -import kamon.trace.SpanContext.SamplingDecision -import org.scalatest.{Matchers, OptionValues, WordSpecLike} - - -class B3SpanCodecSpec extends WordSpecLike with Matchers with OptionValues with SpanBuilding { - val extendedB3Codec = SpanCodec.B3() - - "The ExtendedB3 SpanContextCodec" should { - "return a TextMap containing the SpanContext data" in { - val textMap = extendedB3Codec.encode(testContext()) - textMap.get("X-B3-TraceId").value shouldBe "1234" - textMap.get("X-B3-ParentSpanId").value shouldBe "2222" - textMap.get("X-B3-SpanId").value shouldBe "4321" - textMap.get("X-B3-Sampled").value shouldBe "1" - } - - "do not include the X-B3-ParentSpanId if there is no parent" in { - val textMap = extendedB3Codec.encode(testContextWithoutParent()) - textMap.get("X-B3-TraceId").value shouldBe "1234" - textMap.get("X-B3-ParentSpanId") shouldBe empty - textMap.get("X-B3-SpanId").value shouldBe "4321" - textMap.get("X-B3-Sampled").value shouldBe "1" - } - - - "not inject anything if there is no Span in the Context" in { - val textMap = extendedB3Codec.encode(Context.Empty) - textMap.values shouldBe empty - } - - "extract a RemoteSpan from a TextMap when all fields are set" in { - val textMap = TextMap.Default() - textMap.put("X-B3-TraceId", "1234") - textMap.put("X-B3-ParentSpanId", "2222") - textMap.put("X-B3-SpanId", "4321") - textMap.put("X-B3-Sampled", "1") - textMap.put("X-B3-Extra-Baggage", "some=baggage;more=baggage") - - val spanContext = extendedB3Codec.decode(textMap, Context.Empty).get(Span.ContextKey).context() - spanContext.traceID.string shouldBe "1234" - spanContext.spanID.string shouldBe "4321" - spanContext.parentID.string shouldBe "2222" - spanContext.samplingDecision shouldBe SamplingDecision.Sample - } - - "decode the sampling decision based on the X-B3-Sampled header" in { - val sampledTextMap = TextMap.Default() - sampledTextMap.put("X-B3-TraceId", "1234") - sampledTextMap.put("X-B3-SpanId", "4321") - sampledTextMap.put("X-B3-Sampled", "1") - - val notSampledTextMap = TextMap.Default() - notSampledTextMap.put("X-B3-TraceId", "1234") - notSampledTextMap.put("X-B3-SpanId", "4321") - notSampledTextMap.put("X-B3-Sampled", "0") - - val noSamplingTextMap = TextMap.Default() - noSamplingTextMap.put("X-B3-TraceId", "1234") - noSamplingTextMap.put("X-B3-SpanId", "4321") - - extendedB3Codec.decode(sampledTextMap, Context.Empty) - .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.Sample - - extendedB3Codec.decode(notSampledTextMap, Context.Empty) - .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.DoNotSample - - extendedB3Codec.decode(noSamplingTextMap, Context.Empty) - .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.Unknown - } - - "not include the X-B3-Sampled header if the sampling decision is unknown" in { - val context = testContext() - val sampledSpanContext = context.get(Span.ContextKey).context() - val notSampledSpanContext = Context.Empty.withKey(Span.ContextKey, - Span.Remote(sampledSpanContext.copy(samplingDecision = SamplingDecision.DoNotSample))) - val unknownSamplingSpanContext = Context.Empty.withKey(Span.ContextKey, - Span.Remote(sampledSpanContext.copy(samplingDecision = SamplingDecision.Unknown))) - - extendedB3Codec.encode(context).get("X-B3-Sampled").value shouldBe("1") - extendedB3Codec.encode(notSampledSpanContext).get("X-B3-Sampled").value shouldBe("0") - extendedB3Codec.encode(unknownSamplingSpanContext).get("X-B3-Sampled") shouldBe empty - } - - "use the Debug flag to override the sampling decision, if provided." in { - val textMap = TextMap.Default() - textMap.put("X-B3-TraceId", "1234") - textMap.put("X-B3-SpanId", "4321") - textMap.put("X-B3-Sampled", "0") - textMap.put("X-B3-Flags", "1") - - val spanContext = extendedB3Codec.decode(textMap, Context.Empty).get(Span.ContextKey).context() - spanContext.samplingDecision shouldBe SamplingDecision.Sample - } - - "use the Debug flag as sampling decision when Sampled is not provided" in { - val textMap = TextMap.Default() - textMap.put("X-B3-TraceId", "1234") - textMap.put("X-B3-SpanId", "4321") - textMap.put("X-B3-Flags", "1") - - val spanContext = extendedB3Codec.decode(textMap, Context.Empty).get(Span.ContextKey).context() - spanContext.samplingDecision shouldBe SamplingDecision.Sample - } - - "extract a minimal SpanContext from a TextMap containing only the Trace ID and Span ID" in { - val textMap = TextMap.Default() - textMap.put("X-B3-TraceId", "1234") - textMap.put("X-B3-SpanId", "4321") - - val spanContext = extendedB3Codec.decode(textMap, Context.Empty).get(Span.ContextKey).context() - spanContext.traceID.string shouldBe "1234" - spanContext.spanID.string shouldBe "4321" - spanContext.parentID shouldBe IdentityProvider.NoIdentifier - spanContext.samplingDecision shouldBe SamplingDecision.Unknown - } - - "do not extract a SpanContext if Trace ID and Span ID are not provided" in { - val onlyTraceID = TextMap.Default() - onlyTraceID.put("X-B3-TraceId", "1234") - onlyTraceID.put("X-B3-Sampled", "0") - onlyTraceID.put("X-B3-Flags", "1") - - val onlySpanID = TextMap.Default() - onlySpanID.put("X-B3-SpanId", "4321") - onlySpanID.put("X-B3-Sampled", "0") - onlySpanID.put("X-B3-Flags", "1") - - val noIds = TextMap.Default() - noIds.put("X-B3-Sampled", "0") - noIds.put("X-B3-Flags", "1") - - extendedB3Codec.decode(onlyTraceID, Context.Empty).get(Span.ContextKey) shouldBe Span.Empty - extendedB3Codec.decode(onlySpanID, Context.Empty).get(Span.ContextKey) shouldBe Span.Empty - extendedB3Codec.decode(noIds, Context.Empty).get(Span.ContextKey) shouldBe Span.Empty - } - - "round trip a Span from TextMap -> Context -> TextMap" in { - val textMap = TextMap.Default() - textMap.put("X-B3-TraceId", "1234") - textMap.put("X-B3-ParentSpanId", "2222") - textMap.put("X-B3-SpanId", "4321") - textMap.put("X-B3-Sampled", "1") - - val context = extendedB3Codec.decode(textMap, Context.Empty) - val injectTextMap = extendedB3Codec.encode(context) - - textMap.values.toSeq should contain theSameElementsAs(injectTextMap.values.toSeq) - } - - /* - // TODO: Should we be supporting this use case? maybe even have the concept of Debug requests ourselves? - "internally carry the X-B3-Flags value so that it can be injected in outgoing requests" in { - val textMap = TextMap.Default() - textMap.put("X-B3-TraceId", "1234") - textMap.put("X-B3-ParentSpanId", "2222") - textMap.put("X-B3-SpanId", "4321") - textMap.put("X-B3-Sampled", "1") - textMap.put("X-B3-Flags", "1") - - val spanContext = extendedB3Codec.extract(textMap).value - val injectTextMap = extendedB3Codec.inject(spanContext) - - injectTextMap.get("X-B3-Flags").value shouldBe("1") - }*/ - } - - def testContext(): Context = { - val spanContext = createSpanContext().copy( - traceID = Identifier("1234", Array[Byte](1, 2, 3, 4)), - spanID = Identifier("4321", Array[Byte](4, 3, 2, 1)), - parentID = Identifier("2222", Array[Byte](2, 2, 2, 2)) - ) - - Context.create().withKey(Span.ContextKey, Span.Remote(spanContext)) - } - - def testContextWithoutParent(): Context = { - val spanContext = createSpanContext().copy( - traceID = Identifier("1234", Array[Byte](1, 2, 3, 4)), - spanID = Identifier("4321", Array[Byte](4, 3, 2, 1)), - parentID = IdentityProvider.NoIdentifier - ) - - Context.create().withKey(Span.ContextKey, Span.Remote(spanContext)) - } - -}
\ No newline at end of file diff --git a/kamon-core-tests/src/test/scala/kamon/trace/B3SpanPropagationSpec.scala b/kamon-core-tests/src/test/scala/kamon/trace/B3SpanPropagationSpec.scala new file mode 100644 index 00000000..5b912b92 --- /dev/null +++ b/kamon-core-tests/src/test/scala/kamon/trace/B3SpanPropagationSpec.scala @@ -0,0 +1,232 @@ +/* + * ========================================================================================= + * Copyright © 2013-2017 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import kamon.context.{Context, HttpPropagation} +import kamon.testkit.SpanBuilding +import kamon.trace.IdentityProvider.Identifier +import kamon.trace.SpanContext.SamplingDecision +import org.scalatest.{Matchers, OptionValues, WordSpecLike} + +import scala.collection.mutable + + +class B3SpanPropagationSpec extends WordSpecLike with Matchers with OptionValues with SpanBuilding { + val b3Propagation = SpanPropagation.B3() + + "The B3 Span propagation for HTTP" should { + "write the Span data into headers" in { + val headersMap = mutable.Map.empty[String, String] + b3Propagation.write(testContext(), headerWriterFromMap(headersMap)) + + headersMap.get("X-B3-TraceId").value shouldBe "1234" + headersMap.get("X-B3-ParentSpanId").value shouldBe "2222" + headersMap.get("X-B3-SpanId").value shouldBe "4321" + headersMap.get("X-B3-Sampled").value shouldBe "1" + } + + "do not include the X-B3-ParentSpanId if there is no parent" in { + val headersMap = mutable.Map.empty[String, String] + b3Propagation.write(testContextWithoutParent(), headerWriterFromMap(headersMap)) + + headersMap.get("X-B3-TraceId").value shouldBe "1234" + headersMap.get("X-B3-ParentSpanId") shouldBe empty + headersMap.get("X-B3-SpanId").value shouldBe "4321" + headersMap.get("X-B3-Sampled").value shouldBe "1" + } + + "not inject anything if there is no Span in the Context" in { + val headersMap = mutable.Map.empty[String, String] + b3Propagation.write(Context.Empty, headerWriterFromMap(headersMap)) + headersMap.values shouldBe empty + } + + "extract a RemoteSpan from incoming headers when all fields are set" in { + val headersMap = Map( + "X-B3-TraceId" -> "1234", + "X-B3-ParentSpanId" -> "2222", + "X-B3-SpanId" -> "4321", + "X-B3-Sampled" -> "1", + "X-B3-Extra-Baggage" -> "some=baggage;more=baggage" + ) + + val spanContext = b3Propagation.read(headerReaderFromMap(headersMap), Context.Empty).get(Span.ContextKey).context() + spanContext.traceID.string shouldBe "1234" + spanContext.spanID.string shouldBe "4321" + spanContext.parentID.string shouldBe "2222" + spanContext.samplingDecision shouldBe SamplingDecision.Sample + } + + "decode the sampling decision based on the X-B3-Sampled header" in { + val sampledHeaders = Map( + "X-B3-TraceId" -> "1234", + "X-B3-SpanId" -> "4321", + "X-B3-Sampled" -> "1" + ) + + val notSampledHeaders = Map( + "X-B3-TraceId" -> "1234", + "X-B3-SpanId" -> "4321", + "X-B3-Sampled" -> "0" + ) + + val noSamplingHeaders = Map( + "X-B3-TraceId" -> "1234", + "X-B3-SpanId" -> "4321" + ) + + b3Propagation.read(headerReaderFromMap(sampledHeaders), Context.Empty) + .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.Sample + + b3Propagation.read(headerReaderFromMap(notSampledHeaders), Context.Empty) + .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.DoNotSample + + b3Propagation.read(headerReaderFromMap(noSamplingHeaders), Context.Empty) + .get(Span.ContextKey).context().samplingDecision shouldBe SamplingDecision.Unknown + } + + "not include the X-B3-Sampled header if the sampling decision is unknown" in { + val context = testContext() + val sampledSpanContext = context.get(Span.ContextKey).context() + val notSampledSpanContext = Context.Empty.withKey(Span.ContextKey, + Span.Remote(sampledSpanContext.copy(samplingDecision = SamplingDecision.DoNotSample))) + val unknownSamplingSpanContext = Context.Empty.withKey(Span.ContextKey, + Span.Remote(sampledSpanContext.copy(samplingDecision = SamplingDecision.Unknown))) + val headersMap = mutable.Map.empty[String, String] + + b3Propagation.write(context, headerWriterFromMap(headersMap)) + headersMap.get("X-B3-Sampled").value shouldBe("1") + headersMap.clear() + + b3Propagation.write(notSampledSpanContext, headerWriterFromMap(headersMap)) + headersMap.get("X-B3-Sampled").value shouldBe("0") + headersMap.clear() + + b3Propagation.write(unknownSamplingSpanContext, headerWriterFromMap(headersMap)) + headersMap.get("X-B3-Sampled") shouldBe empty + } + + "use the Debug flag to override the sampling decision, if provided." in { + val headers = Map( + "X-B3-TraceId" -> "1234", + "X-B3-SpanId" -> "4321", + "X-B3-Sampled" -> "0", + "X-B3-Flags" -> "1" + ) + + val spanContext = b3Propagation.read(headerReaderFromMap(headers), Context.Empty).get(Span.ContextKey).context() + spanContext.samplingDecision shouldBe SamplingDecision.Sample + } + + "use the Debug flag as sampling decision when Sampled is not provided" in { + val headers = Map( + "X-B3-TraceId" -> "1234", + "X-B3-SpanId" -> "4321", + "X-B3-Flags" -> "1" + ) + + val spanContext = b3Propagation.read(headerReaderFromMap(headers), Context.Empty).get(Span.ContextKey).context() + spanContext.samplingDecision shouldBe SamplingDecision.Sample + } + + "extract a minimal SpanContext from a TextMap containing only the Trace ID and Span ID" in { + val headers = Map( + "X-B3-TraceId" -> "1234", + "X-B3-SpanId" -> "4321" + ) + + val spanContext = b3Propagation.read(headerReaderFromMap(headers), Context.Empty).get(Span.ContextKey).context() + spanContext.traceID.string shouldBe "1234" + spanContext.spanID.string shouldBe "4321" + spanContext.parentID shouldBe IdentityProvider.NoIdentifier + spanContext.samplingDecision shouldBe SamplingDecision.Unknown + } + + "do not extract a SpanContext if Trace ID and Span ID are not provided" in { + val onlyTraceID = Map( + "X-B3-TraceId" -> "1234", + "X-B3-Sampled" -> "0", + "X-B3-Flags" -> "1" + ) + + val onlySpanID = Map( + "X-B3-SpanId" -> "1234", + "X-B3-Sampled" -> "0", + "X-B3-Flags" -> "1" + ) + + val noIds = Map( + "X-B3-Sampled" -> "0", + "X-B3-Flags" -> "1" + ) + + b3Propagation.read(headerReaderFromMap(onlyTraceID), Context.Empty).get(Span.ContextKey) shouldBe Span.Empty + b3Propagation.read(headerReaderFromMap(onlySpanID), Context.Empty).get(Span.ContextKey) shouldBe Span.Empty + b3Propagation.read(headerReaderFromMap(noIds), Context.Empty).get(Span.ContextKey) shouldBe Span.Empty + } + + "round trip a Span from TextMap -> Context -> TextMap" in { + val headers = Map( + "X-B3-TraceId" -> "1234", + "X-B3-SpanId" -> "4321", + "X-B3-ParentSpanId" -> "2222", + "X-B3-Sampled" -> "1" + ) + + val writenHeaders = mutable.Map.empty[String, String] + val context = b3Propagation.read(headerReaderFromMap(headers), Context.Empty) + b3Propagation.write(context, headerWriterFromMap(writenHeaders)) + writenHeaders should contain theSameElementsAs(headers) + } + } + + def headerReaderFromMap(map: Map[String, String]): HttpPropagation.HeaderReader = new HttpPropagation.HeaderReader { + override def read(header: String): Option[String] = { + if(map.get("fail").nonEmpty) + sys.error("failing on purpose") + + map.get(header) + } + + override def readAll(): Map[String, String] = map + } + + def headerWriterFromMap(map: mutable.Map[String, String]): HttpPropagation.HeaderWriter = new HttpPropagation.HeaderWriter { + override def write(header: String, value: String): Unit = map.put(header, value) + } + + def testContext(): Context = { + val spanContext = createSpanContext().copy( + traceID = Identifier("1234", Array[Byte](1, 2, 3, 4)), + spanID = Identifier("4321", Array[Byte](4, 3, 2, 1)), + parentID = Identifier("2222", Array[Byte](2, 2, 2, 2)) + ) + + Context.of(Span.ContextKey, Span.Remote(spanContext)) + } + + def testContextWithoutParent(): Context = { + val spanContext = createSpanContext().copy( + traceID = Identifier("1234", Array[Byte](1, 2, 3, 4)), + spanID = Identifier("4321", Array[Byte](4, 3, 2, 1)), + parentID = IdentityProvider.NoIdentifier + ) + + Context.of(Span.ContextKey, Span.Remote(spanContext)) + } + +}
\ No newline at end of file diff --git a/kamon-core/src/main/colfer/Context.colf b/kamon-core/src/main/colfer/Context.colf index f84d7a56..f5d9a80b 100644 --- a/kamon-core/src/main/colfer/Context.colf +++ b/kamon-core/src/main/colfer/Context.colf @@ -6,5 +6,6 @@ type Entry struct { } type Context struct { + tags []text entries []Entry }
\ No newline at end of file diff --git a/kamon-core/src/main/colfer/building.md b/kamon-core/src/main/colfer/building.md index f510a44f..8b663828 100644 --- a/kamon-core/src/main/colfer/building.md +++ b/kamon-core/src/main/colfer/building.md @@ -1,4 +1,4 @@ -Just download and install the colver compiler and run this command from the colfer folder: +Just download and install the colfer compiler and run this command from the colfer folder: ``` colfer -b ../java -p kamon/context/generated/binary java diff --git a/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java b/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java index a9917e99..4be6d630 100644 --- a/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java +++ b/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java @@ -12,6 +12,7 @@ import java.io.ObjectOutputStream; import java.io.ObjectStreamException; import java.io.OutputStream; import java.io.Serializable; +import java.nio.charset.StandardCharsets; import java.util.InputMismatchException; import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; @@ -35,6 +36,8 @@ public class Context implements Serializable { + public String[] tags; + public Entry[] entries; @@ -43,10 +46,12 @@ public class Context implements Serializable { init(); } + private static final String[] _zeroTags = new String[0]; private static final Entry[] _zeroEntries = new Entry[0]; /** Colfer zero values. */ private void init() { + tags = _zeroTags; entries = _zeroEntries; } @@ -142,6 +147,7 @@ public class Context implements Serializable { /** * Serializes the object. + * All {@code null} elements in {@link #tags} will be replaced with {@code ""}. * All {@code null} elements in {@link #entries} will be replaced with a {@code new} value. * @param out the data destination. * @param buf the initial buffer or {@code null}. @@ -171,6 +177,7 @@ public class Context implements Serializable { /** * Serializes the object. + * All {@code null} elements in {@link #tags} will be replaced with {@code ""}. * All {@code null} elements in {@link #entries} will be replaced with a {@code new} value. * @param buf the data destination. * @param offset the initial index for {@code buf}, inclusive. @@ -182,8 +189,72 @@ public class Context implements Serializable { int i = offset; try { - if (this.entries.length != 0) { + if (this.tags.length != 0) { buf[i++] = (byte) 0; + String[] a = this.tags; + + int x = a.length; + if (x > Context.colferListMax) + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Context.tags length %d exceeds %d elements", x, Context.colferListMax)); + while (x > 0x7f) { + buf[i++] = (byte) (x | 0x80); + x >>>= 7; + } + buf[i++] = (byte) x; + + for (int ai = 0; ai < a.length; ai++) { + String s = a[ai]; + if (s == null) { + s = ""; + a[ai] = s; + } + + int start = ++i; + + for (int sIndex = 0, sLength = s.length(); sIndex < sLength; sIndex++) { + char c = s.charAt(sIndex); + if (c < '\u0080') { + buf[i++] = (byte) c; + } else if (c < '\u0800') { + buf[i++] = (byte) (192 | c >>> 6); + buf[i++] = (byte) (128 | c & 63); + } else if (c < '\ud800' || c > '\udfff') { + buf[i++] = (byte) (224 | c >>> 12); + buf[i++] = (byte) (128 | c >>> 6 & 63); + buf[i++] = (byte) (128 | c & 63); + } else { + int cp = 0; + if (++sIndex < sLength) cp = Character.toCodePoint(c, s.charAt(sIndex)); + if ((cp >= 1 << 16) && (cp < 1 << 21)) { + buf[i++] = (byte) (240 | cp >>> 18); + buf[i++] = (byte) (128 | cp >>> 12 & 63); + buf[i++] = (byte) (128 | cp >>> 6 & 63); + buf[i++] = (byte) (128 | cp & 63); + } else + buf[i++] = (byte) '?'; + } + } + int size = i - start; + if (size > Context.colferSizeMax) + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Context.tags[%d] size %d exceeds %d UTF-8 bytes", ai, size, Context.colferSizeMax)); + + int ii = start - 1; + if (size > 0x7f) { + i++; + for (int y = size; y >= 1 << 14; y >>>= 7) i++; + System.arraycopy(buf, start, buf, i - size, size); + + do { + buf[ii++] = (byte) (size | 0x80); + size >>>= 7; + } while (size > 0x7f); + } + buf[ii] = (byte) size; + } + } + + if (this.entries.length != 0) { + buf[i++] = (byte) 1; Entry[] a = this.entries; int x = a.length; @@ -253,6 +324,35 @@ public class Context implements Serializable { if (shift == 28 || b >= 0) break; } if (length < 0 || length > Context.colferListMax) + throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Context.tags length %d exceeds %d elements", length, Context.colferListMax)); + + String[] a = new String[length]; + for (int ai = 0; ai < length; ai++) { + int size = 0; + for (int shift = 0; true; shift += 7) { + byte b = buf[i++]; + size |= (b & 0x7f) << shift; + if (shift == 28 || b >= 0) break; + } + if (size < 0 || size > Context.colferSizeMax) + throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Context.tags[%d] size %d exceeds %d UTF-8 bytes", ai, size, Context.colferSizeMax)); + + int start = i; + i += size; + a[ai] = new String(buf, start, size, StandardCharsets.UTF_8); + } + this.tags = a; + header = buf[i++]; + } + + if (header == (byte) 1) { + int length = 0; + for (int shift = 0; true; shift += 7) { + byte b = buf[i++]; + length |= (b & 0x7f) << shift; + if (shift == 28 || b >= 0) break; + } + if (length < 0 || length > Context.colferListMax) throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Context.entries length %d exceeds %d elements", length, Context.colferListMax)); Entry[] a = new Entry[length]; @@ -278,7 +378,7 @@ public class Context implements Serializable { } // {@link Serializable} version number. - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; // {@link Serializable} Colfer extension. private void writeObject(ObjectOutputStream out) throws IOException { @@ -312,6 +412,32 @@ public class Context implements Serializable { } /** + * Gets kamon/context/generated/binary/context.Context.tags. + * @return the value. + */ + public String[] getTags() { + return this.tags; + } + + /** + * Sets kamon/context/generated/binary/context.Context.tags. + * @param value the replacement. + */ + public void setTags(String[] value) { + this.tags = value; + } + + /** + * Sets kamon/context/generated/binary/context.Context.tags. + * @param value the replacement. + * @return {link this}. + */ + public Context withTags(String[] value) { + this.tags = value; + return this; + } + + /** * Gets kamon/context/generated/binary/context.Context.entries. * @return the value. */ @@ -340,6 +466,7 @@ public class Context implements Serializable { @Override public final int hashCode() { int h = 1; + for (String o : this.tags) h = 31 * h + (o == null ? 0 : o.hashCode()); for (Entry o : this.entries) h = 31 * h + (o == null ? 0 : o.hashCode()); return h; } @@ -353,6 +480,7 @@ public class Context implements Serializable { if (o == null) return false; if (o == this) return true; return o.getClass() == Context.class + && java.util.Arrays.equals(this.tags, o.tags) && java.util.Arrays.equals(this.entries, o.entries); } diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 60fa156d..a35225d4 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -150,45 +150,210 @@ kamon { } } + propagation { - context { + http { - # Codecs are used to encode/decode Context keys when a Context must be propagated either through HTTP headers or - # Binary transports. Only broadcast keys configured bellow will be processed by the context Codec. The FQCN of - # the appropriate Codecs for each key must be provided, otherwise keys will be ignored. - # - codecs { - - # Size of the encoding buffer for the Binary Codec. - binary-buffer-size = 256 - - # Declarative definition of broadcast context keys with type Option[String]. The setting key represents the actual - # key name and the value is the HTTP header name to be used to encode/decode the context key. The key name will - # be used when coding for binary transport. The most common use case for string keys is effortless propagation of - # correlation keys or request related data (locale, user ID, etc). E.g. if wanting to propagate a "X-Request-ID" - # header this config should suffice: + # Default HTTP propagation. Unless specified otherwise, all instrumentation will use the configuration on + # this section for HTTP context propagation. # - # kamon.context.codecs.string-keys { - # request-id = "X-Request-ID" - # } - # - # If the application must read this context key they can define key with a matching name and read the value from - # the context: - # val requestIDKey = Key.broadcastString("request-id") // Do this only once, keep a reference. - # val requestID = Kamon.currentContext().get(requestIDKey) - # - string-keys { + default { + + # Configures how context tags will be propagated over HTTP headers. + # + tags { + + # Header name used to encode context tags. + header-name = "context-tags" + + + # Provide explicit mappins between context tags and the HTTP headers that will carry them. When there is + # an explicit mapping for a tag, it will not be included in the default context header. For example, if + # you wanted to use the an HTTP header called `X-Correlation-ID` for a context tag with key `correlationID` + # you would need to include a the following configuration: + # + # mappings { + # correlationID = "X-Correlation-ID" + # } + # + # The correlationID tag would always be read and written from the `X-Correlation-ID` header. The context + # tag name is represented as the configuration key and the desired header name is represented by the + # cofiguration value. + # + mappings { + + } + } + # Configure which entries should be read from incoming HTTP requests and writen to outgoing HTTP requests. + # + entries { + + # Specify mappings between Context keys and the Propagation.EntryReader[HeaderReader] implementation in charge + # of reading them from the incoming HTTP request into the Context. + incoming { + span = "kamon.trace.SpanPropagation$B3" + } + + # Specify mappings betwen Context keys and the Propagation.EntryWriter[HeaderWriter] implementation in charge + # of writing them to outgoing HTTP requests. + outgoing { + span = "kamon.trace.SpanPropagation$B3" + } + } } + } + + binary { - # Codecs to be used when propagating a Context through a HTTP Headers transport. - http-headers-keys { - span = "kamon.trace.SpanCodec$B3" + # Default HTTP propagation. Unless specified otherwise, all instrumentation will use the configuration on + # this section for HTTP context propagation. + # + default { + + # Maximum outgoing Context size for binary transports. Contexts that surpass this limit will not be written to + # the outgoing medium. + max-outgoing-size = 2048 + + # Configure which entries should be read from incoming messages and writen to outgoing messages. + # + entries { + + # Specify mappings between Context keys and the Propagation.EntryReader[ByteStreamReader] implementation in + # charge of reading them from the incoming messages into the Context. + incoming { + span = "kamon.trace.SpanPropagation$Colfer" + } + + # Specify mappings betwen Context keys and the Propagation.EntryWriter[ByteStreamWriter] implementation in + # charge of writing them on the outgoing messages. + outgoing { + span = "kamon.trace.SpanPropagation$Colfer" + } + } } + } + } + + instrumentation { + http-server { + default { + + # + # Configuration for HTTP context propagation. + # + propagation { + + # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if + # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can + # be created and reported but will not be linked across boundaries nor take trace identifiers from tags). + enabled = yes + + # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default + # configuration for more details on how to configure the detault HTTP context propagation. + channel = "default" + } - # Codecs to be used when propagating a Context through a Binary transport. - binary-keys { - span = "kamon.trace.SpanCodec$Colfer" + + # + # Configuration for HTTP server metrics collection. + # + metrics { + + # Enables collection of HTTP server metrics. When enabled the following metrics will be collected, assuming + # that the instrumentation is fully compliant: + # + # - http.server.requets + # - http.server.request.active + # - http.server.request.size + # - http.server.response.size + # - http.server.connection.lifetime + # - http.server.connection.usage + # - http.server.connection.open + # + # All metrics have at least three tags: component, interface and port. Additionally, the http.server.requests + # metric will also have a status_code tag with the status code group (1xx, 2xx and so on). + # + enabled = yes + } + + + # + # Configuration for HTTP request tracing. + # + tracing { + + # Enables HTTP request tracing. When enabled the instrumentation will create Spans for incoming requests + # and finish them when the response is sent back to the clients. + enabled = yes + + # Select a context tag that provides a preferred trace identifier. The preferred trace identifier will be used + # only if all these conditions are met: + # - the context tag is present. + # - there is no parent Span on the incoming context (i.e. this is the first service on the trace). + # - the identifier is valid in accordance to the identity provider. + preferred-trace-id-tag = "none" + + # Enables collection of span metrics using the `span.processing-time` metric. + span-metrics = on + + # Select which tags should be included as span and span metric tags. The possible options are: + # - span: the tag is added as a Span tag (i.e. using span.tag(...)) + # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...)) + # - off: the tag is not used. + # + tags { + + # Use the http.url tag. + url = span + + # Use the http.method tag. + method = metric + + # Use the http.status_code tag. + status-code = metric + + # Copy tags from the context into the Spans with the specified purpouse. For example, to copy a customer_type + # tag from the context into the HTTP Server Span created by the instrumentation, the following configuration + # should be added: + # + # from-context { + # customer_type = span + # } + # + from-context { + + } + } + + # Custom mappings between routes and operation names. + operations { + + # Operation name for Spans created on requests that could not be handled by any route in the current + # application. + unhandled = "unhandled" + + # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode + # instrumentation is not able to provide a sensible operation name that is free of high cardinality values. + # For example, with the following configuration: + # mappings { + # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile" + # "/events/*/rsvps" = "EventRSVPs" + # } + # + # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have + # the same operation name "/organization/:orgID/user/:userID/profile". + # + # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation + # name "EventRSVPs". + # + # The patterns are expressed as globs and the operation names are free form. + # + mappings { + + } + } + } } } } diff --git a/kamon-core/src/main/scala/kamon/ClassLoading.scala b/kamon-core/src/main/scala/kamon/ClassLoading.scala new file mode 100644 index 00000000..5b097af1 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/ClassLoading.scala @@ -0,0 +1,26 @@ +package kamon + +import kamon.util.DynamicAccess + +import scala.collection.immutable +import scala.reflect.ClassTag +import scala.util.Try + +/** + * Utilities for creating instances from fully qualified class names. + */ +trait ClassLoading { + @volatile private var _dynamicAccessClassLoader = this.getClass.getClassLoader + @volatile private var _dynamicAccess = new DynamicAccess(_dynamicAccessClassLoader) + + def classLoader(): ClassLoader = + _dynamicAccessClassLoader + + def changeClassLoader(classLoader: ClassLoader): Unit = synchronized { + _dynamicAccessClassLoader = classLoader + _dynamicAccess = new DynamicAccess(_dynamicAccessClassLoader) + } + + def createInstance[T: ClassTag](fqcn: String, args: immutable.Seq[(Class[_], AnyRef)]): Try[T] = + _dynamicAccess.createInstanceFor(fqcn, args) +} diff --git a/kamon-core/src/main/scala/kamon/Configuration.scala b/kamon-core/src/main/scala/kamon/Configuration.scala new file mode 100644 index 00000000..bd286792 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/Configuration.scala @@ -0,0 +1,57 @@ +package kamon + +import scala.util.Try +import com.typesafe.config.{Config, ConfigFactory} +import org.slf4j.LoggerFactory + +trait Configuration { self: ClassLoading => + private val logger = LoggerFactory.getLogger(classOf[Configuration]) + private var _currentConfig: Config = ConfigFactory.load(self.classLoader()) + private var _onReconfigureHooks = Seq.empty[Configuration.OnReconfigureHook] + + + /** + * Retrieve Kamon's current configuration. + */ + def config(): Config = + _currentConfig + + /** + * Supply a new Config instance to rule Kamon's world. + */ + def reconfigure(newConfig: Config): Unit = synchronized { + _currentConfig = newConfig + _onReconfigureHooks.foreach(hook => { + Try(hook.onReconfigure(newConfig)).failed.foreach(error => + logger.error("Exception occurred while trying to run a OnReconfigureHook", error) + ) + }) + } + + /** + * Register a reconfigure hook that will be run when the a call to Kamon.reconfigure(config) is performed. All + * registered hooks will run sequentially in the same Thread that calls Kamon.reconfigure(config). + */ + def onReconfigure(hook: Configuration.OnReconfigureHook): Unit = synchronized { + _onReconfigureHooks = hook +: _onReconfigureHooks + } + + /** + * Register a reconfigure hook that will be run when the a call to Kamon.reconfigure(config) is performed. All + * registered hooks will run sequentially in the same Thread that calls Kamon.reconfigure(config). + */ + def onReconfigure(hook: (Config) => Unit): Unit = { + onReconfigure(new Configuration.OnReconfigureHook { + override def onReconfigure(newConfig: Config): Unit = hook.apply(newConfig) + }) + } + +} + +object Configuration { + + trait OnReconfigureHook { + def onReconfigure(newConfig: Config): Unit + } + +} diff --git a/kamon-core/src/main/scala/kamon/ContextPropagation.scala b/kamon-core/src/main/scala/kamon/ContextPropagation.scala new file mode 100644 index 00000000..8f2ed8e4 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/ContextPropagation.scala @@ -0,0 +1,92 @@ +package kamon + +import com.typesafe.config.Config +import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter} +import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter} +import kamon.context.{BinaryPropagation, HttpPropagation, Propagation} + +trait ContextPropagation { self: Configuration with ClassLoading => + @volatile private var _propagationComponents: ContextPropagation.Components = _ + @volatile private var _defaultHttpPropagation: Propagation[HeaderReader, HeaderWriter] = _ + @volatile private var _defaultBinaryPropagation: Propagation[ByteStreamReader, ByteStreamWriter] = _ + + // Initial configuration and reconfigures + init(self.config) + self.onReconfigure(newConfig => self.init(newConfig)) + + + /** + * Retrieves the HTTP propagation channel with the supplied name. Propagation channels are configured on the + * kamon.propagation.http configuration section. + * + * @param channelName Channel name to retrieve. + * @return The HTTP propagation, if available. + */ + def httpPropagation(channelName: String): Option[Propagation[HeaderReader, HeaderWriter]] = + _propagationComponents.httpChannels.get(channelName) + + /** + * Retrieves the binary propagation channel with the supplied name. Propagation channels are configured on the + * kamon.propagation.binary configuration section. + * + * @param channelName Channel name to retrieve. + * @return The binary propagation, if available. + */ + def binaryPropagation(channelName: String): Option[Propagation[ByteStreamReader, ByteStreamWriter]] = + _propagationComponents.binaryChannels.get(channelName) + + /** + * Retrieves the default HTTP propagation channel. Configuration for this channel can be found under the + * kamon.propagation.http.default configuration section. + * + * @return The default HTTP propagation. + */ + def defaultHttpPropagation(): Propagation[HeaderReader, HeaderWriter] = + _defaultHttpPropagation + + /** + * Retrieves the default binary propagation channel. Configuration for this channel can be found under the + * kamon.propagation.binary.default configuration section. + * + * @return The default HTTP propagation. + */ + def defaultBinaryPropagation(): Propagation[ByteStreamReader, ByteStreamWriter] = + _defaultBinaryPropagation + + + + private def init(config: Config): Unit = synchronized { + _propagationComponents = ContextPropagation.Components.from(self.config, self) + _defaultHttpPropagation = _propagationComponents.httpChannels(ContextPropagation.DefaultHttpChannel) + _defaultBinaryPropagation = _propagationComponents.binaryChannels(ContextPropagation.DefaultBinaryChannel) + } +} + +object ContextPropagation { + val DefaultHttpChannel = "default" + val DefaultBinaryChannel = "default" + + case class Components( + httpChannels: Map[String, Propagation[HeaderReader, HeaderWriter]], + binaryChannels: Map[String, Propagation[ByteStreamReader, ByteStreamWriter]] + ) + + object Components { + + def from(config: Config, classLoading: ClassLoading): Components = { + val propagationConfig = config.getConfig("kamon.propagation") + val httpChannelsConfig = propagationConfig.getConfig("http").configurations + val binaryChannelsConfig = propagationConfig.getConfig("binary").configurations + + val httpChannels = httpChannelsConfig.map { + case (channelName, channelConfig) => (channelName -> HttpPropagation.from(channelConfig, classLoading)) + } + + val binaryChannels = binaryChannelsConfig.map { + case (channelName, channelConfig) => (channelName -> BinaryPropagation.from(channelConfig, classLoading)) + } + + Components(httpChannels, binaryChannels) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/ContextStorage.scala b/kamon-core/src/main/scala/kamon/ContextStorage.scala new file mode 100644 index 00000000..47d0b567 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/ContextStorage.scala @@ -0,0 +1,49 @@ +package kamon + +import kamon.context.{Context, Storage} +import kamon.trace.Span + +import scala.util.control.NonFatal + +trait ContextStorage { + private val _contextStorage = Storage.ThreadLocal() + + def currentContext(): Context = + _contextStorage.current() + + def currentSpan(): Span = + _contextStorage.current().get(Span.ContextKey) + + def storeContext(context: Context): Storage.Scope = + _contextStorage.store(context) + + def withContext[T](context: Context)(f: => T): T = { + val scope = _contextStorage.store(context) + try { + f + } finally { + scope.close() + } + } + + def withContextKey[T, K](key: Context.Key[K], value: K)(f: => T): T = + withContext(currentContext().withKey(key, value))(f) + + def withSpan[T](span: Span)(f: => T): T = + withSpan(span, true)(f) + + def withSpan[T](span: Span, finishSpan: Boolean)(f: => T): T = { + try { + withContextKey(Span.ContextKey, span)(f) + } catch { + case NonFatal(t) => + span.addError(t.getMessage, t) + throw t + + } finally { + if(finishSpan) + span.finish() + } + } + +} diff --git a/kamon-core/src/main/scala/kamon/Environment.scala b/kamon-core/src/main/scala/kamon/Environment.scala index 1c00679d..2e07d18d 100644 --- a/kamon-core/src/main/scala/kamon/Environment.scala +++ b/kamon-core/src/main/scala/kamon/Environment.scala @@ -21,6 +21,9 @@ import java.util.concurrent.ThreadLocalRandom import com.typesafe.config.Config import kamon.util.HexCodec + + + case class Environment(host: String, service: String, instance: String, incarnation: String, tags: Map[String, String]) object Environment { @@ -47,6 +50,4 @@ object Environment { private def readValueOrGenerate(configuredValue: String, generator: => String): String = if(configuredValue == "auto") generator else configuredValue - - } diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 8f69ab41..a6cc7bf4 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -19,7 +19,6 @@ import java.time.Duration import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor} import com.typesafe.config.{Config, ConfigFactory} -import kamon.context.{Codecs, Context, Key, Storage} import kamon.metric._ import kamon.trace._ import kamon.util.{Clock, Filters, Matcher, Registration} @@ -29,7 +28,7 @@ import scala.concurrent.Future import scala.util.Try -object Kamon extends MetricLookup with ReporterRegistry with Tracer { +object Kamon extends MetricLookup with ClassLoading with Configuration with ReporterRegistry with Tracer with ContextPropagation with ContextStorage { private val logger = LoggerFactory.getLogger("kamon.Kamon") @volatile private var _config = ConfigFactory.load() @@ -41,39 +40,26 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { private val _metrics = new MetricRegistry(_config, _scheduler) private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config, _clock) private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config, _clock) - private val _contextStorage = Storage.ThreadLocal() - private val _contextCodec = new Codecs(_config) - private var _onReconfigureHooks = Seq.empty[OnReconfigureHook] + //private var _onReconfigureHooks = Seq.empty[OnReconfigureHook] sys.addShutdownHook(() => _scheduler.shutdown()) def environment: Environment = _environment - def config(): Config = - _config - - def reconfigure(config: Config): Unit = synchronized { + onReconfigure(newConfig => { _config = config _environment = Environment.fromConfig(config) _filters = Filters.fromConfig(config) _metrics.reconfigure(config) _reporterRegistry.reconfigure(config) _tracer.reconfigure(config) - _contextCodec.reconfigure(config) - - _onReconfigureHooks.foreach(hook => { - Try(hook.onReconfigure(config)).failed.foreach(error => - logger.error("Exception occurred while trying to run a OnReconfigureHook", error) - ) - }) _scheduler match { case stpe: ScheduledThreadPoolExecutor => stpe.setCorePoolSize(schedulerPoolSize(config)) case other => logger.error("Unexpected scheduler [{}] found when reconfiguring Kamon.", other) } - } - + }) override def histogram(name: String, unit: MeasurementUnit, dynamicRange: Option[DynamicRange]): HistogramMetric = _metrics.histogram(name, unit, dynamicRange) @@ -102,47 +88,6 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { override def identityProvider: IdentityProvider = _tracer.identityProvider - def contextCodec(): Codecs = - _contextCodec - - def currentContext(): Context = - _contextStorage.current() - - def currentSpan(): Span = - _contextStorage.current().get(Span.ContextKey) - - def storeContext(context: Context): Storage.Scope = - _contextStorage.store(context) - - def withContext[T](context: Context)(f: => T): T = { - val scope = _contextStorage.store(context) - try { - f - } finally { - scope.close() - } - } - - def withContextKey[T, K](key: Key[K], value: K)(f: => T): T = - withContext(currentContext().withKey(key, value))(f) - - def withSpan[T](span: Span)(f: => T): T = - withSpan(span, true)(f) - - def withSpan[T](span: Span, finishSpan: Boolean)(f: => T): T = { - try { - withContextKey(Span.ContextKey, span)(f) - } catch { - case t: Throwable => - span.addError(t.getMessage, t) - throw t - - } finally { - if(finishSpan) - span.finish() - } - } - override def loadReportersFromConfig(): Unit = _reporterRegistry.loadReportersFromConfig() @@ -173,14 +118,6 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { def clock(): Clock = _clock - /** - * Register a reconfigure hook that will be run when the a call to Kamon.reconfigure(config) is performed. All - * registered hooks will run sequentially in the same Thread that calls Kamon.reconfigure(config). - */ - def onReconfigure(hook: OnReconfigureHook): Unit = synchronized { - _onReconfigureHooks = hook +: _onReconfigureHooks - } - def scheduler(): ScheduledExecutorService = _scheduler @@ -188,7 +125,3 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { config.getInt("kamon.scheduler-pool-size") } - -trait OnReconfigureHook { - def onReconfigure(newConfig: Config): Unit -} diff --git a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala new file mode 100644 index 00000000..75e65c44 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala @@ -0,0 +1,301 @@ +/* ========================================================================================= + * Copyright © 2013-2018 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon +package context + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream} + +import com.typesafe.config.Config +import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter} +import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry} +import org.slf4j.LoggerFactory + +import scala.reflect.ClassTag +import scala.util.control.NonFatal +import scala.util.{Failure, Success, Try} + + +/** + * Context propagation that uses byte stream abstractions as the transport medium. The Binary propagation uses + * instances of [[ByteStreamReader]] and [[ByteStreamWriter]] to decode and encode Context instances, respectively. + * + * Binary propagation uses the [[ByteStreamReader]] and [[ByteStreamWriter]] abstraction which closely model the APIs + * from [[InputStream]] and [[OutputStream]], but without exposing additional functionality that wouldn't have any + * well defined behavior for Context propagation, e.g. flush or close functions on OutputStreams. + */ +object BinaryPropagation { + + /** + * Represents a readable stream of bytes. This interface closely resembles [[InputStream]], minus the functionality + * that wouldn't have a clearly defined behavior in the context of Context propagation. + */ + trait ByteStreamReader { + /** + * Number of available bytes on the ByteStream. + */ + def available(): Int + + /** + * Reads as many bytes as possible into the target byte array. + * + * @param target Target buffer in which the read bytes will be written. + * @return The number of bytes written into the target buffer. + */ + def read(target: Array[Byte]): Int + + /** + * Reads a specified number of bytes into the target buffer, starting from the offset position. + * + * @param target Target buffer in which read bytes will be written. + * @param offset Offset index in which to start writing bytes on the target buffer. + * @param count Number of bytes to be read. + * @return The number of bytes written into the target buffer. + */ + def read(target: Array[Byte], offset: Int, count: Int): Int + + /** + * Reads all available bytes into a newly created byte array. + * + * @return All bytes read. + */ + def readAll(): Array[Byte] + } + + + object ByteStreamReader { + + /** + * Creates a new [[ByteStreamReader]] that reads data from a byte array. + */ + def of(bytes: Array[Byte]): ByteStreamReader = new ByteArrayInputStream(bytes) with ByteStreamReader { + override def readAll(): Array[Byte] = { + val target = Array.ofDim[Byte](available()) + read(target, 0, available()) + target + } + } + } + + + /** + * Represents a writable stream of bytes. This interface closely resembles [[OutputStream]], minus the functionality + * that wouldn't have a clearly defined behavior in the context of Context propagation. + */ + trait ByteStreamWriter { + + /** + * Writes all bytes into the stream. + */ + def write(bytes: Array[Byte]): Unit + + /** + * Writes a portion of the provided bytes into the stream. + * + * @param bytes Buffer from which data will be selected. + * @param offset Starting index on the buffer. + * @param count Number of bytes to write into the stream. + */ + def write(bytes: Array[Byte], offset: Int, count: Int): Unit + + /** + * Write a single byte into the stream. + */ + def write(byte: Int): Unit + + } + + object ByteStreamWriter { + + /** + * Creates a new [[ByteStreamWriter]] from an OutputStream. + */ + def of(outputStream: OutputStream): ByteStreamWriter = new ByteStreamWriter { + override def write(bytes: Array[Byte]): Unit = + outputStream.write(bytes) + + override def write(bytes: Array[Byte], offset: Int, count: Int): Unit = + outputStream.write(bytes, offset, count) + + override def write(byte: Int): Unit = + outputStream.write(byte) + } + } + + + + /** + * Create a new default Binary Propagation instance from the provided configuration. + * + * @param config Binary Propagation channel configuration + * @return A newly constructed HttpPropagation instance. + */ + def from(config: Config, classLoading: ClassLoading): Propagation[ByteStreamReader, ByteStreamWriter] = { + new BinaryPropagation.Default(Settings.from(config, classLoading)) + } + + /** + * Default Binary propagation in Kamon. This implementation uses Colfer to read and write the context tags and + * entries. Entries are represented as simple pairs of entry name and bytes, which are then processed by the all + * configured entry readers and writers. + */ + class Default(settings: Settings) extends Propagation[ByteStreamReader, ByteStreamWriter] { + private val _log = LoggerFactory.getLogger(classOf[BinaryPropagation.Default]) + private val _streamPool = new ThreadLocal[Default.ReusableByteStreamWriter] { + override def initialValue(): Default.ReusableByteStreamWriter = new Default.ReusableByteStreamWriter(128) + } + + private val _contextBufferPool = new ThreadLocal[Array[Byte]] { + override def initialValue(): Array[Byte] = Array.ofDim[Byte](settings.maxOutgoingSize) + } + + override def read(reader: ByteStreamReader): Context = { + if(reader.available() > 0) { + val contextData = Try { + val cContext = new ColferContext() + cContext.unmarshal(reader.readAll(), 0) + cContext + } + + contextData.failed.foreach { + case NonFatal(t) => _log.warn("Failed to read Context from ByteStreamReader", t) + } + + contextData.map { colferContext => + + // Context tags + var tagSectionsCount = colferContext.tags.length + if (tagSectionsCount > 0 && tagSectionsCount % 2 != 0) { + _log.warn("Malformed Context tags found, tags consistency might be compromised") + tagSectionsCount -= 1 + } + + val tags = if (tagSectionsCount > 0) { + val tagsBuilder = Map.newBuilder[String, String] + var tagIndex = 0 + while (tagIndex < tagSectionsCount) { + tagsBuilder += (colferContext.tags(tagIndex) -> colferContext.tags(tagIndex + 1)) + tagIndex += 2 + } + tagsBuilder.result() + + } else Map.empty[String, String] + + + // Only reads the entries for which there is a registered reader + colferContext.entries.foldLeft(Context.of(tags)) { + case (context, entryData) => + settings.incomingEntries.get(entryData.name).map { entryReader => + var contextWithEntry = context + try { + contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.content), context) + } catch { + case NonFatal(t) => _log.warn("Failed to read entry [{}]", entryData.name.asInstanceOf[Any], t) + } + + contextWithEntry + }.getOrElse(context) + } + } getOrElse(Context.Empty) + } else Context.Empty + } + + override def write(context: Context, writer: ByteStreamWriter): Unit = { + if (context.nonEmpty()) { + val contextData = new ColferContext() + val output = _streamPool.get() + val contextOutgoingBuffer = _contextBufferPool.get() + + if (context.tags.nonEmpty) { + val tags = Array.ofDim[String](context.tags.size * 2) + var tagIndex = 0 + context.tags.foreach { + case (key, value) => + tags.update(tagIndex, key) + tags.update(tagIndex + 1, value) + tagIndex += 2 + } + + contextData.tags = tags + } + + if (context.entries.nonEmpty) { + val entries = settings.outgoingEntries.collect { + case (entryName, entryWriter) if context.entries.contains(entryName) => + val colferEntry = new ColferEntry() + try { + output.reset() + entryWriter.write(context, output) + + colferEntry.name = entryName + colferEntry.content = output.toByteArray() + } catch { + case NonFatal(t) => _log.warn("Failed to write entry [{}]", entryName.asInstanceOf[Any], t) + } + + colferEntry + } + + contextData.entries = entries.toArray + } + + try { + val contextSize = contextData.marshal(contextOutgoingBuffer, 0) + writer.write(contextOutgoingBuffer, 0, contextSize) + } catch { + case NonFatal(t) => _log.warn("Failed to write Context to ByteStreamWriter", t) + } + } + } + } + + object Default { + private class ReusableByteStreamWriter(size: Int) extends ByteArrayOutputStream(size) with ByteStreamWriter { + def underlying(): Array[Byte] = this.buf + } + } + + case class Settings( + maxOutgoingSize: Int, + incomingEntries: Map[String, Propagation.EntryReader[ByteStreamReader]], + outgoingEntries: Map[String, Propagation.EntryWriter[ByteStreamWriter]] + ) + + object Settings { + private val log = LoggerFactory.getLogger(classOf[BinaryPropagation.Settings]) + + def from(config: Config, classLoading: ClassLoading): BinaryPropagation.Settings = { + def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = { + val instanceMap = Map.newBuilder[String, ExpectedType] + + mappings.foreach { + case (contextKey, componentClass) => classLoading.createInstance[ExpectedType](componentClass, Nil) match { + case Success(componentInstance) => instanceMap += (contextKey -> componentInstance) + case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to []", + implicitly[ClassTag[ExpectedType]].runtimeClass.getName, componentClass, exception) + } + } + + instanceMap.result() + } + + Settings( + config.getBytes("max-outgoing-size").toInt, + buildInstances[Propagation.EntryReader[ByteStreamReader]](config.getConfig("entries.incoming").pairs), + buildInstances[Propagation.EntryWriter[ByteStreamWriter]](config.getConfig("entries.outgoing").pairs) + ) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/context/Codecs.scala b/kamon-core/src/main/scala/kamon/context/Codecs.scala deleted file mode 100644 index c5d237a9..00000000 --- a/kamon-core/src/main/scala/kamon/context/Codecs.scala +++ /dev/null @@ -1,295 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon -package context - -import java.nio.ByteBuffer - -import com.typesafe.config.Config -import kamon.util.DynamicAccess -import org.slf4j.LoggerFactory -import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry} - -import scala.collection.mutable - -class Codecs(initialConfig: Config) { - private val log = LoggerFactory.getLogger(classOf[Codecs]) - @volatile private var httpHeaders: Codecs.ForContext[TextMap] = new Codecs.HttpHeaders(Map.empty) - @volatile private var binary: Codecs.ForContext[ByteBuffer] = new Codecs.Binary(256, Map.empty) - - reconfigure(initialConfig) - - - def HttpHeaders: Codecs.ForContext[TextMap] = - httpHeaders - - def Binary: Codecs.ForContext[ByteBuffer] = - binary - - def reconfigure(config: Config): Unit = { - import scala.collection.JavaConverters._ - try { - val codecsConfig = config.getConfig("kamon.context.codecs") - val stringKeys = readStringKeysConfig(codecsConfig.getConfig("string-keys")) - val knownHttpHeaderCodecs = readEntryCodecs[TextMap]("http-headers-keys", codecsConfig) ++ stringHeaderCodecs(stringKeys) - val knownBinaryCodecs = readEntryCodecs[ByteBuffer]("binary-keys", codecsConfig) ++ stringBinaryCodecs(stringKeys) - - httpHeaders = new Codecs.HttpHeaders(knownHttpHeaderCodecs) - binary = new Codecs.Binary(codecsConfig.getBytes("binary-buffer-size"), knownBinaryCodecs) - } catch { - case t: Throwable => log.error("Failed to initialize Context Codecs", t) - } - } - - private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codecs.ForEntry[T]] = { - val rootConfig = config.getConfig(rootKey) - val dynamic = new DynamicAccess(getClass.getClassLoader) - val entries = Map.newBuilder[String, Codecs.ForEntry[T]] - - rootConfig.topLevelKeys.foreach(key => { - try { - val fqcn = rootConfig.getString(key) - entries += ((key, dynamic.createInstanceFor[Codecs.ForEntry[T]](fqcn, Nil).get)) - } catch { - case e: Throwable => - log.error(s"Failed to initialize codec for key [$key]", e) - } - }) - - entries.result() - } - - private def readStringKeysConfig(config: Config): Map[String, String] = - config.topLevelKeys.map(key => (key, config.getString(key))).toMap - - private def stringHeaderCodecs(keys: Map[String, String]): Map[String, Codecs.ForEntry[TextMap]] = - keys.map { case (key, header) => (key, new Codecs.StringHeadersCodec(key, header)) } - - private def stringBinaryCodecs(keys: Map[String, String]): Map[String, Codecs.ForEntry[ByteBuffer]] = - keys.map { case (key, _) => (key, new Codecs.StringBinaryCodec(key)) } -} - -object Codecs { - - trait ForContext[T] { - def encode(context: Context): T - def decode(carrier: T): Context - } - - trait ForEntry[T] { - def encode(context: Context): T - def decode(carrier: T, context: Context): Context - } - - final class HttpHeaders(entryCodecs: Map[String, Codecs.ForEntry[TextMap]]) extends Codecs.ForContext[TextMap] { - private val log = LoggerFactory.getLogger(classOf[HttpHeaders]) - - override def encode(context: Context): TextMap = { - val encoded = TextMap.Default() - - context.entries.foreach { - case (key, _) if key.broadcast => - entryCodecs.get(key.name) match { - case Some(codec) => - try { - codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2)) - } catch { - case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e) - } - - case None => - log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name) - } - - case _ => // All non-broadcast keys should be ignored. - } - - encoded - } - - override def decode(carrier: TextMap): Context = { - var context: Context = Context.Empty - - try { - context = entryCodecs.foldLeft(context)((ctx, codecEntry) => { - val (_, codec) = codecEntry - codec.decode(carrier, ctx) - }) - - } catch { - case e: Throwable => - log.error("Failed to decode context from HttpHeaders", e) - } - - context - } - } - - - final class Binary(bufferSize: Long, entryCodecs: Map[String, Codecs.ForEntry[ByteBuffer]]) extends Codecs.ForContext[ByteBuffer] { - private val log = LoggerFactory.getLogger(classOf[Binary]) - private val binaryBuffer = newThreadLocalBuffer(bufferSize) - private val emptyBuffer = ByteBuffer.allocate(0) - - override def encode(context: Context): ByteBuffer = { - val entries = context.entries - if(entries.isEmpty) - emptyBuffer - else { - var colferEntries: List[ColferEntry] = Nil - entries.foreach { - case (key, _) if key.broadcast => - entryCodecs.get(key.name) match { - case Some(entryCodec) => - try { - val entryData = entryCodec.encode(context) - if(entryData.capacity() > 0) { - val colferEntry = new ColferEntry() - colferEntry.setName(key.name) - colferEntry.setContent(entryData.array()) - colferEntries = colferEntry :: colferEntries - } - } catch { - case throwable: Throwable => - log.error(s"Failed to encode broadcast context key [${key.name}]", throwable) - } - - case None => - log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name) - } - - case _ => // All non-broadcast keys should be ignored. - } - - if(colferEntries.isEmpty) - emptyBuffer - else { - val buffer = binaryBuffer.get() - val colferContext = new ColferContext() - colferContext.setEntries(colferEntries.toArray) - val marshalledSize = colferContext.marshal(buffer, 0) - - val data = Array.ofDim[Byte](marshalledSize) - System.arraycopy(buffer, 0, data, 0, marshalledSize) - ByteBuffer.wrap(data) - } - } - } - - - override def decode(carrier: ByteBuffer): Context = { - if(carrier.capacity() == 0) - Context.Empty - else { - var context: Context = Context.Empty - - try { - val colferContext = new ColferContext() - colferContext.unmarshal(carrier.array(), 0) - - colferContext.entries.foreach(colferEntry => { - entryCodecs.get(colferEntry.getName()) match { - case Some(entryCodec) => - context = entryCodec.decode(ByteBuffer.wrap(colferEntry.content), context) - - case None => - log.error("Failed to decode entry [{}] with Binary context codec. No entry found for the key.", colferEntry.getName()) - } - }) - - } catch { - case e: Throwable => - log.error("Failed to decode context from Binary", e) - } - - context - } - } - - - private def newThreadLocalBuffer(size: Long): ThreadLocal[Array[Byte]] = new ThreadLocal[Array[Byte]] { - override def initialValue(): Array[Byte] = Array.ofDim[Byte](size.toInt) - } - } - - private class StringHeadersCodec(key: String, headerName: String) extends Codecs.ForEntry[TextMap] { - private val contextKey = Key.broadcast[Option[String]](key, None) - - override def encode(context: Context): TextMap = { - val textMap = TextMap.Default() - context.get(contextKey).foreach { value => - textMap.put(headerName, value) - } - - textMap - } - - override def decode(carrier: TextMap, context: Context): Context = { - carrier.get(headerName) match { - case value @ Some(_) => context.withKey(contextKey, value) - case None => context - } - } - } - - private class StringBinaryCodec(key: String) extends Codecs.ForEntry[ByteBuffer] { - val emptyBuffer: ByteBuffer = ByteBuffer.allocate(0) - private val contextKey = Key.broadcast[Option[String]](key, None) - - override def encode(context: Context): ByteBuffer = { - context.get(contextKey) match { - case Some(value) => ByteBuffer.wrap(value.getBytes) - case None => emptyBuffer - } - } - - override def decode(carrier: ByteBuffer, context: Context): Context = { - context.withKey(contextKey, Some(new String(carrier.array()))) - } - } -} - - -trait TextMap { - - def get(key: String): Option[String] - - def put(key: String, value: String): Unit - - def values: Iterator[(String, String)] -} - -object TextMap { - - class Default extends TextMap { - private val storage = - mutable.Map.empty[String, String] - - override def get(key: String): Option[String] = - storage.get(key) - - override def put(key: String, value: String): Unit = - storage.put(key, value) - - override def values: Iterator[(String, String)] = - storage.toIterator - } - - object Default { - def apply(): Default = - new Default() - } -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala index e0b084cb..2a7a382e 100644 --- a/kamon-core/src/main/scala/kamon/context/Context.scala +++ b/kamon-core/src/main/scala/kamon/context/Context.scala @@ -1,5 +1,5 @@ /* ========================================================================================= - * Copyright © 2013-2017 the kamon project <http://kamon.io/> + * Copyright © 2013-2018 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -13,90 +13,88 @@ * ========================================================================================= */ -package kamon.context +package kamon +package context -import java.io._ -import java.nio.ByteBuffer +import java.util.{Map => JavaMap} +import scala.collection.JavaConverters._ -import kamon.Kamon +class Context private (val entries: Map[String, Any], val tags: Map[String, String]) { -class Context private (private[context] val entries: Map[Key[_], Any]) extends scala.Serializable { - def get[T](key: Key[T]): T = - entries.getOrElse(key, key.emptyValue).asInstanceOf[T] + def get[T](key: Context.Key[T]): T = + entries.getOrElse(key.name, key.emptyValue).asInstanceOf[T] - def withKey[T](key: Key[T], value: T): Context = - new Context(entries.updated(key, value)) + def getTag(tagKey: String): Option[String] = + tags.get(tagKey) - var _deserializedEntries: Map[Key[_], Any] = Map.empty + def withKey[T](key: Context.Key[T], value: T): Context = + new Context(entries.updated(key.name, value), tags) - @throws[IOException] - private def writeObject(out: ObjectOutputStream): Unit = out.write( - Kamon.contextCodec().Binary.encode(this).array() - ) + def withTag(tagKey: String, tagValue: String): Context = + new Context(entries, tags.updated(tagKey, tagValue)) - @throws[IOException] - @throws[ClassNotFoundException] - private def readObject(in: ObjectInputStream): Unit = { - val buf = new Array[Byte](in.available()) - in.readFully(buf) - _deserializedEntries = Kamon.contextCodec().Binary.decode(ByteBuffer.wrap(buf)).entries - } + def withTags(tags: Map[String, String]): Context = + new Context(entries, this.tags ++ tags) - def readResolve(): AnyRef = new Context(_deserializedEntries) + def withTags(tags: JavaMap[String, String]): Context = + new Context(entries, this.tags ++ tags.asScala.toMap) - override def equals(obj: scala.Any): Boolean = { - obj != null && - obj.isInstanceOf[Context] && - obj.asInstanceOf[Context].entries != null && - obj.asInstanceOf[Context].entries == this.entries - } + def isEmpty(): Boolean = + entries.isEmpty && tags.isEmpty - override def hashCode(): Int = entries.hashCode() + def nonEmpty(): Boolean = + !isEmpty() } object Context { - val Empty = new Context(Map.empty) + val Empty = new Context(Map.empty, Map.empty) - def apply(): Context = - Empty + def of(tags: JavaMap[String, String]): Context = + new Context(Map.empty, tags.asScala.toMap) - def create(): Context = - Empty + def of(tags: Map[String, String]): Context = + new Context(Map.empty, tags) - def apply[T](key: Key[T], value: T): Context = - new Context(Map(key -> value)) + def of[T](key: Context.Key[T], value: T): Context = + new Context(Map(key.name -> value), Map.empty) - def create[T](key: Key[T], value: T): Context = - apply(key, value) - -} - - -sealed abstract class Key[T] { - def name: String - def emptyValue: T - def broadcast: Boolean -} + def of[T](key: Context.Key[T], value: T, tags: JavaMap[String, String]): Context = + new Context(Map(key.name -> value), tags.asScala.toMap) -object Key { + def of[T](key: Context.Key[T], value: T, tags: Map[String, String]): Context = + new Context(Map(key.name -> value), tags) - def local[T](name: String, emptyValue: T): Key[T] = - new Default[T](name, emptyValue, false) + def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U): Context = + new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), Map.empty) - def broadcast[T](name: String, emptyValue: T): Key[T] = - new Default[T](name, emptyValue, true) + def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: JavaMap[String, String]): Context = + new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), tags.asScala.toMap) - def broadcastString(name: String): Key[Option[String]] = - new Default[Option[String]](name, None, true) + def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: Map[String, String]): Context = + new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), tags) + def key[T](name: String, emptyValue: T): Context.Key[T] = + new Context.Key(name, emptyValue) - private class Default[T](val name: String, val emptyValue: T, val broadcast: Boolean) extends Key[T] { + /** + * Encapsulates the type, name and empty value for a context entry. All reads and writes from a context instance + * must be done using a context key, which will ensure the right type is used on both operations. The key's name + * is used when configuring mappings and incoming/outgoing/returning codecs for context propagation across channels. + * + * If you try to read an entry from a context and such entry is not present, the empty value for the key is returned + * instead. + * + * @param name Key name. Must be unique. + * @param emptyValue Value to be returned when reading from a context that doesn't have an entry with this key. + * @tparam ValueType Type of the value to be held on the context with this key. + */ + final class Key[ValueType](val name: String, val emptyValue: ValueType) { override def hashCode(): Int = name.hashCode override def equals(that: Any): Boolean = - that.isInstanceOf[Default[_]] && that.asInstanceOf[Default[_]].name == this.name + that.isInstanceOf[Context.Key[_]] && that.asInstanceOf[Context.Key[_]].name == this.name } }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala new file mode 100644 index 00000000..6a15e2a6 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala @@ -0,0 +1,206 @@ +/* ========================================================================================= + * Copyright © 2013-2018 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon +package context + +import com.typesafe.config.Config +import org.slf4j.LoggerFactory + +import scala.reflect.ClassTag +import scala.util.control.NonFatal +import scala.util.{Failure, Success} + +/** + * Context propagation that uses HTTP headers as the transport medium. HTTP propagation mechanisms read any number of + * HTTP headers from incoming HTTP requests to decode a Context instance and write any number of HTTP headers on + * outgoing requests to transfer a context to remote services. + */ +object HttpPropagation { + + /** + * Wrapper that reads HTTP headers from a HTTP message. + */ + trait HeaderReader { + + /** + * Reads a single HTTP header value. + * + * @param header HTTP header name + * @return The HTTP header value, if present. + */ + def read(header: String): Option[String] + + /** + * Reads all HTTP headers present in the wrapped HTTP message. + * + * @return A map from header name to + */ + def readAll(): Map[String, String] + } + + /** + * Wrapper that writes HTTP headers to a HTTP message. + */ + trait HeaderWriter { + + /** + * Writes a HTTP header into a HTTP message. + * + * @param header HTTP header name. + * @param value HTTP header value. + */ + def write(header: String, value: String): Unit + } + + + + /** + * Create a new default HTTP propagation instance from the provided configuration. + * + * @param config HTTP propagation channel configuration + * @return A newly constructed HttpPropagation instance. + */ + def from(config: Config, classLoading: ClassLoading): Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] = { + new HttpPropagation.Default(Settings.from(config, classLoading)) + } + + /** + * Default HTTP Propagation in Kamon. + */ + class Default(settings: Settings) extends Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] { + private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Default]) + + /** + * Reads context tags and entries on the following order: + * 1. Read all context tags from the context tags header. + * 2. Read all context tags with explicit mappings. This overrides any tag + * from the previous step in case of a tag key clash. + * 3. Read all context entries using the incoming entries configuration. + */ + override def read(reader: HeaderReader): Context = { + val tags = Map.newBuilder[String, String] + + // Tags encoded together in the context tags header. + try { + reader.read(settings.tagsHeaderName).foreach { contextTagsHeader => + contextTagsHeader.split(";").foreach(tagData => { + val tagPair = tagData.split("=") + if (tagPair.length == 2) { + tags += (tagPair(0) -> tagPair(1)) + } + }) + } + } catch { + case NonFatal(t) => log.warn("Failed to read the context tags header", t.asInstanceOf[Any]) + } + + // Tags explicitly mapped on the tags.mappings configuration. + settings.tagsMappings.foreach { + case (tagName, httpHeader) => + try { + reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue)) + } catch { + case NonFatal(t) => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any]) + } + } + + // Incoming Entries + settings.incomingEntries.foldLeft(Context.of(tags.result())) { + case (context, (entryName, entryDecoder)) => + var result = context + try { + result = entryDecoder.read(reader, context) + } catch { + case NonFatal(t) => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any]) + } + + result + } + } + + /** + * Writes context tags and entries + */ + override def write(context: Context, writer: HeaderWriter): Unit = { + val contextTagsHeader = new StringBuilder() + def appendTag(key: String, value: String): Unit = { + contextTagsHeader + .append(key) + .append('=') + .append(value) + .append(';') + } + + // Write tags with specific mappings or append them to the context tags header. + context.tags.foreach { + case (tagKey, tagValue) => settings.tagsMappings.get(tagKey) match { + case Some(mappedHeader) => writer.write(mappedHeader, tagValue) + case None => appendTag(tagKey, tagValue) + } + } + + // Write the context tags header. + if(contextTagsHeader.nonEmpty) { + writer.write(settings.tagsHeaderName, contextTagsHeader.result()) + } + + // Write entries for the specified direction. + settings.outgoingEntries.foreach { + case (entryName, entryWriter) => + try { + entryWriter.write(context, writer) + } catch { + case NonFatal(t) => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any]) + } + } + } + } + + case class Settings( + tagsHeaderName: String, + tagsMappings: Map[String, String], + incomingEntries: Map[String, Propagation.EntryReader[HeaderReader]], + outgoingEntries: Map[String, Propagation.EntryWriter[HeaderWriter]] + ) + + object Settings { + private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Settings]) + + def from(config: Config, classLoading: ClassLoading): Settings = { + def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = { + val instanceMap = Map.newBuilder[String, ExpectedType] + + mappings.foreach { + case (contextKey, componentClass) => classLoading.createInstance[ExpectedType](componentClass, Nil) match { + case Success(componentInstance) => instanceMap += (contextKey -> componentInstance) + case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to []", + implicitly[ClassTag[ExpectedType]].runtimeClass.getName, componentClass, exception) + } + } + + instanceMap.result() + } + + val tagsHeaderName = config.getString("tags.header-name") + val tagsMappings = config.getConfig("tags.mappings").pairs + val incomingEntries = buildInstances[Propagation.EntryReader[HeaderReader]](config.getConfig("entries.incoming").pairs) + val outgoingEntries = buildInstances[Propagation.EntryWriter[HeaderWriter]](config.getConfig("entries.outgoing").pairs) + + Settings(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries) + } + } + +} diff --git a/kamon-core/src/main/scala/kamon/context/Mixin.scala b/kamon-core/src/main/scala/kamon/context/Mixin.scala deleted file mode 100644 index 3445cc31..00000000 --- a/kamon-core/src/main/scala/kamon/context/Mixin.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.context - -import kamon.Kamon - - -/** - * Utility trait that marks objects carrying a reference to a Context instance. - * - */ -trait HasContext { - def context: Context -} - -object HasContext { - private case class Default(context: Context) extends HasContext - - /** - * Construct a HasSpan instance that references the provided Context. - * - */ - def from(context: Context): HasContext = - Default(context) - - /** - * Construct a HasContext instance with the current Kamon from Kamon's default context storage. - * - */ - def fromCurrentContext(): HasContext = - Default(Kamon.currentContext()) -} diff --git a/kamon-core/src/main/scala/kamon/context/Propagation.scala b/kamon-core/src/main/scala/kamon/context/Propagation.scala new file mode 100644 index 00000000..1d973ca9 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/Propagation.scala @@ -0,0 +1,81 @@ +/* ========================================================================================= + * Copyright © 2013-2018 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.context + +/** + * Inter-process Context propagation interface. Implementations are responsible from reading and writing a lossless + * representation of a given Context instance to the appropriate mediums. Out of the box, Kamon ships with two + * implementations: + * + * * HttpPropagation: Uses HTTP headers as the medium to transport the Context data. + * * BinaryPropagation: Uses byte streams as the medium to transport the Context data. + * + */ +trait Propagation[ReaderMedium, WriterMedium] { + + /** + * Attempts to read a Context from the [[ReaderMedium]]. + * + * @param medium An abstraction the reads data from the medium transporting the Context data. + * @return The decoded Context instance or an empty Context if no entries or tags could be read from the medium. + */ + def read(medium: ReaderMedium): Context + + /** + * Attempts to write a Context instance to the [[WriterMedium]]. + * + * @param context Context instance to be written. + * @param medium An abstraction that writes data into the medium that will transport the Context data. + */ + def write(context: Context, medium: WriterMedium): Unit + +} + +object Propagation { + + /** + * Encapsulates logic required to read a single context entry from a medium. Implementations of this trait + * must be aware of the entry they are able to read. + */ + trait EntryReader[Medium] { + + /** + * Tries to read a context entry from the medium. If a context entry is successfully read, implementations + * must return an updated context instance that includes such entry. If no entry could be read simply return the + * context instance that was passed in, unchanged. + * + * @param medium An abstraction the reads data from the medium transporting the Context data. + * @param context Current context. + * @return Either the original context passed in or a modified version of it, including the read entry. + */ + def read(medium: Medium, context: Context): Context + } + + /** + * Encapsulates logic required to write a single context entry to the medium. Implementations of this trait + * must be aware of the entry they are able to write. + */ + trait EntryWriter[Medium] { + + /** + * Tries to write a context entry into the medium. + * + * @param context The context from which entries should be written. + * @param medium An abstraction that writes data into the medium that will transport the Context data. + */ + def write(context: Context, medium: Medium): Unit + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala new file mode 100644 index 00000000..b141331b --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala @@ -0,0 +1,66 @@ +package kamon.instrumentation + +import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter} + +/** + * Base abstractions over HTTP messages. + */ +object HttpMessage { + + /** + * Wrapper for HTTP Request messages. + */ + trait Request extends HeaderReader { + + /** + * Request URL. + */ + def url: String + + /** + * Full request path. Does not include the query. + */ + def path: String + + /** + * HTTP Method. + */ + def method: String + } + + /** + * Wrapper for HTTP response messages. + */ + trait Response { + + /** + * Status code on the response message. + */ + def statusCode: Int + } + + /** + * A HTTP message builder on which header values can be written and a complete HTTP message can be build from. + * Implementations will typically wrap a HTTP message model from an instrumented framework and either accumulate + * all header writes until a call to to .build() is made and a new HTTP message is constructed merging the previous + * and accumulated headers (on immutable HTTP models) or directly write the headers on the underlying HTTP message + * (on mutable HTTP models). + */ + trait Builder[Message] extends HeaderWriter { + + /** + * Returns a version a version of the HTTP message container all headers that have been written to the builder. + */ + def build(): Message + } + + /** + * Builder for HTTP Request messages. + */ + trait RequestBuilder[Message] extends Request with Builder[Message] + + /** + * Builder for HTTP Response messages. + */ + trait ResponseBuilder[Message] extends Response with Builder[Message] +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala new file mode 100644 index 00000000..72828424 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala @@ -0,0 +1,453 @@ +package kamon +package instrumentation + +import java.time.Duration + +import com.typesafe.config.Config +import kamon.context.Context +import kamon.instrumentation.HttpServer.Settings.TagMode +import kamon.metric.MeasurementUnit.{time, information} +import kamon.trace.{IdentityProvider, Span} +import kamon.util.GlobPathFilter +import org.slf4j.LoggerFactory + +import scala.collection.JavaConverters._ + +/** + * HTTP Server instrumentation handler that takes care of context propagation, distributed tracing and HTTP server + * metrics. Instances can be created by using the [[HttpServer.from]] method with the desired configuration name. All + * configuration for the default HTTP instrumentation is at "kamon.instrumentation.http-server.default". + * + * The default implementation shipping with Kamon provides: + * + * - Context Propagation: Incoming and Returning context propagation as well as incoming context tags. Context + * propagation is further used to enable distributed tracing on top of any instrumented HTTP Server. + * - Distributed Tracing: Automatically join traces initiated by the callers of this service and apply span and metric + * tags from the incoming requests as well as form the incoming context tags. + * - Server Metrics: Basic request processing metrics to understand connection usage, throughput and response code + * counts in the HTTP server. + * + */ +trait HttpServer { + + /** + * Initiate handling of a HTTP request received by this server. The returned RequestHandler contains the Span that + * represents the processing of the incoming HTTP request (if tracing is enabled) and the Context extracted from + * HTTP headers (if context propagation is enabled). + * + * Callers of this method **must** always ensure that the doneReceiving, send and doneSending callbacks are invoked + * for all incoming requests. + * + * @param request A HttpRequest wrapper on the original incoming HTTP request. + * @return The RequestHandler that will follow the lifecycle of the incoming request. + */ + def receive(request: HttpMessage.Request): HttpServer.RequestHandler + + + /** + * Signals that a new HTTP connection has been opened. + */ + def openConnection(): Unit + + + /** + * Signals that a HTTP connection has been closed. If the connection lifetime or the number of handled requests + * cannot be determined the the values [[Duration.ZERO]] and zero can be provided, respectively. No metrics will + * be updated when the values are zero. + * + * @param lifetime For how long did the connection remain open. + * @param handledRequests How many requests where handled by the closed connection. + */ + def closeConnection(lifetime: Duration, handledRequests: Long): Unit + + /** + * Frees resources that might have been acquired to provide the instrumentation. Behavior on HttpServer instances + * after calling this function is undefined. + */ + def shutdown(): Unit + +} + +object HttpServer { + + /** + * Handler associated to the processing of a single request. The instrumentation code using this class is responsible + * of creating a dedicated [[HttpServer.RequestHandler]] instance for each received request should invoking the + * doneReceiving, send and doneSending callbacks when appropriate. + */ + trait RequestHandler { + + /** + * If context propagation is enabled this function returns the incoming context associated wih this request, + * otherwise [[Context.Empty]] is returned. + */ + def context: Context + + /** + * Span representing the current HTTP server operation. If tracing is disabled this will return an empty span. + */ + def span: Span + + /** + * Signals that the entire request (headers and body) has been received. + * + * @param receivedBytes Size of the entire HTTP request. + */ + def doneReceiving(receivedBytes: Long): Unit + + /** + * Process a response to be sent back to the client. Since returning keys might need to included in the response + * headers, users of this class must ensure that the returned HttpResponse is used instead of the original one + * passed into this function. + * + * @param response Wraps the HTTP response to be sent back to the client. + * @param context Context that should be used for writing returning keys into the response. + * @return The modified HTTP response that should be sent to clients. + */ + def send[HttpResponse](response: HttpMessage.ResponseBuilder[HttpResponse], context: Context): HttpResponse + + /** + * Signals that the entire response (headers and body) has been sent to the client. + * + * @param sentBytes Size of the entire HTTP response. + */ + def doneSending(sentBytes: Long): Unit + + } + + + /** + * Holds all metric instruments required to record metrics from an HTTP server. + * + * @param interface Interface name or address where the HTTP server is listening. + * @param port Port number where the HTTP server is listening. + */ + class Metrics(component: String, interface: String, port: Int) { + import Metrics._ + private val _log = LoggerFactory.getLogger(classOf[HttpServer.Metrics]) + private val _statusCodeTag = "status_code" + private val _serverTags = Map( + "component" -> component, + "interface" -> interface, + "port" -> port.toString + ) + + val requestsInformational = CompletedRequests.refine(statusCodeTag("1xx")) + val requestsSuccessful = CompletedRequests.refine(statusCodeTag("2xx")) + val requestsRedirection = CompletedRequests.refine(statusCodeTag("3xx")) + val requestsClientError = CompletedRequests.refine(statusCodeTag("4xx")) + val requestsServerError = CompletedRequests.refine(statusCodeTag("5xx")) + + val activeRequests = ActiveRequests.refine(_serverTags) + val requestSize = RequestSize.refine(_serverTags) + val responseSize = ResponseSize.refine(_serverTags) + val connectionLifetime = ConnectionLifetime.refine(_serverTags) + val connectionUsage = ConnectionUsage.refine(_serverTags) + val openConnections = OpenConnections.refine(_serverTags) + + + def countCompletedRequest(statusCode: Int): Unit = { + if(statusCode >= 200 && statusCode <= 299) + requestsSuccessful.increment() + else if(statusCode >= 500 && statusCode <=599) + requestsServerError.increment() + else if(statusCode >= 400 && statusCode <=499) + requestsClientError.increment() + else if(statusCode >= 300 && statusCode <=399) + requestsRedirection.increment() + else if(statusCode >= 100 && statusCode <=199) + requestsInformational.increment() + else { + _log.warn("Unknown HTTP status code {} found when recording HTTP server metrics", statusCode.toString) + } + } + + /** + * Removes all registered metrics from Kamon. + */ + def cleanup(): Unit = { + CompletedRequests.remove(statusCodeTag("1xx")) + CompletedRequests.remove(statusCodeTag("2xx")) + CompletedRequests.remove(statusCodeTag("3xx")) + CompletedRequests.remove(statusCodeTag("4xx")) + CompletedRequests.remove(statusCodeTag("5xx")) + + ActiveRequests.remove(_serverTags) + RequestSize.remove(_serverTags) + ResponseSize.remove(_serverTags) + ConnectionLifetime.remove(_serverTags) + ConnectionUsage.remove(_serverTags) + OpenConnections.remove(_serverTags) + } + + private def statusCodeTag(group: String): Map[String, String] = + _serverTags + (_statusCodeTag -> group) + + } + + + object Metrics { + + def of(component: String, interface: String, port: Int): Metrics = + new HttpServer.Metrics(component, interface, port) + + /** + * Number of completed requests per status code. + */ + val CompletedRequests = Kamon.counter("http.server.requests") + + /** + * Number of requests being processed simultaneously at any point in time. + */ + val ActiveRequests = Kamon.rangeSampler("http.server.request.active") + + /** + * Request size distribution (including headers and body) for all requests received by the server. + */ + val RequestSize = Kamon.histogram("http.server.request.size", information.bytes) + + /** + * Response size distribution (including headers and body) for all responses served by the server. + */ + val ResponseSize = Kamon.histogram("http.server.response.size", information.bytes) + + /** + * Tracks the time elapsed between connection creation and connection close. + */ + val ConnectionLifetime = Kamon.histogram("http.server.connection.lifetime", time.nanoseconds) + + /** + * Distribution of number of requests handled per connection during their entire lifetime. + */ + val ConnectionUsage = Kamon.histogram("http.server.connection.usage") + + /** + * Number of open connections. + */ + val OpenConnections = Kamon.rangeSampler("http.server.connection.open") + } + + + def from(name: String, component: String, interface: String, port: Int): HttpServer = { + from(name, component, interface, port, Kamon, Kamon) + } + + def from(name: String, component: String, interface: String, port: Int, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = { + val defaultConfiguration = configuration.config().getConfig(DefaultHttpServerConfiguration) + val configWithFallback = if(name == DefaultHttpServer) defaultConfiguration else { + configuration.config().getConfig(HttpServerConfigurationPrefix + "." + name).withFallback(defaultConfiguration) + } + + new HttpServer.Default(Settings.from(configWithFallback), contextPropagation, component, interface, port) + } + + val HttpServerConfigurationPrefix = "kamon.instrumentation.http-server" + val DefaultHttpServer = "default" + val DefaultHttpServerConfiguration = s"$HttpServerConfigurationPrefix.default" + + + private class Default(settings: Settings, contextPropagation: ContextPropagation, component: String, interface: String, port: Int) extends HttpServer { + private val _metrics = if(settings.enableServerMetrics) Some(HttpServer.Metrics.of(component, interface, port)) else None + private val _log = LoggerFactory.getLogger(classOf[Default]) + private val _propagation = contextPropagation.httpPropagation(settings.propagationChannel) + .getOrElse { + _log.warn(s"Could not find HTTP propagation [${settings.propagationChannel}], falling back to the default HTTP propagation") + contextPropagation.defaultHttpPropagation() + } + + override def receive(request: HttpMessage.Request): RequestHandler = { + + val incomingContext = if(settings.enableContextPropagation) + _propagation.read(request) + else Context.Empty + + val requestSpan = if(settings.enableTracing) + buildServerSpan(incomingContext, request) + else Span.Empty + + val handlerContext = if(requestSpan.nonEmpty()) + incomingContext.withKey(Span.ContextKey, requestSpan) + else incomingContext + + _metrics.foreach { httpServerMetrics => + httpServerMetrics.activeRequests.increment() + } + + + new HttpServer.RequestHandler { + override def context: Context = + handlerContext + + override def span: Span = + requestSpan + + override def doneReceiving(receivedBytes: Long): Unit = { + _metrics.foreach { httpServerMetrics => + httpServerMetrics.requestSize.record(receivedBytes) + } + } + + override def send[HttpResponse](response: HttpMessage.ResponseBuilder[HttpResponse], context: Context): HttpResponse = { + def addResponseTag(tag: String, value: String, mode: TagMode): Unit = mode match { + case TagMode.Metric => span.tagMetric(tag, value) + case TagMode.Span => span.tag(tag, value) + case TagMode.Off => + } + + if(settings.enableContextPropagation) { + _propagation.write(context, response) + } + + _metrics.foreach { httpServerMetrics => + httpServerMetrics.countCompletedRequest(response.statusCode) + } + + addResponseTag("http.status_code", response.statusCode.toString, settings.statusCodeTagMode) + response.build() + } + + override def doneSending(sentBytes: Long): Unit = { + _metrics.foreach { httpServerMetrics => + httpServerMetrics.activeRequests.decrement() + httpServerMetrics.responseSize.record(sentBytes) + } + + span.finish() + } + } + } + + override def openConnection(): Unit = { + _metrics.foreach { httpServerMetrics => + httpServerMetrics.openConnections.increment() + } + } + + override def closeConnection(lifetime: Duration, handledRequests: Long): Unit = { + _metrics.foreach { httpServerMetrics => + httpServerMetrics.openConnections.decrement() + httpServerMetrics.connectionLifetime.record(lifetime.toNanos) + httpServerMetrics.connectionUsage.record(handledRequests) + } + } + + override def shutdown(): Unit = { + _metrics.foreach { httpServerMetrics => + httpServerMetrics.cleanup() + } + } + + + private def buildServerSpan(context: Context, request: HttpMessage.Request): Span = { + val span = Kamon.buildSpan(operationName(request)) + .withMetricTag("span.kind", "server") + .withMetricTag("component", component) + + if(!settings.enableSpanMetrics) + span.disableMetrics() + + + for { traceIdTag <- settings.traceIDTag; customTraceID <- context.getTag(traceIdTag) } { + val identifier = Kamon.identityProvider.traceIdGenerator().from(customTraceID) + if(identifier != IdentityProvider.NoIdentifier) + span.withTraceID(identifier) + } + + def addRequestTag(tag: String, value: String, mode: TagMode): Unit = mode match { + case TagMode.Metric => span.withMetricTag(tag, value) + case TagMode.Span => span.withTag(tag, value) + case TagMode.Off => + } + + addRequestTag("http.url", request.url, settings.urlTagMode) + addRequestTag("http.method", request.method, settings.urlTagMode) + settings.contextTags.foreach { + case (tagName, mode) => context.getTag(tagName).foreach(tagValue => addRequestTag(tagName, tagValue, mode)) + } + + span.start() + } + + private def operationName(request: HttpMessage.Request): String = { + val requestPath = request.path + val customMapping = settings.operationMappings.collectFirst { + case (pattern, operationName) if pattern.accept(requestPath) => operationName + } + + customMapping.getOrElse("http.request") + } + } + + + case class Settings( + enableContextPropagation: Boolean, + propagationChannel: String, + enableServerMetrics: Boolean, + enableTracing: Boolean, + traceIDTag: Option[String], + enableSpanMetrics: Boolean, + urlTagMode: TagMode, + methodTagMode: TagMode, + statusCodeTagMode: TagMode, + contextTags: Map[String, TagMode], + unhandledOperationName: String, + operationMappings: Map[GlobPathFilter, String] + ) + + object Settings { + + sealed trait TagMode + object TagMode { + case object Metric extends TagMode + case object Span extends TagMode + case object Off extends TagMode + + def from(value: String): TagMode = value.toLowerCase match { + case "metric" => TagMode.Metric + case "span" => TagMode.Span + case _ => TagMode.Off + } + } + + def from(config: Config): Settings = { + + // Context propagation settings + val enablePropagation = config.getBoolean("propagation.enabled") + val propagationChannel = config.getString("propagation.channel") + + // HTTP Server metrics settings + val enableServerMetrics = config.getBoolean("metrics.enabled") + + // Tracing settings + val enableTracing = config.getBoolean("tracing.enabled") + val traceIdTag = Option(config.getString("tracing.preferred-trace-id-tag")).filterNot(_ == "none") + val enableSpanMetrics = config.getBoolean("tracing.span-metrics") + val urlTagMode = TagMode.from(config.getString("tracing.tags.url")) + val methodTagMode = TagMode.from(config.getString("tracing.tags.method")) + val statusCodeTagMode = TagMode.from(config.getString("tracing.tags.status-code")) + val contextTags = config.getConfig("tracing.tags.from-context").pairs.map { + case (tagName, mode) => (tagName, TagMode.from(mode)) + } + + val unhandledOperationName = config.getString("tracing.operations.unhandled") + val operationMappings = config.getConfig("tracing.operations.mappings").pairs.map { + case (pattern, operationName) => (new GlobPathFilter(pattern), operationName) + } + + Settings( + enablePropagation, + propagationChannel, + enableServerMetrics, + enableTracing, + traceIdTag, + enableSpanMetrics, + urlTagMode, + methodTagMode, + statusCodeTagMode, + contextTags, + unhandledOperationName, + operationMappings + ) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/instrumentation/Mixin.scala b/kamon-core/src/main/scala/kamon/instrumentation/Mixin.scala new file mode 100644 index 00000000..f5a5e63b --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/Mixin.scala @@ -0,0 +1,36 @@ +package kamon.instrumentation + +import kamon.Kamon +import kamon.context.Context + +/** + * Common mixins used across multiple instrumentation modules. + */ +object Mixin { + + /** + * Utility trait that marks objects carrying a reference to a Context instance. + * + */ + trait HasContext { + def context: Context + } + + object HasContext { + private case class Default(context: Context) extends HasContext + + /** + * Construct a HasSpan instance that references the provided Context. + * + */ + def from(context: Context): HasContext = + Default(context) + + /** + * Construct a HasContext instance with the current Kamon from Kamon's default context storage. + * + */ + def fromCurrentContext(): HasContext = + Default(Kamon.currentContext()) + } +} diff --git a/kamon-core/src/main/scala/kamon/package.scala b/kamon-core/src/main/scala/kamon/package.scala index d3b25500..d694206c 100644 --- a/kamon-core/src/main/scala/kamon/package.scala +++ b/kamon-core/src/main/scala/kamon/package.scala @@ -92,8 +92,14 @@ package object kamon { def configurations: Map[String, Config] = { topLevelKeys - .map(entry => (entry, config.getConfig(ConfigUtil.joinPath(entry)))) - .toMap + .map(entry => (entry, config.getConfig(ConfigUtil.joinPath(entry)))) + .toMap + } + + def pairs: Map[String, String] = { + topLevelKeys + .map(key => (key, config.getString(key))) + .toMap } } } diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index 43391af7..6015e350 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -19,7 +19,7 @@ package trace import java.time.Instant import kamon.ReporterRegistry.SpanSink -import kamon.context.Key +import kamon.context.Context import kamon.metric.MeasurementUnit import kamon.trace.SpanContext.SamplingDecision import kamon.util.Clock @@ -66,7 +66,7 @@ sealed abstract class Span { object Span { - val ContextKey = Key.broadcast[Span]("span", Span.Empty) + val ContextKey = Context.key[Span]("span", Span.Empty) object Empty extends Span { override val context: SpanContext = SpanContext.EmptySpanContext diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala b/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala index 2a8e2271..ed329ef7 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala @@ -15,7 +15,7 @@ package kamon.trace -import kamon.context.Key +import kamon.context.{Context} import kamon.trace.Tracer.SpanBuilder /** @@ -39,7 +39,7 @@ object SpanCustomizer { override def customize(spanBuilder: SpanBuilder): SpanBuilder = spanBuilder } - val ContextKey = Key.local[SpanCustomizer]("span-customizer", Noop) + val ContextKey = Context.key[SpanCustomizer]("span-customizer", Noop) def forOperationName(operationName: String): SpanCustomizer = new SpanCustomizer { override def customize(spanBuilder: SpanBuilder): SpanBuilder = diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala index 14b28d54..dc168347 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala @@ -19,54 +19,43 @@ import java.net.{URLDecoder, URLEncoder} import java.nio.ByteBuffer import kamon.Kamon -import kamon.context.{Codecs, Context, TextMap} +import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter} +import kamon.context._ import kamon.context.generated.binary.span.{Span => ColferSpan} +import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter} import kamon.trace.SpanContext.SamplingDecision -object SpanCodec { +/** + * Propagation mechanisms for Kamon's Span data to and from HTTP and Binary mediums. + */ +object SpanPropagation { - class B3 extends Codecs.ForEntry[TextMap] { + /** + * Reads and Writes a Span instance using the B3 propagation format. The specification and semantics of the B3 + * Propagation protocol can be found here: https://github.com/openzipkin/b3-propagation + */ + class B3 extends Propagation.EntryReader[HeaderReader] with Propagation.EntryWriter[HeaderWriter] { import B3.Headers - override def encode(context: Context): TextMap = { - val span = context.get(Span.ContextKey) - val carrier = TextMap.Default() - - if(span.nonEmpty()) { - val spanContext = span.context() - carrier.put(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) - carrier.put(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) - - if(spanContext.parentID != IdentityProvider.NoIdentifier) - carrier.put(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) - - encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision => - carrier.put(Headers.Sampled, samplingDecision) - } - } - - carrier - } - - override def decode(carrier: TextMap, context: Context): Context = { + override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { val identityProvider = Kamon.tracer.identityProvider - val traceID = carrier.get(Headers.TraceIdentifier) + val traceID = reader.read(Headers.TraceIdentifier) .map(id => identityProvider.traceIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) - val spanID = carrier.get(Headers.SpanIdentifier) + val spanID = reader.read(Headers.SpanIdentifier) .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) { - val parentID = carrier.get(Headers.ParentSpanIdentifier) + val parentID = reader.read(Headers.ParentSpanIdentifier) .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) - val flags = carrier.get(Headers.Flags) + val flags = reader.read(Headers.Flags) - val samplingDecision = flags.orElse(carrier.get(Headers.Sampled)) match { + val samplingDecision = flags.orElse(reader.read(Headers.Sampled)) match { case Some(sampled) if sampled == "1" => SamplingDecision.Sample case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample case _ => SamplingDecision.Unknown @@ -77,6 +66,24 @@ object SpanCodec { } else context } + + override def write(context: Context, writer: HttpPropagation.HeaderWriter): Unit = { + val span = context.get(Span.ContextKey) + + if(span.nonEmpty()) { + val spanContext = span.context() + writer.write(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) + writer.write(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) + + if(spanContext.parentID != IdentityProvider.NoIdentifier) + writer.write(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) + + encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision => + writer.write(Headers.Sampled, samplingDecision) + } + } + } + private def encodeSamplingDecision(samplingDecision: SamplingDecision): Option[String] = samplingDecision match { case SamplingDecision.Sample => Some("1") case SamplingDecision.DoNotSample => Some("0") @@ -102,38 +109,27 @@ object SpanCodec { } - class Colfer extends Codecs.ForEntry[ByteBuffer] { - val emptyBuffer = ByteBuffer.allocate(0) - - override def encode(context: Context): ByteBuffer = { - val span = context.get(Span.ContextKey) - if(span.nonEmpty()) { - val marshalBuffer = Colfer.codecBuffer.get() - val colferSpan = new ColferSpan() - val spanContext = span.context() - - colferSpan.setTraceID(spanContext.traceID.bytes) - colferSpan.setSpanID(spanContext.spanID.bytes) - colferSpan.setParentID(spanContext.parentID.bytes) - colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision)) - - val marshalledSize = colferSpan.marshal(marshalBuffer, 0) - val buffer = ByteBuffer.allocate(marshalledSize) - buffer.put(marshalBuffer, 0, marshalledSize) - buffer - - } else emptyBuffer - } - - override def decode(carrier: ByteBuffer, context: Context): Context = { - carrier.clear() - - if(carrier.capacity() == 0) + /** + * Defines a bare bones binary context propagation that uses Colfer [1] as the serialization library. The Schema + * for the Span data is simply defined as: + * + * type Span struct { + * traceID binary + * spanID binary + * parentID binary + * samplingDecision uint8 + * } + * + */ + class Colfer extends Propagation.EntryReader[ByteStreamReader] with Propagation.EntryWriter[ByteStreamWriter] { + + override def read(medium: ByteStreamReader, context: Context): Context = { + if(medium.available() == 0) context else { val identityProvider = Kamon.tracer.identityProvider val colferSpan = new ColferSpan() - colferSpan.unmarshal(carrier.array(), 0) + colferSpan.unmarshal(medium.readAll(), 0) val spanContext = SpanContext( traceID = identityProvider.traceIdGenerator().from(colferSpan.traceID), @@ -146,6 +142,24 @@ object SpanCodec { } } + override def write(context: Context, medium: ByteStreamWriter): Unit = { + val span = context.get(Span.ContextKey) + + if(span.nonEmpty()) { + val marshalBuffer = Colfer.codecBuffer.get() + val colferSpan = new ColferSpan() + val spanContext = span.context() + + colferSpan.setTraceID(spanContext.traceID.bytes) + colferSpan.setSpanID(spanContext.spanID.bytes) + colferSpan.setParentID(spanContext.parentID.bytes) + colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision)) + + val marshalledSize = colferSpan.marshal(marshalBuffer, 0) + medium.write(marshalBuffer, 0, marshalledSize) + + } + } private def samplingDecisionToByte(samplingDecision: SamplingDecision): Byte = samplingDecision match { case SamplingDecision.Sample => 1 diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index 3e857f00..ad7ffbed 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -99,6 +99,7 @@ object Tracer { private var initialMetricTags = Map.empty[String, String] private var useParentFromContext = true private var trackMetrics = true + private var providedTraceID = IdentityProvider.NoIdentifier def asChildOf(parent: Span): SpanBuilder = { if(parent != Span.Empty) this.parentSpan = parent @@ -158,6 +159,11 @@ object Tracer { this } + def withTraceID(identifier: IdentityProvider.Identifier): SpanBuilder = { + this.providedTraceID = identifier + this + } + def start(): Span = { val spanFrom = if(from == Instant.EPOCH) clock.instant() else from @@ -199,13 +205,21 @@ object Tracer { else parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision) - private def newSpanContext(samplingDecision: SamplingDecision): SpanContext = + private def newSpanContext(samplingDecision: SamplingDecision): SpanContext = { + val traceID = + if(providedTraceID != IdentityProvider.NoIdentifier) + providedTraceID + else + tracer._identityProvider.traceIdGenerator().generate() + + SpanContext( - traceID = tracer._identityProvider.traceIdGenerator().generate(), + traceID, spanID = tracer._identityProvider.spanIdGenerator().generate(), parentID = IdentityProvider.NoIdentifier, samplingDecision = samplingDecision ) + } } private final class TracerMetrics(metricLookup: MetricLookup) { diff --git a/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala b/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala index 9f17dff0..d36d0df2 100644 --- a/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala +++ b/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala @@ -15,14 +15,17 @@ package kamon.testkit -import kamon.context.{Context, Key} +import kamon.context.{Context} trait ContextTesting { - val StringKey = Key.local[Option[String]]("string-key", None) - val StringBroadcastKey = Key.broadcastString("string-broadcast-key") + val StringKey = Context.key[Option[String]]("string-key", None) + val StringBroadcastKey = Context.key[Option[String]]("string-broadcast-key", None) def contextWithLocal(value: String): Context = - Context.create(StringKey, Some(value)) + Context.of(StringKey, Some(value)) + + + } object ContextTesting extends ContextTesting diff --git a/kamon-testkit/src/main/scala/kamon/testkit/SimpleStringCodec.scala b/kamon-testkit/src/main/scala/kamon/testkit/SimpleStringCodec.scala deleted file mode 100644 index c755b0af..00000000 --- a/kamon-testkit/src/main/scala/kamon/testkit/SimpleStringCodec.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.testkit - -import java.nio.ByteBuffer - -import kamon.context.{Codecs, Context, TextMap} - -object SimpleStringCodec { - final class Headers extends Codecs.ForEntry[TextMap] { - private val dataKey = "X-String-Value" - - override def encode(context: Context): TextMap = { - val textMap = TextMap.Default() - context.get(ContextTesting.StringBroadcastKey).foreach { value => - textMap.put(dataKey, value) - } - - textMap - } - - override def decode(carrier: TextMap, context: Context): Context = { - carrier.get(dataKey) match { - case value @ Some(_) => context.withKey(ContextTesting.StringBroadcastKey, value) - case None => context - } - } - } - - final class Binary extends Codecs.ForEntry[ByteBuffer] { - val emptyBuffer: ByteBuffer = ByteBuffer.allocate(0) - - override def encode(context: Context): ByteBuffer = { - context.get(ContextTesting.StringBroadcastKey) match { - case Some(value) => ByteBuffer.wrap(value.getBytes) - case None => emptyBuffer - } - } - - override def decode(carrier: ByteBuffer, context: Context): Context = { - context.withKey(ContextTesting.StringBroadcastKey, Some(new String(carrier.array()))) - } - } -} diff --git a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala index fbfdc7c3..c28ace64 100644 --- a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala +++ b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala @@ -19,7 +19,7 @@ import java.time.Instant import kamon.Kamon import kamon.trace.{Span, SpanContext} -import kamon.trace.Span.FinishedSpan +import kamon.trace.Span.{FinishedSpan, TagValue} import scala.reflect.ClassTag import scala.util.Try @@ -52,9 +52,21 @@ object SpanInspection { def spanTags(): Map[String, Span.TagValue] = spanData.tags + def tag(key: String): Option[String] = { + spanTag(key).map { + case TagValue.String(string) => string + case TagValue.Number(number) => number.toString + case TagValue.True => "true" + case TagValue.False => "false" + } orElse(metricTag(key)) + } + def metricTags(): Map[String, String] = getField[Span.Local, Map[String, String]](realSpan, "customMetricTags") + def metricTag(key: String): Option[String] = + metricTags().get(key) + def from(): Instant = getField[Span.Local, Instant](realSpan, "from") @@ -64,6 +76,9 @@ object SpanInspection { def operationName(): String = spanData.operationName + def hasMetricsEnabled(): Boolean = + getField[Span.Local, Boolean](realSpan, "collectMetrics") + private def getField[T, R](target: Any, fieldName: String)(implicit classTag: ClassTag[T]): R = { val toFinishedSpanMethod = classTag.runtimeClass.getDeclaredFields.find(_.getName.contains(fieldName)).get diff --git a/version.sbt b/version.sbt index 918f99a8..a8bd390a 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "1.1.4-SNAPSHOT" +version in ThisBuild := "1.2.0-SNAPSHOT" |