diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/context/HttpPropagation.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/context/HttpPropagation.scala | 204 |
1 files changed, 63 insertions, 141 deletions
diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala index d53a6250..6a15e2a6 100644 --- a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala +++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala @@ -1,3 +1,18 @@ +/* ========================================================================================= + * Copyright © 2013-2018 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + package kamon package context @@ -9,89 +24,31 @@ import scala.util.control.NonFatal import scala.util.{Failure, Success} /** - * Context Propagation for HTTP transports. When using HTTP transports all the context related information is - * read from and written to HTTP headers. The context information may be included in the following directions: - * - Incoming: Used for HTTP requests coming into this service. Implicitly used when using HttpPropagation.read. - * - Outgoing: Used for HTTP requests leaving this service. - * - Returning: Used for HTTP responses send back to clients of this service. + * Context propagation that uses HTTP headers as the transport medium. HTTP propagation mechanisms read any number of + * HTTP headers from incoming HTTP requests to decode a Context instance and write any number of HTTP headers on + * outgoing requests to transfer a context to remote services. */ -trait HttpPropagation { - - /** - * Uses the provided [[HttpPropagation.HeaderReader]] to read as many HTTP Headers as necessary and create a - * [[Context]] instance. The way in which context tags and entries are read from and written to HTTP Headers is - * implementation specific. - * - * @param reader Wrapper on the HTTP message from which headers are read. - * @return The decoded Context instance. If no entries or tags could be read from the HTTP message then an - * empty context is returned instead. - */ - def readContext(reader: HttpPropagation.HeaderReader): Context - - /** - * Writes the tags and entries from the supplied context using the supplied [[HttpPropagation.HeaderWriter]] - * instance. The way in which context tags and entries are read from and written to HTTP Headers is implementation - * specific. - * - * Implementations are expected to produce side effects on the wrapped HTTP Messages. - * - * @param context Context instance to be written. - * @param writer Wrapper on the HTTP message that will carry the context headers. - * @param direction Write direction. It can be either Outgoing or Returning. - */ - def writeContext(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit - -} - object HttpPropagation { /** - * Encapsulates logic required to read a single context entry from HTTP headers. Implementations of this trait - * must be aware of the entry they are able to read and the HTTP headers required to do so. + * Wrapper that reads HTTP headers from a HTTP message. */ - trait EntryReader { - - /** - * Tries to read a context entry from HTTP headers. If a context entry is successfully read, implementations - * must return an updated context instance that includes such entry. If no entry could be read simply return - * context instance that was passed in, untouched. - * - * @param reader Wrapper on the HTTP message from which headers are read. - * @param context Current context. - * @return Either the original context passed in or a modified version of it, including the read entry. - */ - def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context - } - - /** - * Encapsulates logic required to write a single context entry to HTTP headers. Implementations of this trait - * must be aware of the entry they are able to write and the HTTP headers required to do so. - */ - trait EntryWriter { + trait HeaderReader { /** - * Tries to write a context entry into HTTP headers. + * Reads a single HTTP header value. * - * @param context The context from which entries should be written. - * @param writer Wrapper on the HTTP message that will carry the context headers. - * @param direction Write direction. It can be either Outgoing or Returning. + * @param header HTTP header name + * @return The HTTP header value, if present. */ - def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit - } - - - /** - * Wrapper that reads HTTP headers from HTTP a message. - */ - trait HeaderReader { + def read(header: String): Option[String] /** - * Reads an HTTP header value + * Reads all HTTP headers present in the wrapped HTTP message. * - * @param header HTTP header name - * @return The HTTP header value, if present. + * @return A map from header name to */ - def readHeader(header: String): Option[String] + def readAll(): Map[String, String] } /** @@ -105,39 +62,40 @@ object HttpPropagation { * @param header HTTP header name. * @param value HTTP header value. */ - def writeHeader(header: String, value: String): Unit + def write(header: String, value: String): Unit } + /** - * Create a new default HttpPropagation instance from the provided configuration. + * Create a new default HTTP propagation instance from the provided configuration. * * @param config HTTP propagation channel configuration * @return A newly constructed HttpPropagation instance. */ - def from(config: Config, classLoading: ClassLoading): HttpPropagation = { - new HttpPropagation.Default(Components.from(config, classLoading)) + def from(config: Config, classLoading: ClassLoading): Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] = { + new HttpPropagation.Default(Settings.from(config, classLoading)) } /** * Default HTTP Propagation in Kamon. */ - final class Default(components: Components) extends HttpPropagation { + class Default(settings: Settings) extends Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] { private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Default]) /** * Reads context tags and entries on the following order: - * - Read all context tags from the context tags header. - * - Read all context tags with explicit mappings. This overrides any tag from the previous step in case - * of a tag key clash. - * - Read all context entries using the incoming entries configuration. + * 1. Read all context tags from the context tags header. + * 2. Read all context tags with explicit mappings. This overrides any tag + * from the previous step in case of a tag key clash. + * 3. Read all context entries using the incoming entries configuration. */ - override def readContext(reader: HeaderReader): Context = { + override def read(reader: HeaderReader): Context = { val tags = Map.newBuilder[String, String] // Tags encoded together in the context tags header. try { - reader.readHeader(components.tagsHeaderName).foreach { contextTagsHeader => + reader.read(settings.tagsHeaderName).foreach { contextTagsHeader => contextTagsHeader.split(";").foreach(tagData => { val tagPair = tagData.split("=") if (tagPair.length == 2) { @@ -150,21 +108,21 @@ object HttpPropagation { } // Tags explicitly mapped on the tags.mappings configuration. - components.tagsMappings.foreach { + settings.tagsMappings.foreach { case (tagName, httpHeader) => try { - reader.readHeader(httpHeader).foreach(tagValue => tags += (tagName -> tagValue)) + reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue)) } catch { case NonFatal(t) => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any]) } } // Incoming Entries - components.incomingEntries.foldLeft(Context.of(tags.result())) { + settings.incomingEntries.foldLeft(Context.of(tags.result())) { case (context, (entryName, entryDecoder)) => var result = context try { - result = entryDecoder.readEntry(reader, context) + result = entryDecoder.read(reader, context) } catch { case NonFatal(t) => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any]) } @@ -176,12 +134,7 @@ object HttpPropagation { /** * Writes context tags and entries */ - override def writeContext(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = { - val keys = direction match { - case Direction.Outgoing => components.outgoingEntries - case Direction.Returning => components.returningEntries - } - + override def write(context: Context, writer: HeaderWriter): Unit = { val contextTagsHeader = new StringBuilder() def appendTag(key: String, value: String): Unit = { contextTagsHeader @@ -193,22 +146,22 @@ object HttpPropagation { // Write tags with specific mappings or append them to the context tags header. context.tags.foreach { - case (tagKey, tagValue) => components.tagsMappings.get(tagKey) match { - case Some(mappedHeader) => writer.writeHeader(mappedHeader, tagValue) + case (tagKey, tagValue) => settings.tagsMappings.get(tagKey) match { + case Some(mappedHeader) => writer.write(mappedHeader, tagValue) case None => appendTag(tagKey, tagValue) } } // Write the context tags header. if(contextTagsHeader.nonEmpty) { - writer.writeHeader(components.tagsHeaderName, contextTagsHeader.result()) + writer.write(settings.tagsHeaderName, contextTagsHeader.result()) } // Write entries for the specified direction. - keys.foreach { + settings.outgoingEntries.foreach { case (entryName, entryWriter) => try { - entryWriter.writeEntry(context, writer, direction) + entryWriter.write(context, writer) } catch { case NonFatal(t) => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any]) } @@ -216,68 +169,37 @@ object HttpPropagation { } } - /** - * Propagation direction. Used to decide whether incoming, outgoing or returning keys must be used to - * propagate context. - */ - sealed trait Direction - object Direction { - - /** - * Marker trait for all directions that require write operations. - */ - sealed trait Write - - /** - * Requests coming into this service. - */ - case object Incoming extends Direction - - /** - * Requests going from this service to others. - */ - case object Outgoing extends Direction with Write - - /** - * Responses sent from this service to clients. - */ - case object Returning extends Direction with Write - } - - - case class Components( + case class Settings( tagsHeaderName: String, tagsMappings: Map[String, String], - incomingEntries: Map[String, HttpPropagation.EntryReader], - outgoingEntries: Map[String, HttpPropagation.EntryWriter], - returningEntries: Map[String, HttpPropagation.EntryWriter] + incomingEntries: Map[String, Propagation.EntryReader[HeaderReader]], + outgoingEntries: Map[String, Propagation.EntryWriter[HeaderWriter]] ) - object Components { - private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Components]) + object Settings { + private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Settings]) - def from(config: Config, classLoading: ClassLoading): Components = { + def from(config: Config, classLoading: ClassLoading): Settings = { def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = { - val entryReaders = Map.newBuilder[String, ExpectedType] + val instanceMap = Map.newBuilder[String, ExpectedType] mappings.foreach { - case (contextKey, readerClass) => classLoading.createInstance[ExpectedType](readerClass, Nil) match { - case Success(readerInstance) => entryReaders += (contextKey -> readerInstance) + case (contextKey, componentClass) => classLoading.createInstance[ExpectedType](componentClass, Nil) match { + case Success(componentInstance) => instanceMap += (contextKey -> componentInstance) case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to []", - implicitly[ClassTag[ExpectedType]].runtimeClass.getName, readerClass, exception) + implicitly[ClassTag[ExpectedType]].runtimeClass.getName, componentClass, exception) } } - entryReaders.result() + instanceMap.result() } val tagsHeaderName = config.getString("tags.header-name") val tagsMappings = config.getConfig("tags.mappings").pairs - val incomingEntries = buildInstances[HttpPropagation.EntryReader](config.getConfig("entries.incoming").pairs) - val outgoingEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.outgoing").pairs) - val returningEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.returning").pairs) + val incomingEntries = buildInstances[Propagation.EntryReader[HeaderReader]](config.getConfig("entries.incoming").pairs) + val outgoingEntries = buildInstances[Propagation.EntryWriter[HeaderWriter]](config.getConfig("entries.outgoing").pairs) - Components(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries, returningEntries) + Settings(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries) } } |