Diffstat (limited to 'kamon-core/src/main/scala/kamon/context')
-rw-r--r--  kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala  | 301
-rw-r--r--  kamon-core/src/main/scala/kamon/context/Codecs.scala             | 295
-rw-r--r--  kamon-core/src/main/scala/kamon/context/Context.scala            | 112
-rw-r--r--  kamon-core/src/main/scala/kamon/context/HttpPropagation.scala    | 206
-rw-r--r--  kamon-core/src/main/scala/kamon/context/Mixin.scala              |  45
-rw-r--r--  kamon-core/src/main/scala/kamon/context/Propagation.scala        |  81
6 files changed, 643 insertions, 397 deletions
diff --git a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
new file mode 100644
index 00000000..75e65c44
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
@@ -0,0 +1,301 @@
+/* =========================================================================================
+ * Copyright © 2013-2018 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon
+package context
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
+
+import com.typesafe.config.Config
+import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
+import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry}
+import org.slf4j.LoggerFactory
+
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+import scala.util.{Failure, Success, Try}
+
+
+/**
+ * Context propagation that uses byte stream abstractions as the transport medium. The Binary propagation uses
+ * instances of [[ByteStreamReader]] and [[ByteStreamWriter]] to decode and encode Context instances, respectively.
+ *
+ * Binary propagation uses the [[ByteStreamReader]] and [[ByteStreamWriter]] abstractions, which closely model the APIs
+ * from [[InputStream]] and [[OutputStream]], but without exposing additional functionality that wouldn't have any
+ * well-defined behavior for Context propagation, e.g. flush or close functions on OutputStreams.
+ */
+object BinaryPropagation {
+
+ /**
+ * Represents a readable stream of bytes. This interface closely resembles [[InputStream]], minus the functionality
+ * that wouldn't have a clearly defined behavior in the context of Context propagation.
+ */
+ trait ByteStreamReader {
+ /**
+ * Number of available bytes on the ByteStream.
+ */
+ def available(): Int
+
+ /**
+ * Reads as many bytes as possible into the target byte array.
+ *
+ * @param target Target buffer in which the read bytes will be written.
+ * @return The number of bytes written into the target buffer.
+ */
+ def read(target: Array[Byte]): Int
+
+ /**
+ * Reads a specified number of bytes into the target buffer, starting from the offset position.
+ *
+ * @param target Target buffer in which read bytes will be written.
+ * @param offset Offset at which to start writing bytes into the target buffer.
+ * @param count Number of bytes to be read.
+ * @return The number of bytes written into the target buffer.
+ */
+ def read(target: Array[Byte], offset: Int, count: Int): Int
+
+ /**
+ * Reads all available bytes into a newly created byte array.
+ *
+ * @return All bytes read.
+ */
+ def readAll(): Array[Byte]
+ }
+
+
+ object ByteStreamReader {
+
+ /**
+ * Creates a new [[ByteStreamReader]] that reads data from a byte array.
+ */
+ def of(bytes: Array[Byte]): ByteStreamReader = new ByteArrayInputStream(bytes) with ByteStreamReader {
+ override def readAll(): Array[Byte] = {
+ val target = Array.ofDim[Byte](available())
+ read(target, 0, available())
+ target
+ }
+ }
+ }
+
+
+ /**
+ * Represents a writable stream of bytes. This interface closely resembles [[OutputStream]], minus the functionality
+ * that wouldn't have a clearly defined behavior in the context of Context propagation.
+ */
+ trait ByteStreamWriter {
+
+ /**
+ * Writes all bytes into the stream.
+ */
+ def write(bytes: Array[Byte]): Unit
+
+ /**
+ * Writes a portion of the provided bytes into the stream.
+ *
+ * @param bytes Buffer from which data will be selected.
+ * @param offset Starting index on the buffer.
+ * @param count Number of bytes to write into the stream.
+ */
+ def write(bytes: Array[Byte], offset: Int, count: Int): Unit
+
+ /**
+ * Writes a single byte into the stream.
+ */
+ def write(byte: Int): Unit
+
+ }
+
+ object ByteStreamWriter {
+
+ /**
+ * Creates a new [[ByteStreamWriter]] from an OutputStream.
+ */
+ def of(outputStream: OutputStream): ByteStreamWriter = new ByteStreamWriter {
+ override def write(bytes: Array[Byte]): Unit =
+ outputStream.write(bytes)
+
+ override def write(bytes: Array[Byte], offset: Int, count: Int): Unit =
+ outputStream.write(bytes, offset, count)
+
+ override def write(byte: Int): Unit =
+ outputStream.write(byte)
+ }
+ }
+
+
+
+ /**
+ * Creates a new default Binary Propagation instance from the provided configuration.
+ *
+ * @param config Binary Propagation channel configuration
+ * @return A newly constructed BinaryPropagation instance.
+ */
+ def from(config: Config, classLoading: ClassLoading): Propagation[ByteStreamReader, ByteStreamWriter] = {
+ new BinaryPropagation.Default(Settings.from(config, classLoading))
+ }
+
+ /**
+ * Default Binary propagation in Kamon. This implementation uses Colfer to read and write the context tags and
+ * entries. Entries are represented as simple pairs of entry name and bytes, which are then processed by all the
+ * configured entry readers and writers.
+ */
+ class Default(settings: Settings) extends Propagation[ByteStreamReader, ByteStreamWriter] {
+ private val _log = LoggerFactory.getLogger(classOf[BinaryPropagation.Default])
+ private val _streamPool = new ThreadLocal[Default.ReusableByteStreamWriter] {
+ override def initialValue(): Default.ReusableByteStreamWriter = new Default.ReusableByteStreamWriter(128)
+ }
+
+ private val _contextBufferPool = new ThreadLocal[Array[Byte]] {
+ override def initialValue(): Array[Byte] = Array.ofDim[Byte](settings.maxOutgoingSize)
+ }
+
+ override def read(reader: ByteStreamReader): Context = {
+ if(reader.available() > 0) {
+ val contextData = Try {
+ val cContext = new ColferContext()
+ cContext.unmarshal(reader.readAll(), 0)
+ cContext
+ }
+
+ contextData.failed.foreach {
+ case NonFatal(t) => _log.warn("Failed to read Context from ByteStreamReader", t)
+ }
+
+ contextData.map { colferContext =>
+
+ // Context tags
+ var tagSectionsCount = colferContext.tags.length
+ if (tagSectionsCount > 0 && tagSectionsCount % 2 != 0) {
+ _log.warn("Malformed Context tags found, tags consistency might be compromised")
+ tagSectionsCount -= 1
+ }
+
+ val tags = if (tagSectionsCount > 0) {
+ val tagsBuilder = Map.newBuilder[String, String]
+ var tagIndex = 0
+ while (tagIndex < tagSectionsCount) {
+ tagsBuilder += (colferContext.tags(tagIndex) -> colferContext.tags(tagIndex + 1))
+ tagIndex += 2
+ }
+ tagsBuilder.result()
+
+ } else Map.empty[String, String]
+
+
+ // Only reads the entries for which there is a registered reader
+ colferContext.entries.foldLeft(Context.of(tags)) {
+ case (context, entryData) =>
+ settings.incomingEntries.get(entryData.name).map { entryReader =>
+ var contextWithEntry = context
+ try {
+ contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.content), context)
+ } catch {
+ case NonFatal(t) => _log.warn("Failed to read entry [{}]", entryData.name.asInstanceOf[Any], t)
+ }
+
+ contextWithEntry
+ }.getOrElse(context)
+ }
+ } getOrElse(Context.Empty)
+ } else Context.Empty
+ }
+
+ override def write(context: Context, writer: ByteStreamWriter): Unit = {
+ if (context.nonEmpty()) {
+ val contextData = new ColferContext()
+ val output = _streamPool.get()
+ val contextOutgoingBuffer = _contextBufferPool.get()
+
+ if (context.tags.nonEmpty) {
+ val tags = Array.ofDim[String](context.tags.size * 2)
+ var tagIndex = 0
+ context.tags.foreach {
+ case (key, value) =>
+ tags.update(tagIndex, key)
+ tags.update(tagIndex + 1, value)
+ tagIndex += 2
+ }
+
+ contextData.tags = tags
+ }
+
+ if (context.entries.nonEmpty) {
+ val entries = settings.outgoingEntries.collect {
+ case (entryName, entryWriter) if context.entries.contains(entryName) =>
+ val colferEntry = new ColferEntry()
+ try {
+ output.reset()
+ entryWriter.write(context, output)
+
+ colferEntry.name = entryName
+ colferEntry.content = output.toByteArray()
+ } catch {
+ case NonFatal(t) => _log.warn("Failed to write entry [{}]", entryName.asInstanceOf[Any], t)
+ }
+
+ colferEntry
+ }
+
+ contextData.entries = entries.toArray
+ }
+
+ try {
+ val contextSize = contextData.marshal(contextOutgoingBuffer, 0)
+ writer.write(contextOutgoingBuffer, 0, contextSize)
+ } catch {
+ case NonFatal(t) => _log.warn("Failed to write Context to ByteStreamWriter", t)
+ }
+ }
+ }
+ }
+
+ object Default {
+ private class ReusableByteStreamWriter(size: Int) extends ByteArrayOutputStream(size) with ByteStreamWriter {
+ def underlying(): Array[Byte] = this.buf
+ }
+ }
+
+ case class Settings(
+ maxOutgoingSize: Int,
+ incomingEntries: Map[String, Propagation.EntryReader[ByteStreamReader]],
+ outgoingEntries: Map[String, Propagation.EntryWriter[ByteStreamWriter]]
+ )
+
+ object Settings {
+ private val log = LoggerFactory.getLogger(classOf[BinaryPropagation.Settings])
+
+ def from(config: Config, classLoading: ClassLoading): BinaryPropagation.Settings = {
+ def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = {
+ val instanceMap = Map.newBuilder[String, ExpectedType]
+
+ mappings.foreach {
+ case (contextKey, componentClass) => classLoading.createInstance[ExpectedType](componentClass, Nil) match {
+ case Success(componentInstance) => instanceMap += (contextKey -> componentInstance)
+ case Failure(exception) => log.warn("Failed to instantiate {} [{}]",
+ implicitly[ClassTag[ExpectedType]].runtimeClass.getName, componentClass, exception)
+ }
+ }
+
+ instanceMap.result()
+ }
+
+ Settings(
+ config.getBytes("max-outgoing-size").toInt,
+ buildInstances[Propagation.EntryReader[ByteStreamReader]](config.getConfig("entries.incoming").pairs),
+ buildInstances[Propagation.EntryWriter[ByteStreamWriter]](config.getConfig("entries.outgoing").pairs)
+ )
+ }
+ }
+}
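A quick way to see the new binary propagation end to end is a round trip through in-memory streams. A minimal sketch against the API introduced above, assuming hand-built Settings with no entry readers or writers configured (so only context tags survive the trip):

    import java.io.ByteArrayOutputStream
    import kamon.context.{BinaryPropagation, Context}
    import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}

    // Settings assembled by hand instead of via Settings.from(config, classLoading);
    // with no entry readers/writers registered, only the context tags are propagated.
    val settings = BinaryPropagation.Settings(
      maxOutgoingSize = 1024,
      incomingEntries = Map.empty,
      outgoingEntries = Map.empty
    )
    val propagation = new BinaryPropagation.Default(settings)

    // Write the context into a plain OutputStream and read it back.
    val out = new ByteArrayOutputStream()
    propagation.write(Context.of(Map("user-id" -> "1234")), ByteStreamWriter.of(out))
    val restored = propagation.read(ByteStreamReader.of(out.toByteArray))
    restored.getTag("user-id") // Some("1234")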
diff --git a/kamon-core/src/main/scala/kamon/context/Codecs.scala b/kamon-core/src/main/scala/kamon/context/Codecs.scala
deleted file mode 100644
index c5d237a9..00000000
--- a/kamon-core/src/main/scala/kamon/context/Codecs.scala
+++ /dev/null
@@ -1,295 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon
-package context
-
-import java.nio.ByteBuffer
-
-import com.typesafe.config.Config
-import kamon.util.DynamicAccess
-import org.slf4j.LoggerFactory
-import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry}
-
-import scala.collection.mutable
-
-class Codecs(initialConfig: Config) {
- private val log = LoggerFactory.getLogger(classOf[Codecs])
- @volatile private var httpHeaders: Codecs.ForContext[TextMap] = new Codecs.HttpHeaders(Map.empty)
- @volatile private var binary: Codecs.ForContext[ByteBuffer] = new Codecs.Binary(256, Map.empty)
-
- reconfigure(initialConfig)
-
-
- def HttpHeaders: Codecs.ForContext[TextMap] =
- httpHeaders
-
- def Binary: Codecs.ForContext[ByteBuffer] =
- binary
-
- def reconfigure(config: Config): Unit = {
- import scala.collection.JavaConverters._
- try {
- val codecsConfig = config.getConfig("kamon.context.codecs")
- val stringKeys = readStringKeysConfig(codecsConfig.getConfig("string-keys"))
- val knownHttpHeaderCodecs = readEntryCodecs[TextMap]("http-headers-keys", codecsConfig) ++ stringHeaderCodecs(stringKeys)
- val knownBinaryCodecs = readEntryCodecs[ByteBuffer]("binary-keys", codecsConfig) ++ stringBinaryCodecs(stringKeys)
-
- httpHeaders = new Codecs.HttpHeaders(knownHttpHeaderCodecs)
- binary = new Codecs.Binary(codecsConfig.getBytes("binary-buffer-size"), knownBinaryCodecs)
- } catch {
- case t: Throwable => log.error("Failed to initialize Context Codecs", t)
- }
- }
-
- private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codecs.ForEntry[T]] = {
- val rootConfig = config.getConfig(rootKey)
- val dynamic = new DynamicAccess(getClass.getClassLoader)
- val entries = Map.newBuilder[String, Codecs.ForEntry[T]]
-
- rootConfig.topLevelKeys.foreach(key => {
- try {
- val fqcn = rootConfig.getString(key)
- entries += ((key, dynamic.createInstanceFor[Codecs.ForEntry[T]](fqcn, Nil).get))
- } catch {
- case e: Throwable =>
- log.error(s"Failed to initialize codec for key [$key]", e)
- }
- })
-
- entries.result()
- }
-
- private def readStringKeysConfig(config: Config): Map[String, String] =
- config.topLevelKeys.map(key => (key, config.getString(key))).toMap
-
- private def stringHeaderCodecs(keys: Map[String, String]): Map[String, Codecs.ForEntry[TextMap]] =
- keys.map { case (key, header) => (key, new Codecs.StringHeadersCodec(key, header)) }
-
- private def stringBinaryCodecs(keys: Map[String, String]): Map[String, Codecs.ForEntry[ByteBuffer]] =
- keys.map { case (key, _) => (key, new Codecs.StringBinaryCodec(key)) }
-}
-
-object Codecs {
-
- trait ForContext[T] {
- def encode(context: Context): T
- def decode(carrier: T): Context
- }
-
- trait ForEntry[T] {
- def encode(context: Context): T
- def decode(carrier: T, context: Context): Context
- }
-
- final class HttpHeaders(entryCodecs: Map[String, Codecs.ForEntry[TextMap]]) extends Codecs.ForContext[TextMap] {
- private val log = LoggerFactory.getLogger(classOf[HttpHeaders])
-
- override def encode(context: Context): TextMap = {
- val encoded = TextMap.Default()
-
- context.entries.foreach {
- case (key, _) if key.broadcast =>
- entryCodecs.get(key.name) match {
- case Some(codec) =>
- try {
- codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2))
- } catch {
- case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e)
- }
-
- case None =>
- log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name)
- }
-
- case _ => // All non-broadcast keys should be ignored.
- }
-
- encoded
- }
-
- override def decode(carrier: TextMap): Context = {
- var context: Context = Context.Empty
-
- try {
- context = entryCodecs.foldLeft(context)((ctx, codecEntry) => {
- val (_, codec) = codecEntry
- codec.decode(carrier, ctx)
- })
-
- } catch {
- case e: Throwable =>
- log.error("Failed to decode context from HttpHeaders", e)
- }
-
- context
- }
- }
-
-
- final class Binary(bufferSize: Long, entryCodecs: Map[String, Codecs.ForEntry[ByteBuffer]]) extends Codecs.ForContext[ByteBuffer] {
- private val log = LoggerFactory.getLogger(classOf[Binary])
- private val binaryBuffer = newThreadLocalBuffer(bufferSize)
- private val emptyBuffer = ByteBuffer.allocate(0)
-
- override def encode(context: Context): ByteBuffer = {
- val entries = context.entries
- if(entries.isEmpty)
- emptyBuffer
- else {
- var colferEntries: List[ColferEntry] = Nil
- entries.foreach {
- case (key, _) if key.broadcast =>
- entryCodecs.get(key.name) match {
- case Some(entryCodec) =>
- try {
- val entryData = entryCodec.encode(context)
- if(entryData.capacity() > 0) {
- val colferEntry = new ColferEntry()
- colferEntry.setName(key.name)
- colferEntry.setContent(entryData.array())
- colferEntries = colferEntry :: colferEntries
- }
- } catch {
- case throwable: Throwable =>
- log.error(s"Failed to encode broadcast context key [${key.name}]", throwable)
- }
-
- case None =>
- log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name)
- }
-
- case _ => // All non-broadcast keys should be ignored.
- }
-
- if(colferEntries.isEmpty)
- emptyBuffer
- else {
- val buffer = binaryBuffer.get()
- val colferContext = new ColferContext()
- colferContext.setEntries(colferEntries.toArray)
- val marshalledSize = colferContext.marshal(buffer, 0)
-
- val data = Array.ofDim[Byte](marshalledSize)
- System.arraycopy(buffer, 0, data, 0, marshalledSize)
- ByteBuffer.wrap(data)
- }
- }
- }
-
-
- override def decode(carrier: ByteBuffer): Context = {
- if(carrier.capacity() == 0)
- Context.Empty
- else {
- var context: Context = Context.Empty
-
- try {
- val colferContext = new ColferContext()
- colferContext.unmarshal(carrier.array(), 0)
-
- colferContext.entries.foreach(colferEntry => {
- entryCodecs.get(colferEntry.getName()) match {
- case Some(entryCodec) =>
- context = entryCodec.decode(ByteBuffer.wrap(colferEntry.content), context)
-
- case None =>
- log.error("Failed to decode entry [{}] with Binary context codec. No entry found for the key.", colferEntry.getName())
- }
- })
-
- } catch {
- case e: Throwable =>
- log.error("Failed to decode context from Binary", e)
- }
-
- context
- }
- }
-
-
- private def newThreadLocalBuffer(size: Long): ThreadLocal[Array[Byte]] = new ThreadLocal[Array[Byte]] {
- override def initialValue(): Array[Byte] = Array.ofDim[Byte](size.toInt)
- }
- }
-
- private class StringHeadersCodec(key: String, headerName: String) extends Codecs.ForEntry[TextMap] {
- private val contextKey = Key.broadcast[Option[String]](key, None)
-
- override def encode(context: Context): TextMap = {
- val textMap = TextMap.Default()
- context.get(contextKey).foreach { value =>
- textMap.put(headerName, value)
- }
-
- textMap
- }
-
- override def decode(carrier: TextMap, context: Context): Context = {
- carrier.get(headerName) match {
- case value @ Some(_) => context.withKey(contextKey, value)
- case None => context
- }
- }
- }
-
- private class StringBinaryCodec(key: String) extends Codecs.ForEntry[ByteBuffer] {
- val emptyBuffer: ByteBuffer = ByteBuffer.allocate(0)
- private val contextKey = Key.broadcast[Option[String]](key, None)
-
- override def encode(context: Context): ByteBuffer = {
- context.get(contextKey) match {
- case Some(value) => ByteBuffer.wrap(value.getBytes)
- case None => emptyBuffer
- }
- }
-
- override def decode(carrier: ByteBuffer, context: Context): Context = {
- context.withKey(contextKey, Some(new String(carrier.array())))
- }
- }
-}
-
-
-trait TextMap {
-
- def get(key: String): Option[String]
-
- def put(key: String, value: String): Unit
-
- def values: Iterator[(String, String)]
-}
-
-object TextMap {
-
- class Default extends TextMap {
- private val storage =
- mutable.Map.empty[String, String]
-
- override def get(key: String): Option[String] =
- storage.get(key)
-
- override def put(key: String, value: String): Unit =
- storage.put(key, value)
-
- override def values: Iterator[(String, String)] =
- storage.toIterator
- }
-
- object Default {
- def apply(): Default =
- new Default()
- }
-}
\ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala
index e0b084cb..2a7a382e 100644
--- a/kamon-core/src/main/scala/kamon/context/Context.scala
+++ b/kamon-core/src/main/scala/kamon/context/Context.scala
@@ -1,5 +1,5 @@
/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
+ * Copyright © 2013-2018 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
@@ -13,90 +13,88 @@
* =========================================================================================
*/
-package kamon.context
+package kamon
+package context
-import java.io._
-import java.nio.ByteBuffer
+import java.util.{Map => JavaMap}
+import scala.collection.JavaConverters._
-import kamon.Kamon
+class Context private (val entries: Map[String, Any], val tags: Map[String, String]) {
-class Context private (private[context] val entries: Map[Key[_], Any]) extends scala.Serializable {
- def get[T](key: Key[T]): T =
- entries.getOrElse(key, key.emptyValue).asInstanceOf[T]
+ def get[T](key: Context.Key[T]): T =
+ entries.getOrElse(key.name, key.emptyValue).asInstanceOf[T]
- def withKey[T](key: Key[T], value: T): Context =
- new Context(entries.updated(key, value))
+ def getTag(tagKey: String): Option[String] =
+ tags.get(tagKey)
- var _deserializedEntries: Map[Key[_], Any] = Map.empty
+ def withKey[T](key: Context.Key[T], value: T): Context =
+ new Context(entries.updated(key.name, value), tags)
- @throws[IOException]
- private def writeObject(out: ObjectOutputStream): Unit = out.write(
- Kamon.contextCodec().Binary.encode(this).array()
- )
+ def withTag(tagKey: String, tagValue: String): Context =
+ new Context(entries, tags.updated(tagKey, tagValue))
- @throws[IOException]
- @throws[ClassNotFoundException]
- private def readObject(in: ObjectInputStream): Unit = {
- val buf = new Array[Byte](in.available())
- in.readFully(buf)
- _deserializedEntries = Kamon.contextCodec().Binary.decode(ByteBuffer.wrap(buf)).entries
- }
+ def withTags(tags: Map[String, String]): Context =
+ new Context(entries, this.tags ++ tags)
- def readResolve(): AnyRef = new Context(_deserializedEntries)
+ def withTags(tags: JavaMap[String, String]): Context =
+ new Context(entries, this.tags ++ tags.asScala.toMap)
- override def equals(obj: scala.Any): Boolean = {
- obj != null &&
- obj.isInstanceOf[Context] &&
- obj.asInstanceOf[Context].entries != null &&
- obj.asInstanceOf[Context].entries == this.entries
- }
+ def isEmpty(): Boolean =
+ entries.isEmpty && tags.isEmpty
- override def hashCode(): Int = entries.hashCode()
+ def nonEmpty(): Boolean =
+ !isEmpty()
}
object Context {
- val Empty = new Context(Map.empty)
+ val Empty = new Context(Map.empty, Map.empty)
- def apply(): Context =
- Empty
+ def of(tags: JavaMap[String, String]): Context =
+ new Context(Map.empty, tags.asScala.toMap)
- def create(): Context =
- Empty
+ def of(tags: Map[String, String]): Context =
+ new Context(Map.empty, tags)
- def apply[T](key: Key[T], value: T): Context =
- new Context(Map(key -> value))
+ def of[T](key: Context.Key[T], value: T): Context =
+ new Context(Map(key.name -> value), Map.empty)
- def create[T](key: Key[T], value: T): Context =
- apply(key, value)
-
-}
-
-
-sealed abstract class Key[T] {
- def name: String
- def emptyValue: T
- def broadcast: Boolean
-}
+ def of[T](key: Context.Key[T], value: T, tags: JavaMap[String, String]): Context =
+ new Context(Map(key.name -> value), tags.asScala.toMap)
-object Key {
+ def of[T](key: Context.Key[T], value: T, tags: Map[String, String]): Context =
+ new Context(Map(key.name -> value), tags)
- def local[T](name: String, emptyValue: T): Key[T] =
- new Default[T](name, emptyValue, false)
+ def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U): Context =
+ new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), Map.empty)
- def broadcast[T](name: String, emptyValue: T): Key[T] =
- new Default[T](name, emptyValue, true)
+ def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: JavaMap[String, String]): Context =
+ new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), tags.asScala.toMap)
- def broadcastString(name: String): Key[Option[String]] =
- new Default[Option[String]](name, None, true)
+ def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: Map[String, String]): Context =
+ new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), tags)
+ def key[T](name: String, emptyValue: T): Context.Key[T] =
+ new Context.Key(name, emptyValue)
- private class Default[T](val name: String, val emptyValue: T, val broadcast: Boolean) extends Key[T] {
+ /**
+ * Encapsulates the type, name and empty value for a context entry. All reads and writes from a context instance
+ * must be done using a context key, which will ensure the right type is used on both operations. The key's name
+ * is used when configuring mappings and incoming/outgoing/returning codecs for context propagation across channels.
+ *
+ * If you try to read an entry from a context and such entry is not present, the empty value for the key is returned
+ * instead.
+ *
+ * @param name Key name. Must be unique.
+ * @param emptyValue Value to be returned when reading from a context that doesn't have an entry with this key.
+ * @tparam ValueType Type of the value to be held on the context with this key.
+ */
+ final class Key[ValueType](val name: String, val emptyValue: ValueType) {
override def hashCode(): Int =
name.hashCode
override def equals(that: Any): Boolean =
- that.isInstanceOf[Default[_]] && that.asInstanceOf[Default[_]].name == this.name
+ that.isInstanceOf[Context.Key[_]] && that.asInstanceOf[Context.Key[_]].name == this.name
}
}
\ No newline at end of file
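The split between typed entries and String tags in the rewritten Context can be exercised directly; a small usage sketch (key and tag names are illustrative):

    import kamon.context.Context

    // A typed key; reading it from a Context that lacks the entry yields the empty value.
    val UserID = Context.key[String]("user-id", "anonymous")

    val ctx = Context.of(UserID, "1234").withTag("request-path", "/users")

    ctx.get(UserID)            // "1234"
    Context.Empty.get(UserID)  // "anonymous", the key's empty value
    ctx.getTag("request-path") // Some("/users")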
diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
new file mode 100644
index 00000000..6a15e2a6
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
@@ -0,0 +1,206 @@
+/* =========================================================================================
+ * Copyright © 2013-2018 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon
+package context
+
+import com.typesafe.config.Config
+import org.slf4j.LoggerFactory
+
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+import scala.util.{Failure, Success}
+
+/**
+ * Context propagation that uses HTTP headers as the transport medium. HTTP propagation mechanisms read any number of
+ * HTTP headers from incoming HTTP requests to decode a Context instance and write any number of HTTP headers on
+ * outgoing requests to transfer a context to remote services.
+ */
+object HttpPropagation {
+
+ /**
+ * Wrapper that reads HTTP headers from an HTTP message.
+ */
+ trait HeaderReader {
+
+ /**
+ * Reads a single HTTP header value.
+ *
+ * @param header HTTP header name
+ * @return The HTTP header value, if present.
+ */
+ def read(header: String): Option[String]
+
+ /**
+ * Reads all HTTP headers present in the wrapped HTTP message.
+ *
+ * @return A map from header names to header values for all headers present in the HTTP message.
+ */
+ def readAll(): Map[String, String]
+ }
+
+ /**
+ * Wrapper that writes HTTP headers to an HTTP message.
+ */
+ trait HeaderWriter {
+
+ /**
+ * Writes an HTTP header into an HTTP message.
+ *
+ * @param header HTTP header name.
+ * @param value HTTP header value.
+ */
+ def write(header: String, value: String): Unit
+ }
+
+
+
+ /**
+ * Creates a new default HTTP propagation instance from the provided configuration.
+ *
+ * @param config HTTP propagation channel configuration
+ * @return A newly constructed HttpPropagation instance.
+ */
+ def from(config: Config, classLoading: ClassLoading): Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] = {
+ new HttpPropagation.Default(Settings.from(config, classLoading))
+ }
+
+ /**
+ * Default HTTP Propagation in Kamon.
+ */
+ class Default(settings: Settings) extends Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] {
+ private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Default])
+
+ /**
+ * Reads context tags and entries in the following order:
+ * 1. Read all context tags from the context tags header.
+ * 2. Read all context tags with explicit mappings. This overrides any tag
+ * from the previous step in case of a tag key clash.
+ * 3. Read all context entries using the incoming entries configuration.
+ */
+ override def read(reader: HeaderReader): Context = {
+ val tags = Map.newBuilder[String, String]
+
+ // Tags encoded together in the context tags header.
+ try {
+ reader.read(settings.tagsHeaderName).foreach { contextTagsHeader =>
+ contextTagsHeader.split(";").foreach(tagData => {
+ val tagPair = tagData.split("=")
+ if (tagPair.length == 2) {
+ tags += (tagPair(0) -> tagPair(1))
+ }
+ })
+ }
+ } catch {
+ case NonFatal(t) => log.warn("Failed to read the context tags header", t.asInstanceOf[Any])
+ }
+
+ // Tags explicitly mapped on the tags.mappings configuration.
+ settings.tagsMappings.foreach {
+ case (tagName, httpHeader) =>
+ try {
+ reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue))
+ } catch {
+ case NonFatal(t) => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any])
+ }
+ }
+
+ // Incoming Entries
+ settings.incomingEntries.foldLeft(Context.of(tags.result())) {
+ case (context, (entryName, entryDecoder)) =>
+ var result = context
+ try {
+ result = entryDecoder.read(reader, context)
+ } catch {
+ case NonFatal(t) => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
+ }
+
+ result
+ }
+ }
+
+ /**
+ * Writes context tags and entries to the HTTP message.
+ */
+ override def write(context: Context, writer: HeaderWriter): Unit = {
+ val contextTagsHeader = new StringBuilder()
+ def appendTag(key: String, value: String): Unit = {
+ contextTagsHeader
+ .append(key)
+ .append('=')
+ .append(value)
+ .append(';')
+ }
+
+ // Write tags with specific mappings or append them to the context tags header.
+ context.tags.foreach {
+ case (tagKey, tagValue) => settings.tagsMappings.get(tagKey) match {
+ case Some(mappedHeader) => writer.write(mappedHeader, tagValue)
+ case None => appendTag(tagKey, tagValue)
+ }
+ }
+
+ // Write the context tags header.
+ if(contextTagsHeader.nonEmpty) {
+ writer.write(settings.tagsHeaderName, contextTagsHeader.result())
+ }
+
+ // Write entries for the specified direction.
+ settings.outgoingEntries.foreach {
+ case (entryName, entryWriter) =>
+ try {
+ entryWriter.write(context, writer)
+ } catch {
+ case NonFatal(t) => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
+ }
+ }
+ }
+ }
+
+ case class Settings(
+ tagsHeaderName: String,
+ tagsMappings: Map[String, String],
+ incomingEntries: Map[String, Propagation.EntryReader[HeaderReader]],
+ outgoingEntries: Map[String, Propagation.EntryWriter[HeaderWriter]]
+ )
+
+ object Settings {
+ private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Settings])
+
+ def from(config: Config, classLoading: ClassLoading): Settings = {
+ def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = {
+ val instanceMap = Map.newBuilder[String, ExpectedType]
+
+ mappings.foreach {
+ case (contextKey, componentClass) => classLoading.createInstance[ExpectedType](componentClass, Nil) match {
+ case Success(componentInstance) => instanceMap += (contextKey -> componentInstance)
+ case Failure(exception) => log.warn("Failed to instantiate {} [{}]",
+ implicitly[ClassTag[ExpectedType]].runtimeClass.getName, componentClass, exception)
+ }
+ }
+
+ instanceMap.result()
+ }
+
+ val tagsHeaderName = config.getString("tags.header-name")
+ val tagsMappings = config.getConfig("tags.mappings").pairs
+ val incomingEntries = buildInstances[Propagation.EntryReader[HeaderReader]](config.getConfig("entries.incoming").pairs)
+ val outgoingEntries = buildInstances[Propagation.EntryWriter[HeaderWriter]](config.getConfig("entries.outgoing").pairs)
+
+ Settings(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries)
+ }
+ }
+
+}
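In instrumentation, HeaderReader and HeaderWriter are usually thin adapters over a framework's request and response types. A map-backed sketch of both wrappers plus a round trip, assuming hand-built Settings (class, header, and tag names are illustrative):

    import scala.collection.mutable
    import kamon.context.{Context, HttpPropagation}

    // In-memory carrier implementing both wrapper traits.
    class MapHeaders extends HttpPropagation.HeaderReader with HttpPropagation.HeaderWriter {
      private val storage = mutable.Map.empty[String, String]
      override def read(header: String): Option[String] = storage.get(header)
      override def readAll(): Map[String, String] = storage.toMap
      override def write(header: String, value: String): Unit = storage.put(header, value)
    }

    // No mappings or entry codecs configured: every tag travels in the tags header.
    val settings = HttpPropagation.Settings(
      tagsHeaderName = "context-tags",
      tagsMappings = Map.empty,
      incomingEntries = Map.empty,
      outgoingEntries = Map.empty
    )
    val propagation = new HttpPropagation.Default(settings)

    val headers = new MapHeaders()
    propagation.write(Context.of(Map("user-id" -> "1234")), headers)
    // headers now hold: context-tags -> "user-id=1234;"
    val restored = propagation.read(headers) // tag "user-id" is back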
diff --git a/kamon-core/src/main/scala/kamon/context/Mixin.scala b/kamon-core/src/main/scala/kamon/context/Mixin.scala
deleted file mode 100644
index 3445cc31..00000000
--- a/kamon-core/src/main/scala/kamon/context/Mixin.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.context
-
-import kamon.Kamon
-
-
-/**
- * Utility trait that marks objects carrying a reference to a Context instance.
- *
- */
-trait HasContext {
- def context: Context
-}
-
-object HasContext {
- private case class Default(context: Context) extends HasContext
-
- /**
- * Construct a HasSpan instance that references the provided Context.
- *
- */
- def from(context: Context): HasContext =
- Default(context)
-
- /**
- * Construct a HasContext instance with the current Kamon from Kamon's default context storage.
- *
- */
- def fromCurrentContext(): HasContext =
- Default(Kamon.currentContext())
-}
diff --git a/kamon-core/src/main/scala/kamon/context/Propagation.scala b/kamon-core/src/main/scala/kamon/context/Propagation.scala
new file mode 100644
index 00000000..1d973ca9
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/Propagation.scala
@@ -0,0 +1,81 @@
+/* =========================================================================================
+ * Copyright © 2013-2018 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.context
+
+/**
+ * Inter-process Context propagation interface. Implementations are responsible for reading and writing a lossless
+ * representation of a given Context instance to the appropriate mediums. Out of the box, Kamon ships with two
+ * implementations:
+ *
+ * * HttpPropagation: Uses HTTP headers as the medium to transport the Context data.
+ * * BinaryPropagation: Uses byte streams as the medium to transport the Context data.
+ *
+ */
+trait Propagation[ReaderMedium, WriterMedium] {
+
+ /**
+ * Attempts to read a Context from the [[ReaderMedium]].
+ *
+ * @param medium An abstraction that reads data from the medium transporting the Context data.
+ * @return The decoded Context instance or an empty Context if no entries or tags could be read from the medium.
+ */
+ def read(medium: ReaderMedium): Context
+
+ /**
+ * Attempts to write a Context instance to the [[WriterMedium]].
+ *
+ * @param context Context instance to be written.
+ * @param medium An abstraction that writes data into the medium that will transport the Context data.
+ */
+ def write(context: Context, medium: WriterMedium): Unit
+
+}
+
+object Propagation {
+
+ /**
+ * Encapsulates logic required to read a single context entry from a medium. Implementations of this trait
+ * must be aware of the entry they are able to read.
+ */
+ trait EntryReader[Medium] {
+
+ /**
+ * Tries to read a context entry from the medium. If a context entry is successfully read, implementations
+ * must return an updated context instance that includes such entry. If no entry could be read, simply return the
+ * context instance that was passed in, unchanged.
+ *
+ * @param medium An abstraction that reads data from the medium transporting the Context data.
+ * @param context Current context.
+ * @return Either the original context passed in or a modified version of it, including the read entry.
+ */
+ def read(medium: Medium, context: Context): Context
+ }
+
+ /**
+ * Encapsulates logic required to write a single context entry to the medium. Implementations of this trait
+ * must be aware of the entry they are able to write.
+ */
+ trait EntryWriter[Medium] {
+
+ /**
+ * Tries to write a context entry into the medium.
+ *
+ * @param context The context from which entries should be written.
+ * @param medium An abstraction that writes data into the medium that will transport the Context data.
+ */
+ def write(context: Context, medium: Medium): Unit
+ }
+}