diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2018-08-30 10:40:53 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2018-08-30 10:40:53 +0200 |
commit | e4abea098ef4d6e71a805812bfa95c14bd9002b5 (patch) | |
tree | f5fcb8222e293f420a9e7c06953805a7428d0f0e /kamon-core/src/main/scala/kamon/context | |
parent | 794fbf02664ac8c31072d8b955d897901f1f22e0 (diff) | |
download | Kamon-e4abea098ef4d6e71a805812bfa95c14bd9002b5.tar.gz Kamon-e4abea098ef4d6e71a805812bfa95c14bd9002b5.tar.bz2 Kamon-e4abea098ef4d6e71a805812bfa95c14bd9002b5.zip |
working on context tags and http propagation improvements
Diffstat (limited to 'kamon-core/src/main/scala/kamon/context')
4 files changed, 387 insertions, 154 deletions
diff --git a/kamon-core/src/main/scala/kamon/context/Codecs.scala b/kamon-core/src/main/scala/kamon/context/Codecs.scala index c5d237a9..465f53be 100644 --- a/kamon-core/src/main/scala/kamon/context/Codecs.scala +++ b/kamon-core/src/main/scala/kamon/context/Codecs.scala @@ -100,22 +100,22 @@ object Codecs { override def encode(context: Context): TextMap = { val encoded = TextMap.Default() - context.entries.foreach { - case (key, _) if key.broadcast => - entryCodecs.get(key.name) match { - case Some(codec) => - try { - codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2)) - } catch { - case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e) - } - - case None => - log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name) - } - - case _ => // All non-broadcast keys should be ignored. - } +// context.entries.foreach { +// case (key, _) if key.broadcast => +// entryCodecs.get(key.name) match { +// case Some(codec) => +// try { +// codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2)) +// } catch { +// case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e) +// } +// +// case None => +// log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name) +// } +// +// case _ => // All non-broadcast keys should be ignored. +// } encoded } @@ -150,29 +150,29 @@ object Codecs { emptyBuffer else { var colferEntries: List[ColferEntry] = Nil - entries.foreach { - case (key, _) if key.broadcast => - entryCodecs.get(key.name) match { - case Some(entryCodec) => - try { - val entryData = entryCodec.encode(context) - if(entryData.capacity() > 0) { - val colferEntry = new ColferEntry() - colferEntry.setName(key.name) - colferEntry.setContent(entryData.array()) - colferEntries = colferEntry :: colferEntries - } - } catch { - case throwable: Throwable => - log.error(s"Failed to encode broadcast context key [${key.name}]", throwable) - } - - case None => - log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name) - } - - case _ => // All non-broadcast keys should be ignored. - } +// entries.foreach { +// case (key, _) if key.broadcast => +// entryCodecs.get(key.name) match { +// case Some(entryCodec) => +// try { +// val entryData = entryCodec.encode(context) +// if(entryData.capacity() > 0) { +// val colferEntry = new ColferEntry() +// colferEntry.setName(key.name) +// colferEntry.setContent(entryData.array()) +// colferEntries = colferEntry :: colferEntries +// } +// } catch { +// case throwable: Throwable => +// log.error(s"Failed to encode broadcast context key [${key.name}]", throwable) +// } +// +// case None => +// log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name) +// } +// +// case _ => // All non-broadcast keys should be ignored. +// } if(colferEntries.isEmpty) emptyBuffer @@ -226,38 +226,40 @@ object Codecs { } private class StringHeadersCodec(key: String, headerName: String) extends Codecs.ForEntry[TextMap] { - private val contextKey = Key.broadcast[Option[String]](key, None) + //private val contextKey = Key.broadcast[Option[String]](key, None) override def encode(context: Context): TextMap = { val textMap = TextMap.Default() - context.get(contextKey).foreach { value => - textMap.put(headerName, value) - } +// context.get(contextKey).foreach { value => +// textMap.put(headerName, value) +// } textMap } override def decode(carrier: TextMap, context: Context): Context = { - carrier.get(headerName) match { - case value @ Some(_) => context.withKey(contextKey, value) - case None => context - } + ??? +// carrier.get(headerName) match { +// case value @ Some(_) => context.withKey(contextKey, value) +// case None => context +// } } } private class StringBinaryCodec(key: String) extends Codecs.ForEntry[ByteBuffer] { val emptyBuffer: ByteBuffer = ByteBuffer.allocate(0) - private val contextKey = Key.broadcast[Option[String]](key, None) + //private val contextKey = Key.broadcast[Option[String]](key, None) override def encode(context: Context): ByteBuffer = { - context.get(contextKey) match { - case Some(value) => ByteBuffer.wrap(value.getBytes) - case None => emptyBuffer - } +// context.get(contextKey) match { +// case Some(value) => ByteBuffer.wrap(value.getBytes) +// case None => emptyBuffer +// } + ??? } override def decode(carrier: ByteBuffer, context: Context): Context = { - context.withKey(contextKey, Some(new String(carrier.array()))) + ??? //context.withKey(contextKey, Some(new String(carrier.array()))) } } } diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala index e0b084cb..1eed7e14 100644 --- a/kamon-core/src/main/scala/kamon/context/Context.scala +++ b/kamon-core/src/main/scala/kamon/context/Context.scala @@ -13,90 +13,83 @@ * ========================================================================================= */ -package kamon.context +package kamon +package context -import java.io._ -import java.nio.ByteBuffer +import java.util.{Map => JavaMap} +import scala.collection.JavaConverters._ -import kamon.Kamon +class Context private (private[context] val entries: Map[Context.Key[_], Any], private[context] val tags: Map[String, String]) { -class Context private (private[context] val entries: Map[Key[_], Any]) extends scala.Serializable { - def get[T](key: Key[T]): T = + def get[T](key: Context.Key[T]): T = entries.getOrElse(key, key.emptyValue).asInstanceOf[T] - def withKey[T](key: Key[T], value: T): Context = - new Context(entries.updated(key, value)) + def getTag(tagKey: String): Option[String] = + tags.get(tagKey) - var _deserializedEntries: Map[Key[_], Any] = Map.empty + def withKey[T](key: Context.Key[T], value: T): Context = + new Context(entries.updated(key, value), tags) - @throws[IOException] - private def writeObject(out: ObjectOutputStream): Unit = out.write( - Kamon.contextCodec().Binary.encode(this).array() - ) + def withTag(tagKey: String, tagValue: String): Context = + new Context(entries, tags.updated(tagKey, tagValue)) - @throws[IOException] - @throws[ClassNotFoundException] - private def readObject(in: ObjectInputStream): Unit = { - val buf = new Array[Byte](in.available()) - in.readFully(buf) - _deserializedEntries = Kamon.contextCodec().Binary.decode(ByteBuffer.wrap(buf)).entries - } - - def readResolve(): AnyRef = new Context(_deserializedEntries) + def withTags(tags: Map[String, String]): Context = + new Context(entries, this.tags ++ tags) - override def equals(obj: scala.Any): Boolean = { - obj != null && - obj.isInstanceOf[Context] && - obj.asInstanceOf[Context].entries != null && - obj.asInstanceOf[Context].entries == this.entries - } + def withTags(tags: JavaMap[String, String]): Context = + new Context(entries, this.tags ++ tags.asScala.toMap) - override def hashCode(): Int = entries.hashCode() } object Context { - val Empty = new Context(Map.empty) - - def apply(): Context = - Empty + val Empty = new Context(Map.empty, Map.empty) - def create(): Context = - Empty + def of(tags: JavaMap[String, String]): Context = + new Context(Map.empty, tags.asScala.toMap) - def apply[T](key: Key[T], value: T): Context = - new Context(Map(key -> value)) + def of(tags: Map[String, String]): Context = + new Context(Map.empty, tags) - def create[T](key: Key[T], value: T): Context = - apply(key, value) + def of[T](key: Context.Key[T], value: T): Context = + new Context(Map(key -> value), Map.empty) -} - - -sealed abstract class Key[T] { - def name: String - def emptyValue: T - def broadcast: Boolean -} + def of[T](key: Context.Key[T], value: T, tags: JavaMap[String, String]): Context = + new Context(Map(key -> value), tags.asScala.toMap) -object Key { + def of[T](key: Context.Key[T], value: T, tags: Map[String, String]): Context = + new Context(Map(key -> value), tags) - def local[T](name: String, emptyValue: T): Key[T] = - new Default[T](name, emptyValue, false) + def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U): Context = + new Context(Map(keyOne -> valueOne, keyTwo -> valueTwo), Map.empty) - def broadcast[T](name: String, emptyValue: T): Key[T] = - new Default[T](name, emptyValue, true) + def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: JavaMap[String, String]): Context = + new Context(Map(keyOne -> valueOne, keyTwo -> valueTwo), tags.asScala.toMap) - def broadcastString(name: String): Key[Option[String]] = - new Default[Option[String]](name, None, true) + def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: Map[String, String]): Context = + new Context(Map(keyOne -> valueOne, keyTwo -> valueTwo), tags) + def key[T](name: String, emptyValue: T): Context.Key[T] = + new Context.Key(name, emptyValue) - private class Default[T](val name: String, val emptyValue: T, val broadcast: Boolean) extends Key[T] { + /** + * Encapsulates the type, name and empty value for a context entry. All reads and writes from a context instance + * must be done using a context key, which will ensure the right type is used on both operations. The key's name + * is used when configuring mappings and incoming/outgoing/returning codecs for context propagation across channels. + * + * If you try to read an entry from a context and such entry is not present, the empty value for the key is returned + * instead. + * + * @param name Key name. Must be unique. + * @param emptyValue Value to be returned when reading from a context that doesn't have an entry with this key. + * @tparam ValueType Type of the value to be held on the context with this key. + */ + final class Key[ValueType](val name: String, val emptyValue: ValueType) { override def hashCode(): Int = name.hashCode override def equals(that: Any): Boolean = - that.isInstanceOf[Default[_]] && that.asInstanceOf[Default[_]].name == this.name + that.isInstanceOf[Context.Key[_]] && that.asInstanceOf[Context.Key[_]].name == this.name } }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala new file mode 100644 index 00000000..5b0bdb38 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala @@ -0,0 +1,283 @@ +package kamon +package context + +import com.typesafe.config.Config +import org.slf4j.LoggerFactory + +import scala.reflect.ClassTag +import scala.util.{Failure, Success} + +/** + * Context Propagation for HTTP transports. When using HTTP transports all the context related information is + * read from and written to HTTP headers. The context information may be included in the following directions: + * - Incoming: Used for HTTP requests coming into this service. Implicitly used when using HttpPropagation.read. + * - Outgoing: Used for HTTP requests leaving this service. + * - Returning: Used for HTTP responses send back to clients of this service. + */ +trait HttpPropagation { + + /** + * Uses the provided [[HttpPropagation.HeaderReader]] to read as many HTTP Headers as necessary and create a + * [[Context]] instance. The way in which context tags and entries are read from and written to HTTP Headers is + * implementation specific. + * + * @param reader Wrapper on the HTTP message from which headers are read. + * @return The decoded Context instance. If no entries or tags could be read from the HTTP message then an + * empty context is returned instead. + */ + def read(reader: HttpPropagation.HeaderReader): Context + + /** + * Writes the tags and entries from the supplied context using the supplied [[HttpPropagation.HeaderWriter]] + * instance. The way in which context tags and entries are read from and written to HTTP Headers is implementation + * specific. + * + * Implementations are expected to produce side effects on the wrapped HTTP Messages. + * + * @param context Context instance to be written. + * @param writer Wrapper on the HTTP message that will carry the context headers. + * @param direction Write direction. It can be either Outgoing or Returning. + */ + def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit + +} + +object HttpPropagation { + + /** + * Encapsulates logic required to read a single context entry from HTTP headers. Implementations of this trait + * must be aware of the entry they are able to read and the HTTP headers required to do so. + */ + trait EntryReader { + + /** + * Tries to read a context entry from HTTP headers. If a context entry is successfully read, implementations + * must return an updated context instance that includes such entry. If no entry could be read simply return + * context instance that was passed in, untouched. + * + * @param reader Wrapper on the HTTP message from which headers are read. + * @param context Current context. + * @return Either the original context passed in or a modified version of it, including the read entry. + */ + def read(reader: HttpPropagation.HeaderReader, context: Context): Context + } + + /** + * Encapsulates logic required to write a single context entry to HTTP headers. Implementations of this trait + * must be aware of the entry they are able to write and the HTTP headers required to do so. + */ + trait EntryWriter { + + /** + * Tries to write a context entry into HTTP headers. + * + * @param context The context from which entries should be written. + * @param writer Wrapper on the HTTP message that will carry the context headers. + * @param direction Write direction. It can be either Outgoing or Returning. + */ + def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit + } + + + /** + * Wrapper that reads HTTP headers from HTTP a message. + */ + trait HeaderReader { + + /** + * Reads an HTTP header value + * + * @param header HTTP header name + * @return The HTTP header value, if present. + */ + def read(header: String): Option[String] + } + + /** + * Wrapper that writes HTTP headers to a HTTP message. + */ + trait HeaderWriter { + + /** + * Writes a HTTP header into a HTTP message. + * + * @param header HTTP header name. + * @param value HTTP header value. + */ + def write(header: String, value: String): Unit + } + + + /** + * Create a new default HttpPropagation instance from the provided configuration. + * + * @param config HTTP propagation channel configuration + * @return A newly constructed HttpPropagation instance. + */ + def from(config: Config, classLoading: ClassLoading): HttpPropagation = { + new HttpPropagation.Default(Components.from(config, classLoading)) + } + + /** + * Default HTTP Propagation in Kamon. + */ + final class Default(components: Components) extends HttpPropagation { + private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Default]) + + /** + * Reads context tags and entries on the following order: + * - Read all context tags from the context tags header. + * - Read all context tags with explicit mappings. This overrides any tag from the previous step in case + * of a tag key clash. + * - Read all context entries using the incoming entries configuration. + */ + override def read(reader: HeaderReader): Context = { + val tags = Map.newBuilder[String, String] + + // Tags encoded together in the context tags header. + try { + reader.read(components.tagsHeaderName).foreach { contextTagsHeader => + contextTagsHeader.split(";").foreach(tagData => { + val tagPair = tagData.split("=") + if (tagPair.length == 2) { + tags += (tagPair(0) -> tagPair(1)) + } + }) + } + } catch { + case t: Throwable => log.warn("Failed to read the context tags header", t.asInstanceOf[Any]) + } + + // Tags explicitly mapped on the tags.mappings configuration. + components.tagsMappings.foreach { + case (tagName, httpHeader) => + try { + reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue)) + } catch { + case t: Throwable => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any]) + } + } + + // Incoming Entries + components.incomingEntries.foldLeft(Context.of(tags.result())) { + case (context, (entryName, entryDecoder)) => + var result = context + try { + result = entryDecoder.read(reader, context) + } catch { + case t: Throwable => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any]) + } + + result + } + } + + /** + * Writes context tags and entries + */ + override def write(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = { + val keys = direction match { + case Direction.Outgoing => components.outgoingEntries + case Direction.Returning => components.returningEntries + } + + val contextTagsHeader = new StringBuilder() + def appendTag(key: String, value: String): Unit = { + contextTagsHeader + .append(key) + .append('=') + .append(value) + .append(';') + } + + // Write tags with specific mappings or append them to the context tags header. + context.tags.foreach { + case (tagKey, tagValue) => components.tagsMappings.get(tagKey) match { + case Some(mappedHeader) => writer.write(mappedHeader, tagValue) + case None => appendTag(tagKey, tagValue) + } + } + + // Write the context tags header. + if(contextTagsHeader.nonEmpty) { + writer.write(components.tagsHeaderName, contextTagsHeader.result()) + } + + // Write entries for the specified direction. + keys.foreach { + case (entryName, entryWriter) => + try { + entryWriter.write(context, writer, direction) + } catch { + case t: Throwable => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any]) + } + } + } + } + + /** + * Propagation direction. Used to decide whether incoming, outgoing or returning keys must be used to + * propagate context. + */ + sealed trait Direction + object Direction { + + /** + * Marker trait for all directions that require write operations. + */ + sealed trait Write + + /** + * Requests coming into this service. + */ + case object Incoming extends Direction + + /** + * Requests going from this service to others. + */ + case object Outgoing extends Direction with Write + + /** + * Responses sent from this service to clients. + */ + case object Returning extends Direction with Write + } + + + case class Components( + tagsHeaderName: String, + tagsMappings: Map[String, String], + incomingEntries: Map[String, HttpPropagation.EntryReader], + outgoingEntries: Map[String, HttpPropagation.EntryWriter], + returningEntries: Map[String, HttpPropagation.EntryWriter] + ) + + object Components { + private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Components]) + + def from(config: Config, classLoading: ClassLoading): Components = { + def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = { + val entryReaders = Map.newBuilder[String, ExpectedType] + + mappings.foreach { + case (contextKey, readerClass) => classLoading.createInstance[ExpectedType](readerClass, Nil) match { + case Success(readerInstance) => entryReaders += (contextKey -> readerInstance) + case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to []", + implicitly[ClassTag[ExpectedType]].runtimeClass.getName, readerClass, exception) + } + } + + entryReaders.result() + } + + val tagsHeaderName = config.getString("tags.header-name") + val tagsMappings = config.getConfig("tags.mappings").pairs + val incomingEntries = buildInstances[HttpPropagation.EntryReader](config.getConfig("entries.incoming").pairs) + val outgoingEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.outgoing").pairs) + val returningEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.returning").pairs) + + Components(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries, returningEntries) + } + } + +} diff --git a/kamon-core/src/main/scala/kamon/context/Mixin.scala b/kamon-core/src/main/scala/kamon/context/Mixin.scala deleted file mode 100644 index 3445cc31..00000000 --- a/kamon-core/src/main/scala/kamon/context/Mixin.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.context - -import kamon.Kamon - - -/** - * Utility trait that marks objects carrying a reference to a Context instance. - * - */ -trait HasContext { - def context: Context -} - -object HasContext { - private case class Default(context: Context) extends HasContext - - /** - * Construct a HasSpan instance that references the provided Context. - * - */ - def from(context: Context): HasContext = - Default(context) - - /** - * Construct a HasContext instance with the current Kamon from Kamon's default context storage. - * - */ - def fromCurrentContext(): HasContext = - Default(Kamon.currentContext()) -} |