aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/kamon/context/HttpPropagation.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/context/HttpPropagation.scala204
1 files changed, 63 insertions, 141 deletions
diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
index d53a6250..6a15e2a6 100644
--- a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
+++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
@@ -1,3 +1,18 @@
+/* =========================================================================================
+ * Copyright © 2013-2018 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
package kamon
package context
@@ -9,89 +24,31 @@ import scala.util.control.NonFatal
import scala.util.{Failure, Success}
/**
- * Context Propagation for HTTP transports. When using HTTP transports all the context related information is
- * read from and written to HTTP headers. The context information may be included in the following directions:
- * - Incoming: Used for HTTP requests coming into this service. Implicitly used when using HttpPropagation.read.
- * - Outgoing: Used for HTTP requests leaving this service.
- * - Returning: Used for HTTP responses send back to clients of this service.
+ * Context propagation that uses HTTP headers as the transport medium. HTTP propagation mechanisms read any number of
+ * HTTP headers from incoming HTTP requests to decode a Context instance and write any number of HTTP headers on
+ * outgoing requests to transfer a context to remote services.
*/
-trait HttpPropagation {
-
- /**
- * Uses the provided [[HttpPropagation.HeaderReader]] to read as many HTTP Headers as necessary and create a
- * [[Context]] instance. The way in which context tags and entries are read from and written to HTTP Headers is
- * implementation specific.
- *
- * @param reader Wrapper on the HTTP message from which headers are read.
- * @return The decoded Context instance. If no entries or tags could be read from the HTTP message then an
- * empty context is returned instead.
- */
- def readContext(reader: HttpPropagation.HeaderReader): Context
-
- /**
- * Writes the tags and entries from the supplied context using the supplied [[HttpPropagation.HeaderWriter]]
- * instance. The way in which context tags and entries are read from and written to HTTP Headers is implementation
- * specific.
- *
- * Implementations are expected to produce side effects on the wrapped HTTP Messages.
- *
- * @param context Context instance to be written.
- * @param writer Wrapper on the HTTP message that will carry the context headers.
- * @param direction Write direction. It can be either Outgoing or Returning.
- */
- def writeContext(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit
-
-}
-
object HttpPropagation {
/**
- * Encapsulates logic required to read a single context entry from HTTP headers. Implementations of this trait
- * must be aware of the entry they are able to read and the HTTP headers required to do so.
+ * Wrapper that reads HTTP headers from a HTTP message.
*/
- trait EntryReader {
-
- /**
- * Tries to read a context entry from HTTP headers. If a context entry is successfully read, implementations
- * must return an updated context instance that includes such entry. If no entry could be read simply return
- * context instance that was passed in, untouched.
- *
- * @param reader Wrapper on the HTTP message from which headers are read.
- * @param context Current context.
- * @return Either the original context passed in or a modified version of it, including the read entry.
- */
- def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context
- }
-
- /**
- * Encapsulates logic required to write a single context entry to HTTP headers. Implementations of this trait
- * must be aware of the entry they are able to write and the HTTP headers required to do so.
- */
- trait EntryWriter {
+ trait HeaderReader {
/**
- * Tries to write a context entry into HTTP headers.
+ * Reads a single HTTP header value.
*
- * @param context The context from which entries should be written.
- * @param writer Wrapper on the HTTP message that will carry the context headers.
- * @param direction Write direction. It can be either Outgoing or Returning.
+ * @param header HTTP header name
+ * @return The HTTP header value, if present.
*/
- def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit
- }
-
-
- /**
- * Wrapper that reads HTTP headers from HTTP a message.
- */
- trait HeaderReader {
+ def read(header: String): Option[String]
/**
- * Reads an HTTP header value
+ * Reads all HTTP headers present in the wrapped HTTP message.
*
- * @param header HTTP header name
- * @return The HTTP header value, if present.
+ * @return A map from header name to
*/
- def readHeader(header: String): Option[String]
+ def readAll(): Map[String, String]
}
/**
@@ -105,39 +62,40 @@ object HttpPropagation {
* @param header HTTP header name.
* @param value HTTP header value.
*/
- def writeHeader(header: String, value: String): Unit
+ def write(header: String, value: String): Unit
}
+
/**
- * Create a new default HttpPropagation instance from the provided configuration.
+ * Create a new default HTTP propagation instance from the provided configuration.
*
* @param config HTTP propagation channel configuration
* @return A newly constructed HttpPropagation instance.
*/
- def from(config: Config, classLoading: ClassLoading): HttpPropagation = {
- new HttpPropagation.Default(Components.from(config, classLoading))
+ def from(config: Config, classLoading: ClassLoading): Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] = {
+ new HttpPropagation.Default(Settings.from(config, classLoading))
}
/**
* Default HTTP Propagation in Kamon.
*/
- final class Default(components: Components) extends HttpPropagation {
+ class Default(settings: Settings) extends Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] {
private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Default])
/**
* Reads context tags and entries on the following order:
- * - Read all context tags from the context tags header.
- * - Read all context tags with explicit mappings. This overrides any tag from the previous step in case
- * of a tag key clash.
- * - Read all context entries using the incoming entries configuration.
+ * 1. Read all context tags from the context tags header.
+ * 2. Read all context tags with explicit mappings. This overrides any tag
+ * from the previous step in case of a tag key clash.
+ * 3. Read all context entries using the incoming entries configuration.
*/
- override def readContext(reader: HeaderReader): Context = {
+ override def read(reader: HeaderReader): Context = {
val tags = Map.newBuilder[String, String]
// Tags encoded together in the context tags header.
try {
- reader.readHeader(components.tagsHeaderName).foreach { contextTagsHeader =>
+ reader.read(settings.tagsHeaderName).foreach { contextTagsHeader =>
contextTagsHeader.split(";").foreach(tagData => {
val tagPair = tagData.split("=")
if (tagPair.length == 2) {
@@ -150,21 +108,21 @@ object HttpPropagation {
}
// Tags explicitly mapped on the tags.mappings configuration.
- components.tagsMappings.foreach {
+ settings.tagsMappings.foreach {
case (tagName, httpHeader) =>
try {
- reader.readHeader(httpHeader).foreach(tagValue => tags += (tagName -> tagValue))
+ reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue))
} catch {
case NonFatal(t) => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any])
}
}
// Incoming Entries
- components.incomingEntries.foldLeft(Context.of(tags.result())) {
+ settings.incomingEntries.foldLeft(Context.of(tags.result())) {
case (context, (entryName, entryDecoder)) =>
var result = context
try {
- result = entryDecoder.readEntry(reader, context)
+ result = entryDecoder.read(reader, context)
} catch {
case NonFatal(t) => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
}
@@ -176,12 +134,7 @@ object HttpPropagation {
/**
* Writes context tags and entries
*/
- override def writeContext(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = {
- val keys = direction match {
- case Direction.Outgoing => components.outgoingEntries
- case Direction.Returning => components.returningEntries
- }
-
+ override def write(context: Context, writer: HeaderWriter): Unit = {
val contextTagsHeader = new StringBuilder()
def appendTag(key: String, value: String): Unit = {
contextTagsHeader
@@ -193,22 +146,22 @@ object HttpPropagation {
// Write tags with specific mappings or append them to the context tags header.
context.tags.foreach {
- case (tagKey, tagValue) => components.tagsMappings.get(tagKey) match {
- case Some(mappedHeader) => writer.writeHeader(mappedHeader, tagValue)
+ case (tagKey, tagValue) => settings.tagsMappings.get(tagKey) match {
+ case Some(mappedHeader) => writer.write(mappedHeader, tagValue)
case None => appendTag(tagKey, tagValue)
}
}
// Write the context tags header.
if(contextTagsHeader.nonEmpty) {
- writer.writeHeader(components.tagsHeaderName, contextTagsHeader.result())
+ writer.write(settings.tagsHeaderName, contextTagsHeader.result())
}
// Write entries for the specified direction.
- keys.foreach {
+ settings.outgoingEntries.foreach {
case (entryName, entryWriter) =>
try {
- entryWriter.writeEntry(context, writer, direction)
+ entryWriter.write(context, writer)
} catch {
case NonFatal(t) => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
}
@@ -216,68 +169,37 @@ object HttpPropagation {
}
}
- /**
- * Propagation direction. Used to decide whether incoming, outgoing or returning keys must be used to
- * propagate context.
- */
- sealed trait Direction
- object Direction {
-
- /**
- * Marker trait for all directions that require write operations.
- */
- sealed trait Write
-
- /**
- * Requests coming into this service.
- */
- case object Incoming extends Direction
-
- /**
- * Requests going from this service to others.
- */
- case object Outgoing extends Direction with Write
-
- /**
- * Responses sent from this service to clients.
- */
- case object Returning extends Direction with Write
- }
-
-
- case class Components(
+ case class Settings(
tagsHeaderName: String,
tagsMappings: Map[String, String],
- incomingEntries: Map[String, HttpPropagation.EntryReader],
- outgoingEntries: Map[String, HttpPropagation.EntryWriter],
- returningEntries: Map[String, HttpPropagation.EntryWriter]
+ incomingEntries: Map[String, Propagation.EntryReader[HeaderReader]],
+ outgoingEntries: Map[String, Propagation.EntryWriter[HeaderWriter]]
)
- object Components {
- private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Components])
+ object Settings {
+ private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Settings])
- def from(config: Config, classLoading: ClassLoading): Components = {
+ def from(config: Config, classLoading: ClassLoading): Settings = {
def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = {
- val entryReaders = Map.newBuilder[String, ExpectedType]
+ val instanceMap = Map.newBuilder[String, ExpectedType]
mappings.foreach {
- case (contextKey, readerClass) => classLoading.createInstance[ExpectedType](readerClass, Nil) match {
- case Success(readerInstance) => entryReaders += (contextKey -> readerInstance)
+ case (contextKey, componentClass) => classLoading.createInstance[ExpectedType](componentClass, Nil) match {
+ case Success(componentInstance) => instanceMap += (contextKey -> componentInstance)
case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to []",
- implicitly[ClassTag[ExpectedType]].runtimeClass.getName, readerClass, exception)
+ implicitly[ClassTag[ExpectedType]].runtimeClass.getName, componentClass, exception)
}
}
- entryReaders.result()
+ instanceMap.result()
}
val tagsHeaderName = config.getString("tags.header-name")
val tagsMappings = config.getConfig("tags.mappings").pairs
- val incomingEntries = buildInstances[HttpPropagation.EntryReader](config.getConfig("entries.incoming").pairs)
- val outgoingEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.outgoing").pairs)
- val returningEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.returning").pairs)
+ val incomingEntries = buildInstances[Propagation.EntryReader[HeaderReader]](config.getConfig("entries.incoming").pairs)
+ val outgoingEntries = buildInstances[Propagation.EntryWriter[HeaderWriter]](config.getConfig("entries.outgoing").pairs)
- Components(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries, returningEntries)
+ Settings(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries)
}
}