aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/context
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2018-08-30 10:40:53 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2018-08-30 10:40:53 +0200
commite4abea098ef4d6e71a805812bfa95c14bd9002b5 (patch)
treef5fcb8222e293f420a9e7c06953805a7428d0f0e /kamon-core/src/main/scala/kamon/context
parent794fbf02664ac8c31072d8b955d897901f1f22e0 (diff)
downloadKamon-e4abea098ef4d6e71a805812bfa95c14bd9002b5.tar.gz
Kamon-e4abea098ef4d6e71a805812bfa95c14bd9002b5.tar.bz2
Kamon-e4abea098ef4d6e71a805812bfa95c14bd9002b5.zip
working on context tags and http propagation improvements
Diffstat (limited to 'kamon-core/src/main/scala/kamon/context')
-rw-r--r--kamon-core/src/main/scala/kamon/context/Codecs.scala108
-rw-r--r--kamon-core/src/main/scala/kamon/context/Context.scala105
-rw-r--r--kamon-core/src/main/scala/kamon/context/HttpPropagation.scala283
-rw-r--r--kamon-core/src/main/scala/kamon/context/Mixin.scala45
4 files changed, 387 insertions, 154 deletions
diff --git a/kamon-core/src/main/scala/kamon/context/Codecs.scala b/kamon-core/src/main/scala/kamon/context/Codecs.scala
index c5d237a9..465f53be 100644
--- a/kamon-core/src/main/scala/kamon/context/Codecs.scala
+++ b/kamon-core/src/main/scala/kamon/context/Codecs.scala
@@ -100,22 +100,22 @@ object Codecs {
override def encode(context: Context): TextMap = {
val encoded = TextMap.Default()
- context.entries.foreach {
- case (key, _) if key.broadcast =>
- entryCodecs.get(key.name) match {
- case Some(codec) =>
- try {
- codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2))
- } catch {
- case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e)
- }
-
- case None =>
- log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name)
- }
-
- case _ => // All non-broadcast keys should be ignored.
- }
+// context.entries.foreach {
+// case (key, _) if key.broadcast =>
+// entryCodecs.get(key.name) match {
+// case Some(codec) =>
+// try {
+// codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2))
+// } catch {
+// case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e)
+// }
+//
+// case None =>
+// log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name)
+// }
+//
+// case _ => // All non-broadcast keys should be ignored.
+// }
encoded
}
@@ -150,29 +150,29 @@ object Codecs {
emptyBuffer
else {
var colferEntries: List[ColferEntry] = Nil
- entries.foreach {
- case (key, _) if key.broadcast =>
- entryCodecs.get(key.name) match {
- case Some(entryCodec) =>
- try {
- val entryData = entryCodec.encode(context)
- if(entryData.capacity() > 0) {
- val colferEntry = new ColferEntry()
- colferEntry.setName(key.name)
- colferEntry.setContent(entryData.array())
- colferEntries = colferEntry :: colferEntries
- }
- } catch {
- case throwable: Throwable =>
- log.error(s"Failed to encode broadcast context key [${key.name}]", throwable)
- }
-
- case None =>
- log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name)
- }
-
- case _ => // All non-broadcast keys should be ignored.
- }
+// entries.foreach {
+// case (key, _) if key.broadcast =>
+// entryCodecs.get(key.name) match {
+// case Some(entryCodec) =>
+// try {
+// val entryData = entryCodec.encode(context)
+// if(entryData.capacity() > 0) {
+// val colferEntry = new ColferEntry()
+// colferEntry.setName(key.name)
+// colferEntry.setContent(entryData.array())
+// colferEntries = colferEntry :: colferEntries
+// }
+// } catch {
+// case throwable: Throwable =>
+// log.error(s"Failed to encode broadcast context key [${key.name}]", throwable)
+// }
+//
+// case None =>
+// log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name)
+// }
+//
+// case _ => // All non-broadcast keys should be ignored.
+// }
if(colferEntries.isEmpty)
emptyBuffer
@@ -226,38 +226,40 @@ object Codecs {
}
private class StringHeadersCodec(key: String, headerName: String) extends Codecs.ForEntry[TextMap] {
- private val contextKey = Key.broadcast[Option[String]](key, None)
+ //private val contextKey = Key.broadcast[Option[String]](key, None)
override def encode(context: Context): TextMap = {
val textMap = TextMap.Default()
- context.get(contextKey).foreach { value =>
- textMap.put(headerName, value)
- }
+// context.get(contextKey).foreach { value =>
+// textMap.put(headerName, value)
+// }
textMap
}
override def decode(carrier: TextMap, context: Context): Context = {
- carrier.get(headerName) match {
- case value @ Some(_) => context.withKey(contextKey, value)
- case None => context
- }
+ ???
+// carrier.get(headerName) match {
+// case value @ Some(_) => context.withKey(contextKey, value)
+// case None => context
+// }
}
}
private class StringBinaryCodec(key: String) extends Codecs.ForEntry[ByteBuffer] {
val emptyBuffer: ByteBuffer = ByteBuffer.allocate(0)
- private val contextKey = Key.broadcast[Option[String]](key, None)
+ //private val contextKey = Key.broadcast[Option[String]](key, None)
override def encode(context: Context): ByteBuffer = {
- context.get(contextKey) match {
- case Some(value) => ByteBuffer.wrap(value.getBytes)
- case None => emptyBuffer
- }
+// context.get(contextKey) match {
+// case Some(value) => ByteBuffer.wrap(value.getBytes)
+// case None => emptyBuffer
+// }
+ ???
}
override def decode(carrier: ByteBuffer, context: Context): Context = {
- context.withKey(contextKey, Some(new String(carrier.array())))
+ ??? //context.withKey(contextKey, Some(new String(carrier.array())))
}
}
}
diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala
index e0b084cb..1eed7e14 100644
--- a/kamon-core/src/main/scala/kamon/context/Context.scala
+++ b/kamon-core/src/main/scala/kamon/context/Context.scala
@@ -13,90 +13,83 @@
* =========================================================================================
*/
-package kamon.context
+package kamon
+package context
-import java.io._
-import java.nio.ByteBuffer
+import java.util.{Map => JavaMap}
+import scala.collection.JavaConverters._
-import kamon.Kamon
+class Context private (private[context] val entries: Map[Context.Key[_], Any], private[context] val tags: Map[String, String]) {
-class Context private (private[context] val entries: Map[Key[_], Any]) extends scala.Serializable {
- def get[T](key: Key[T]): T =
+ def get[T](key: Context.Key[T]): T =
entries.getOrElse(key, key.emptyValue).asInstanceOf[T]
- def withKey[T](key: Key[T], value: T): Context =
- new Context(entries.updated(key, value))
+ def getTag(tagKey: String): Option[String] =
+ tags.get(tagKey)
- var _deserializedEntries: Map[Key[_], Any] = Map.empty
+ def withKey[T](key: Context.Key[T], value: T): Context =
+ new Context(entries.updated(key, value), tags)
- @throws[IOException]
- private def writeObject(out: ObjectOutputStream): Unit = out.write(
- Kamon.contextCodec().Binary.encode(this).array()
- )
+ def withTag(tagKey: String, tagValue: String): Context =
+ new Context(entries, tags.updated(tagKey, tagValue))
- @throws[IOException]
- @throws[ClassNotFoundException]
- private def readObject(in: ObjectInputStream): Unit = {
- val buf = new Array[Byte](in.available())
- in.readFully(buf)
- _deserializedEntries = Kamon.contextCodec().Binary.decode(ByteBuffer.wrap(buf)).entries
- }
-
- def readResolve(): AnyRef = new Context(_deserializedEntries)
+ def withTags(tags: Map[String, String]): Context =
+ new Context(entries, this.tags ++ tags)
- override def equals(obj: scala.Any): Boolean = {
- obj != null &&
- obj.isInstanceOf[Context] &&
- obj.asInstanceOf[Context].entries != null &&
- obj.asInstanceOf[Context].entries == this.entries
- }
+ def withTags(tags: JavaMap[String, String]): Context =
+ new Context(entries, this.tags ++ tags.asScala.toMap)
- override def hashCode(): Int = entries.hashCode()
}
object Context {
- val Empty = new Context(Map.empty)
-
- def apply(): Context =
- Empty
+ val Empty = new Context(Map.empty, Map.empty)
- def create(): Context =
- Empty
+ def of(tags: JavaMap[String, String]): Context =
+ new Context(Map.empty, tags.asScala.toMap)
- def apply[T](key: Key[T], value: T): Context =
- new Context(Map(key -> value))
+ def of(tags: Map[String, String]): Context =
+ new Context(Map.empty, tags)
- def create[T](key: Key[T], value: T): Context =
- apply(key, value)
+ def of[T](key: Context.Key[T], value: T): Context =
+ new Context(Map(key -> value), Map.empty)
-}
-
-
-sealed abstract class Key[T] {
- def name: String
- def emptyValue: T
- def broadcast: Boolean
-}
+ def of[T](key: Context.Key[T], value: T, tags: JavaMap[String, String]): Context =
+ new Context(Map(key -> value), tags.asScala.toMap)
-object Key {
+ def of[T](key: Context.Key[T], value: T, tags: Map[String, String]): Context =
+ new Context(Map(key -> value), tags)
- def local[T](name: String, emptyValue: T): Key[T] =
- new Default[T](name, emptyValue, false)
+ def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U): Context =
+ new Context(Map(keyOne -> valueOne, keyTwo -> valueTwo), Map.empty)
- def broadcast[T](name: String, emptyValue: T): Key[T] =
- new Default[T](name, emptyValue, true)
+ def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: JavaMap[String, String]): Context =
+ new Context(Map(keyOne -> valueOne, keyTwo -> valueTwo), tags.asScala.toMap)
- def broadcastString(name: String): Key[Option[String]] =
- new Default[Option[String]](name, None, true)
+ def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: Map[String, String]): Context =
+ new Context(Map(keyOne -> valueOne, keyTwo -> valueTwo), tags)
+ def key[T](name: String, emptyValue: T): Context.Key[T] =
+ new Context.Key(name, emptyValue)
- private class Default[T](val name: String, val emptyValue: T, val broadcast: Boolean) extends Key[T] {
+ /**
+ * Encapsulates the type, name and empty value for a context entry. All reads and writes from a context instance
+ * must be done using a context key, which will ensure the right type is used on both operations. The key's name
+ * is used when configuring mappings and incoming/outgoing/returning codecs for context propagation across channels.
+ *
+ * If you try to read an entry from a context and such entry is not present, the empty value for the key is returned
+ * instead.
+ *
+ * @param name Key name. Must be unique.
+ * @param emptyValue Value to be returned when reading from a context that doesn't have an entry with this key.
+ * @tparam ValueType Type of the value to be held on the context with this key.
+ */
+ final class Key[ValueType](val name: String, val emptyValue: ValueType) {
override def hashCode(): Int =
name.hashCode
override def equals(that: Any): Boolean =
- that.isInstanceOf[Default[_]] && that.asInstanceOf[Default[_]].name == this.name
+ that.isInstanceOf[Context.Key[_]] && that.asInstanceOf[Context.Key[_]].name == this.name
}
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
new file mode 100644
index 00000000..5b0bdb38
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
@@ -0,0 +1,283 @@
+package kamon
+package context
+
+import com.typesafe.config.Config
+import org.slf4j.LoggerFactory
+
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success}
+
+/**
+ * Context Propagation for HTTP transports. When using HTTP transports all the context related information is
+ * read from and written to HTTP headers. The context information may be included in the following directions:
+ * - Incoming: Used for HTTP requests coming into this service. Implicitly used when using HttpPropagation.read.
+ * - Outgoing: Used for HTTP requests leaving this service.
+ * - Returning: Used for HTTP responses send back to clients of this service.
+ */
+trait HttpPropagation {
+
+ /**
+ * Uses the provided [[HttpPropagation.HeaderReader]] to read as many HTTP Headers as necessary and create a
+ * [[Context]] instance. The way in which context tags and entries are read from and written to HTTP Headers is
+ * implementation specific.
+ *
+ * @param reader Wrapper on the HTTP message from which headers are read.
+ * @return The decoded Context instance. If no entries or tags could be read from the HTTP message then an
+ * empty context is returned instead.
+ */
+ def read(reader: HttpPropagation.HeaderReader): Context
+
+ /**
+ * Writes the tags and entries from the supplied context using the supplied [[HttpPropagation.HeaderWriter]]
+ * instance. The way in which context tags and entries are read from and written to HTTP Headers is implementation
+ * specific.
+ *
+ * Implementations are expected to produce side effects on the wrapped HTTP Messages.
+ *
+ * @param context Context instance to be written.
+ * @param writer Wrapper on the HTTP message that will carry the context headers.
+ * @param direction Write direction. It can be either Outgoing or Returning.
+ */
+ def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit
+
+}
+
+object HttpPropagation {
+
+ /**
+ * Encapsulates logic required to read a single context entry from HTTP headers. Implementations of this trait
+ * must be aware of the entry they are able to read and the HTTP headers required to do so.
+ */
+ trait EntryReader {
+
+ /**
+ * Tries to read a context entry from HTTP headers. If a context entry is successfully read, implementations
+ * must return an updated context instance that includes such entry. If no entry could be read simply return
+ * context instance that was passed in, untouched.
+ *
+ * @param reader Wrapper on the HTTP message from which headers are read.
+ * @param context Current context.
+ * @return Either the original context passed in or a modified version of it, including the read entry.
+ */
+ def read(reader: HttpPropagation.HeaderReader, context: Context): Context
+ }
+
+ /**
+ * Encapsulates logic required to write a single context entry to HTTP headers. Implementations of this trait
+ * must be aware of the entry they are able to write and the HTTP headers required to do so.
+ */
+ trait EntryWriter {
+
+ /**
+ * Tries to write a context entry into HTTP headers.
+ *
+ * @param context The context from which entries should be written.
+ * @param writer Wrapper on the HTTP message that will carry the context headers.
+ * @param direction Write direction. It can be either Outgoing or Returning.
+ */
+ def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit
+ }
+
+
+ /**
+ * Wrapper that reads HTTP headers from HTTP a message.
+ */
+ trait HeaderReader {
+
+ /**
+ * Reads an HTTP header value
+ *
+ * @param header HTTP header name
+ * @return The HTTP header value, if present.
+ */
+ def read(header: String): Option[String]
+ }
+
+ /**
+ * Wrapper that writes HTTP headers to a HTTP message.
+ */
+ trait HeaderWriter {
+
+ /**
+ * Writes a HTTP header into a HTTP message.
+ *
+ * @param header HTTP header name.
+ * @param value HTTP header value.
+ */
+ def write(header: String, value: String): Unit
+ }
+
+
+ /**
+ * Create a new default HttpPropagation instance from the provided configuration.
+ *
+ * @param config HTTP propagation channel configuration
+ * @return A newly constructed HttpPropagation instance.
+ */
+ def from(config: Config, classLoading: ClassLoading): HttpPropagation = {
+ new HttpPropagation.Default(Components.from(config, classLoading))
+ }
+
+ /**
+ * Default HTTP Propagation in Kamon.
+ */
+ final class Default(components: Components) extends HttpPropagation {
+ private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Default])
+
+ /**
+ * Reads context tags and entries on the following order:
+ * - Read all context tags from the context tags header.
+ * - Read all context tags with explicit mappings. This overrides any tag from the previous step in case
+ * of a tag key clash.
+ * - Read all context entries using the incoming entries configuration.
+ */
+ override def read(reader: HeaderReader): Context = {
+ val tags = Map.newBuilder[String, String]
+
+ // Tags encoded together in the context tags header.
+ try {
+ reader.read(components.tagsHeaderName).foreach { contextTagsHeader =>
+ contextTagsHeader.split(";").foreach(tagData => {
+ val tagPair = tagData.split("=")
+ if (tagPair.length == 2) {
+ tags += (tagPair(0) -> tagPair(1))
+ }
+ })
+ }
+ } catch {
+ case t: Throwable => log.warn("Failed to read the context tags header", t.asInstanceOf[Any])
+ }
+
+ // Tags explicitly mapped on the tags.mappings configuration.
+ components.tagsMappings.foreach {
+ case (tagName, httpHeader) =>
+ try {
+ reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue))
+ } catch {
+ case t: Throwable => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any])
+ }
+ }
+
+ // Incoming Entries
+ components.incomingEntries.foldLeft(Context.of(tags.result())) {
+ case (context, (entryName, entryDecoder)) =>
+ var result = context
+ try {
+ result = entryDecoder.read(reader, context)
+ } catch {
+ case t: Throwable => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
+ }
+
+ result
+ }
+ }
+
+ /**
+ * Writes context tags and entries
+ */
+ override def write(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = {
+ val keys = direction match {
+ case Direction.Outgoing => components.outgoingEntries
+ case Direction.Returning => components.returningEntries
+ }
+
+ val contextTagsHeader = new StringBuilder()
+ def appendTag(key: String, value: String): Unit = {
+ contextTagsHeader
+ .append(key)
+ .append('=')
+ .append(value)
+ .append(';')
+ }
+
+ // Write tags with specific mappings or append them to the context tags header.
+ context.tags.foreach {
+ case (tagKey, tagValue) => components.tagsMappings.get(tagKey) match {
+ case Some(mappedHeader) => writer.write(mappedHeader, tagValue)
+ case None => appendTag(tagKey, tagValue)
+ }
+ }
+
+ // Write the context tags header.
+ if(contextTagsHeader.nonEmpty) {
+ writer.write(components.tagsHeaderName, contextTagsHeader.result())
+ }
+
+ // Write entries for the specified direction.
+ keys.foreach {
+ case (entryName, entryWriter) =>
+ try {
+ entryWriter.write(context, writer, direction)
+ } catch {
+ case t: Throwable => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
+ }
+ }
+ }
+ }
+
+ /**
+ * Propagation direction. Used to decide whether incoming, outgoing or returning keys must be used to
+ * propagate context.
+ */
+ sealed trait Direction
+ object Direction {
+
+ /**
+ * Marker trait for all directions that require write operations.
+ */
+ sealed trait Write
+
+ /**
+ * Requests coming into this service.
+ */
+ case object Incoming extends Direction
+
+ /**
+ * Requests going from this service to others.
+ */
+ case object Outgoing extends Direction with Write
+
+ /**
+ * Responses sent from this service to clients.
+ */
+ case object Returning extends Direction with Write
+ }
+
+
+ case class Components(
+ tagsHeaderName: String,
+ tagsMappings: Map[String, String],
+ incomingEntries: Map[String, HttpPropagation.EntryReader],
+ outgoingEntries: Map[String, HttpPropagation.EntryWriter],
+ returningEntries: Map[String, HttpPropagation.EntryWriter]
+ )
+
+ object Components {
+ private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Components])
+
+ def from(config: Config, classLoading: ClassLoading): Components = {
+ def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = {
+ val entryReaders = Map.newBuilder[String, ExpectedType]
+
+ mappings.foreach {
+ case (contextKey, readerClass) => classLoading.createInstance[ExpectedType](readerClass, Nil) match {
+ case Success(readerInstance) => entryReaders += (contextKey -> readerInstance)
+ case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to []",
+ implicitly[ClassTag[ExpectedType]].runtimeClass.getName, readerClass, exception)
+ }
+ }
+
+ entryReaders.result()
+ }
+
+ val tagsHeaderName = config.getString("tags.header-name")
+ val tagsMappings = config.getConfig("tags.mappings").pairs
+ val incomingEntries = buildInstances[HttpPropagation.EntryReader](config.getConfig("entries.incoming").pairs)
+ val outgoingEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.outgoing").pairs)
+ val returningEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.returning").pairs)
+
+ Components(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries, returningEntries)
+ }
+ }
+
+}
diff --git a/kamon-core/src/main/scala/kamon/context/Mixin.scala b/kamon-core/src/main/scala/kamon/context/Mixin.scala
deleted file mode 100644
index 3445cc31..00000000
--- a/kamon-core/src/main/scala/kamon/context/Mixin.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.context
-
-import kamon.Kamon
-
-
-/**
- * Utility trait that marks objects carrying a reference to a Context instance.
- *
- */
-trait HasContext {
- def context: Context
-}
-
-object HasContext {
- private case class Default(context: Context) extends HasContext
-
- /**
- * Construct a HasSpan instance that references the provided Context.
- *
- */
- def from(context: Context): HasContext =
- Default(context)
-
- /**
- * Construct a HasContext instance with the current Kamon from Kamon's default context storage.
- *
- */
- def fromCurrentContext(): HasContext =
- Default(Kamon.currentContext())
-}