/* =========================================================================================
* Copyright © 2013-2018 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/
package kamon
package context
import com.typesafe.config.Config
import kamon.tag.{Tag, TagSet}
import org.slf4j.LoggerFactory
import scala.reflect.ClassTag
import scala.util.control.NonFatal
import scala.util.{Failure, Success}
/**
* Context propagation that uses HTTP headers as the transport medium. HTTP propagation mechanisms read any number of
* HTTP headers from incoming HTTP requests to decode a Context instance and write any number of HTTP headers on
* outgoing requests to transfer a context to remote services.
*/
object HttpPropagation {
/**
* Wrapper that reads HTTP headers from a HTTP message.
*/
trait HeaderReader {
/**
* Reads a single HTTP header value.
*
* @param header HTTP header name
* @return The HTTP header value, if present.
*/
def read(header: String): Option[String]
/**
* Reads all HTTP headers present in the wrapped HTTP message.
*
* @return A map from header name to
*/
def readAll(): Map[String, String]
}
/**
* Wrapper that writes HTTP headers to a HTTP message.
*/
trait HeaderWriter {
/**
* Writes a HTTP header into a HTTP message.
*
* @param header HTTP header name.
* @param value HTTP header value.
*/
def write(header: String, value: String): Unit
}
/**
* Create a new default HTTP propagation instance from the provided configuration.
*
* @param config HTTP propagation channel configuration
* @return A newly constructed HttpPropagation instance.
*/
def from(config: Config, classLoading: ClassLoading): Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] = {
new HttpPropagation.Default(Settings.from(config, classLoading))
}
/**
* Default HTTP Propagation in Kamon.
*/
class Default(settings: Settings) extends Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] {
private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Default])
/**
* Reads context tags and entries on the following order:
* 1. Read all context tags from the context tags header.
* 2. Read all context tags with explicit mappings. This overrides any tag
* from the previous step in case of a tag key clash.
* 3. Read all context entries using the incoming entries configuration.
*/
override def read(reader: HeaderReader): Context = {
val tags = Map.newBuilder[String, Any]
// Tags encoded together in the context tags header.
try {
reader.read(settings.tagsHeaderName).foreach { contextTagsHeader =>
contextTagsHeader.split(";").foreach(tagData => {
val tagPair = tagData.split("=")
if (tagPair.length == 2) {
tags += (tagPair(0) -> parseTagValue(tagPair(1)))
}
})
}
} catch {
case NonFatal(t) => log.warn("Failed to read the context tags header", t.asInstanceOf[Any])
}
// Tags explicitly mapped on the tags.mappings configuration.
settings.tagsMappings.foreach {
case (tagName, httpHeader) =>
try {
reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue))
} catch {
case NonFatal(t) => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any])
}
}
// Incoming Entries
settings.incomingEntries.foldLeft(Context.of(TagSet.from(tags.result()))) {
case (context, (entryName, entryDecoder)) =>
var result = context
try {
result = entryDecoder.read(reader, context)
} catch {
case NonFatal(t) => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
}
result
}
}
/**
* Writes context tags and entries
*/
override def write(context: Context, writer: HeaderWriter): Unit = {
val contextTagsHeader = new StringBuilder()
def appendTag(key: String, value: String): Unit = {
contextTagsHeader
.append(key)
.append('=')
.append(value)
.append(';')
}
// Write tags with specific mappings or append them to the context tags header.
context.tags.iterator().foreach { tag =>
val tagKey = tag.key
settings.tagsMappings.get(tagKey) match {
case Some(mappedHeader) => writer.write(mappedHeader, tagValueWithPrefix(tag))
case None => appendTag(tagKey, Tag.unwrapValue(tag).toString)
}
}
// Write the context tags header.
if(contextTagsHeader.nonEmpty) {
writer.write(settings.tagsHeaderName, contextTagsHeader.result())
}
// Write entries for the specified direction.
settings.outgoingEntries.foreach {
case (entryName, entryWriter) =>
try {
entryWriter.write(context, writer)
} catch {
case NonFatal(t) => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
}
}
}
private val _longTypePrefix = "l:"
private val _booleanTypePrefix = "b:"
private val _booleanTrue = "true"
private val _booleanFalse = "false"
/**
* Tries to infer and parse a value into one of the supported tag types: String, Long or Boolean by looking for the
* type indicator prefix on the tag value. If the inference fails it will default to treat the value as a String.
*/
private def parseTagValue(value: String): Any = {
if (value.isEmpty || value.length < 2) // Empty and short values definitely do not have type indicators.
value
else {
if(value.startsWith(_longTypePrefix)) {
// Try to parse the content as a Long value.
val remaining = value.substring(2)
try {
java.lang.Long.parseLong(remaining)
} catch {
case _: Throwable => remaining
}
} else if(value.startsWith(_booleanTypePrefix)) {
// Try to parse the content as a Boolean value.
val remaining = value.substring(2)
if(remaining.equals(_booleanTrue))
true
else if(remaining.equals(_booleanFalse))
false
else
remaining
} else value
}
}
/**
* Returns the actual value to be written in the HTTP transport, with a type prefix if applicable.
*/
private def tagValueWithPrefix(tag: Tag): String = tag match {
case t: Tag.String => t.value
case t: Tag.Boolean => _booleanTypePrefix + t.value.toString
case t: Tag.Long => _longTypePrefix + t.value.toString
}
}
case class Settings(
tagsHeaderName: String,
tagsMappings: Map[String, String],
incomingEntries: Map[String, Propagation.EntryReader[HeaderReader]],
outgoingEntries: Map[String, Propagation.EntryWriter[HeaderWriter]]
)
object Settings {
private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Settings])
def from(config: Config, classLoading: ClassLoading): Settings = {
def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = {
val instanceMap = Map.newBuilder[String, ExpectedType]
mappings.foreach {
case (contextKey, componentClass) => classLoading.createInstance[ExpectedType](componentClass, Nil) match {
case Success(componentInstance) => instanceMap += (contextKey -> componentInstance)
case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to []",
implicitly[ClassTag[ExpectedType]].runtimeClass.getName, componentClass, exception)
}
}
instanceMap.result()
}
val tagsHeaderName = config.getString("tags.header-name")
val tagsMappings = config.getConfig("tags.mappings").pairs
val incomingEntries = buildInstances[Propagation.EntryReader[HeaderReader]](config.getConfig("entries.incoming").pairs)
val outgoingEntries = buildInstances[Propagation.EntryWriter[HeaderWriter]](config.getConfig("entries.outgoing").pairs)
Settings(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries)
}
}
}