package kamon
package context
import com.typesafe.config.Config
import org.slf4j.LoggerFactory
import scala.reflect.ClassTag
import scala.util.control.NonFatal
import scala.util.{Failure, Success}
/**
* Context Propagation for HTTP transports. When using HTTP transports all the context related information is
* read from and written to HTTP headers. The context information may be included in the following directions:
* - Incoming: Used for HTTP requests coming into this service. Implicitly used when using HttpPropagation.read.
* - Outgoing: Used for HTTP requests leaving this service.
* - Returning: Used for HTTP responses send back to clients of this service.
*/
trait HttpPropagation {
/**
* Uses the provided [[HttpPropagation.HeaderReader]] to read as many HTTP Headers as necessary and create a
* [[Context]] instance. The way in which context tags and entries are read from and written to HTTP Headers is
* implementation specific.
*
* @param reader Wrapper on the HTTP message from which headers are read.
* @return The decoded Context instance. If no entries or tags could be read from the HTTP message then an
* empty context is returned instead.
*/
def readContext(reader: HttpPropagation.HeaderReader): Context
/**
* Writes the tags and entries from the supplied context using the supplied [[HttpPropagation.HeaderWriter]]
* instance. The way in which context tags and entries are read from and written to HTTP Headers is implementation
* specific.
*
* Implementations are expected to produce side effects on the wrapped HTTP Messages.
*
* @param context Context instance to be written.
* @param writer Wrapper on the HTTP message that will carry the context headers.
* @param direction Write direction. It can be either Outgoing or Returning.
*/
def writeContext(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit
}
object HttpPropagation {
/**
* Encapsulates logic required to read a single context entry from HTTP headers. Implementations of this trait
* must be aware of the entry they are able to read and the HTTP headers required to do so.
*/
trait EntryReader {
/**
* Tries to read a context entry from HTTP headers. If a context entry is successfully read, implementations
* must return an updated context instance that includes such entry. If no entry could be read simply return
* context instance that was passed in, untouched.
*
* @param reader Wrapper on the HTTP message from which headers are read.
* @param context Current context.
* @return Either the original context passed in or a modified version of it, including the read entry.
*/
def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context
}
/**
* Encapsulates logic required to write a single context entry to HTTP headers. Implementations of this trait
* must be aware of the entry they are able to write and the HTTP headers required to do so.
*/
trait EntryWriter {
/**
* Tries to write a context entry into HTTP headers.
*
* @param context The context from which entries should be written.
* @param writer Wrapper on the HTTP message that will carry the context headers.
* @param direction Write direction. It can be either Outgoing or Returning.
*/
def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit
}
/**
* Wrapper that reads HTTP headers from HTTP a message.
*/
trait HeaderReader {
/**
* Reads an HTTP header value
*
* @param header HTTP header name
* @return The HTTP header value, if present.
*/
def readHeader(header: String): Option[String]
}
/**
* Wrapper that writes HTTP headers to a HTTP message.
*/
trait HeaderWriter {
/**
* Writes a HTTP header into a HTTP message.
*
* @param header HTTP header name.
* @param value HTTP header value.
*/
def writeHeader(header: String, value: String): Unit
}
/**
* Create a new default HttpPropagation instance from the provided configuration.
*
* @param config HTTP propagation channel configuration
* @return A newly constructed HttpPropagation instance.
*/
def from(config: Config, classLoading: ClassLoading): HttpPropagation = {
new HttpPropagation.Default(Components.from(config, classLoading))
}
/**
* Default HTTP Propagation in Kamon.
*/
final class Default(components: Components) extends HttpPropagation {
private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Default])
/**
* Reads context tags and entries on the following order:
* - Read all context tags from the context tags header.
* - Read all context tags with explicit mappings. This overrides any tag from the previous step in case
* of a tag key clash.
* - Read all context entries using the incoming entries configuration.
*/
override def readContext(reader: HeaderReader): Context = {
val tags = Map.newBuilder[String, String]
// Tags encoded together in the context tags header.
try {
reader.readHeader(components.tagsHeaderName).foreach { contextTagsHeader =>
contextTagsHeader.split(";").foreach(tagData => {
val tagPair = tagData.split("=")
if (tagPair.length == 2) {
tags += (tagPair(0) -> tagPair(1))
}
})
}
} catch {
case NonFatal(t) => log.warn("Failed to read the context tags header", t.asInstanceOf[Any])
}
// Tags explicitly mapped on the tags.mappings configuration.
components.tagsMappings.foreach {
case (tagName, httpHeader) =>
try {
reader.readHeader(httpHeader).foreach(tagValue => tags += (tagName -> tagValue))
} catch {
case NonFatal(t) => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any])
}
}
// Incoming Entries
components.incomingEntries.foldLeft(Context.of(tags.result())) {
case (context, (entryName, entryDecoder)) =>
var result = context
try {
result = entryDecoder.readEntry(reader, context)
} catch {
case NonFatal(t) => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
}
result
}
}
/**
* Writes context tags and entries
*/
override def writeContext(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = {
val keys = direction match {
case Direction.Outgoing => components.outgoingEntries
case Direction.Returning => components.returningEntries
}
val contextTagsHeader = new StringBuilder()
def appendTag(key: String, value: String): Unit = {
contextTagsHeader
.append(key)
.append('=')
.append(value)
.append(';')
}
// Write tags with specific mappings or append them to the context tags header.
context.tags.foreach {
case (tagKey, tagValue) => components.tagsMappings.get(tagKey) match {
case Some(mappedHeader) => writer.writeHeader(mappedHeader, tagValue)
case None => appendTag(tagKey, tagValue)
}
}
// Write the context tags header.
if(contextTagsHeader.nonEmpty) {
writer.writeHeader(components.tagsHeaderName, contextTagsHeader.result())
}
// Write entries for the specified direction.
keys.foreach {
case (entryName, entryWriter) =>
try {
entryWriter.writeEntry(context, writer, direction)
} catch {
case NonFatal(t) => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
}
}
}
}
/**
* Propagation direction. Used to decide whether incoming, outgoing or returning keys must be used to
* propagate context.
*/
sealed trait Direction
object Direction {
/**
* Marker trait for all directions that require write operations.
*/
sealed trait Write
/**
* Requests coming into this service.
*/
case object Incoming extends Direction
/**
* Requests going from this service to others.
*/
case object Outgoing extends Direction with Write
/**
* Responses sent from this service to clients.
*/
case object Returning extends Direction with Write
}
case class Components(
tagsHeaderName: String,
tagsMappings: Map[String, String],
incomingEntries: Map[String, HttpPropagation.EntryReader],
outgoingEntries: Map[String, HttpPropagation.EntryWriter],
returningEntries: Map[String, HttpPropagation.EntryWriter]
)
object Components {
private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Components])
def from(config: Config, classLoading: ClassLoading): Components = {
def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = {
val entryReaders = Map.newBuilder[String, ExpectedType]
mappings.foreach {
case (contextKey, readerClass) => classLoading.createInstance[ExpectedType](readerClass, Nil) match {
case Success(readerInstance) => entryReaders += (contextKey -> readerInstance)
case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to []",
implicitly[ClassTag[ExpectedType]].runtimeClass.getName, readerClass, exception)
}
}
entryReaders.result()
}
val tagsHeaderName = config.getString("tags.header-name")
val tagsMappings = config.getConfig("tags.mappings").pairs
val incomingEntries = buildInstances[HttpPropagation.EntryReader](config.getConfig("entries.incoming").pairs)
val outgoingEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.outgoing").pairs)
val returningEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.returning").pairs)
Components(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries, returningEntries)
}
}
}