aboutsummaryrefslogblamecommitdiff
path: root/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
blob: d53a62509a1ed1521057370c8565bcf3a6463978 (plain) (tree)
1
2
3
4
5
6
7
8






                                 
                                  



















                                                                                                                   
                                                                











                                                                                                                     
                                                                                                                            



















                                                                                                                 
                                                                                  














                                                                                                                
                                                                                                            













                                                          
                                                  












                                                         
                                                        

























                                                                                                               
                                                               



                                                          
                                                                                   







                                                           
                                                                                                   





                                                                   
                                                                                            
                   
                                                                                                        







                                                                      
                                                            
                   
                                                                                                                       








                                       
                                                                                                           
















                                                                                     
                                                                               





                                                  
                                                                                 





                                                   
                                                              
                   
                                                                                                                                   






































































                                                                                                                     
package kamon
package context

import com.typesafe.config.Config
import org.slf4j.LoggerFactory

import scala.reflect.ClassTag
import scala.util.control.NonFatal
import scala.util.{Failure, Success}

/**
  * Context Propagation for HTTP transports. When using HTTP transports all the context related information is
  * read from and written to HTTP headers. The context information may be included in the following directions:
  *   - Incoming: Used for HTTP requests coming into this service. Implicitly used when using HttpPropagation.read.
  *   - Outgoing: Used for HTTP requests leaving this service.
  *   - Returning: Used for HTTP responses send back to clients of this service.
  */
trait HttpPropagation {

  /**
    * Uses the provided [[HttpPropagation.HeaderReader]] to read as many HTTP Headers as necessary and create a
    * [[Context]] instance. The way in which context tags and entries are read from and written to HTTP Headers is
    * implementation specific.
    *
    * @param reader Wrapper on the HTTP message from which headers are read.
    * @return The decoded Context instance. If no entries or tags could be read from the HTTP message then an
    *          empty context is returned instead.
    */
  def readContext(reader: HttpPropagation.HeaderReader): Context

  /**
    * Writes the tags and entries from the supplied context using the supplied [[HttpPropagation.HeaderWriter]]
    * instance. The way in which context tags and entries are read from and written to HTTP Headers is implementation
    * specific.
    *
    * Implementations are expected to produce side effects on the wrapped HTTP Messages.
    *
    * @param context Context instance to be written.
    * @param writer Wrapper on the HTTP message that will carry the context headers.
    * @param direction Write direction. It can be either Outgoing or Returning.
    */
  def writeContext(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit

}

object HttpPropagation {

  /**
    * Encapsulates logic required to read a single context entry from HTTP headers. Implementations of this trait
    * must be aware of the entry they are able to read and the HTTP headers required to do so.
    */
  trait EntryReader {

    /**
      * Tries to read a context entry from HTTP headers. If a context entry is successfully read, implementations
      * must return an updated context instance that includes such entry. If no entry could be read simply return
      * context instance that was passed in, untouched.
      *
      * @param reader Wrapper on the HTTP message from which headers are read.
      * @param context Current context.
      * @return Either the original context passed in or a modified version of it, including the read entry.
      */
    def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context
  }

  /**
    * Encapsulates logic required to write a single context entry to HTTP headers. Implementations of this trait
    * must be aware of the entry they are able to write and the HTTP headers required to do so.
    */
  trait EntryWriter {

    /**
      * Tries to write a context entry into HTTP headers.
      *
      * @param context The context from which entries should be written.
      * @param writer Wrapper on the HTTP message that will carry the context headers.
      * @param direction Write direction. It can be either Outgoing or Returning.
      */
    def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit
  }


  /**
    * Wrapper that reads HTTP headers from HTTP a message.
    */
  trait HeaderReader {

    /**
      * Reads an HTTP header value
      *
      * @param header HTTP header name
      * @return The HTTP header value, if present.
      */
    def readHeader(header: String): Option[String]
  }

  /**
    * Wrapper that writes HTTP headers to a HTTP message.
    */
  trait HeaderWriter {

    /**
      * Writes a HTTP header into a HTTP message.
      *
      * @param header HTTP header name.
      * @param value HTTP header value.
      */
    def writeHeader(header: String, value: String): Unit
  }


  /**
    * Create a new default HttpPropagation instance from the provided configuration.
    *
    * @param config HTTP propagation channel configuration
    * @return A newly constructed HttpPropagation instance.
    */
  def from(config: Config, classLoading: ClassLoading): HttpPropagation = {
    new HttpPropagation.Default(Components.from(config, classLoading))
  }

  /**
    * Default HTTP Propagation in Kamon.
    */
  final class Default(components: Components) extends HttpPropagation {
    private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Default])

    /**
      * Reads context tags and entries on the following order:
      *   - Read all context tags from the context tags header.
      *   - Read all context tags with explicit mappings. This overrides any tag from the previous step in case
      *     of a tag key clash.
      *   - Read all context entries using the incoming entries configuration.
      */
    override def readContext(reader: HeaderReader): Context = {
      val tags = Map.newBuilder[String, String]

      // Tags encoded together in the context tags header.
      try {
        reader.readHeader(components.tagsHeaderName).foreach { contextTagsHeader =>
          contextTagsHeader.split(";").foreach(tagData => {
            val tagPair = tagData.split("=")
            if (tagPair.length == 2) {
              tags += (tagPair(0) -> tagPair(1))
            }
          })
        }
      } catch {
        case NonFatal(t) => log.warn("Failed to read the context tags header", t.asInstanceOf[Any])
      }

      // Tags explicitly mapped on the tags.mappings configuration.
      components.tagsMappings.foreach {
        case (tagName, httpHeader) =>
          try {
            reader.readHeader(httpHeader).foreach(tagValue => tags += (tagName -> tagValue))
          } catch {
            case NonFatal(t) => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any])
          }
      }

      // Incoming Entries
      components.incomingEntries.foldLeft(Context.of(tags.result())) {
        case (context, (entryName, entryDecoder)) =>
          var result = context
          try {
            result = entryDecoder.readEntry(reader, context)
          } catch {
            case NonFatal(t) => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
          }

          result
      }
    }

    /**
      * Writes context tags and entries
      */
    override def writeContext(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = {
      val keys = direction match {
        case Direction.Outgoing => components.outgoingEntries
        case Direction.Returning => components.returningEntries
      }

      val contextTagsHeader = new StringBuilder()
      def appendTag(key: String, value: String): Unit = {
        contextTagsHeader
          .append(key)
          .append('=')
          .append(value)
          .append(';')
      }

      // Write tags with specific mappings or append them to the context tags header.
      context.tags.foreach {
        case (tagKey, tagValue) => components.tagsMappings.get(tagKey) match {
          case Some(mappedHeader) => writer.writeHeader(mappedHeader, tagValue)
          case None => appendTag(tagKey, tagValue)
        }
      }

      // Write the context tags header.
      if(contextTagsHeader.nonEmpty) {
        writer.writeHeader(components.tagsHeaderName, contextTagsHeader.result())
      }

      // Write entries for the specified direction.
      keys.foreach {
        case (entryName, entryWriter) =>
          try {
            entryWriter.writeEntry(context, writer, direction)
          } catch {
            case NonFatal(t) => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
          }
      }
    }
  }

  /**
    * Propagation direction. Used to decide whether incoming, outgoing or returning keys must be used to
    * propagate context.
    */
  sealed trait Direction
  object Direction {

    /**
      * Marker trait for all directions that require write operations.
      */
    sealed trait Write

    /**
      * Requests coming into this service.
      */
    case object Incoming extends Direction

    /**
      * Requests going from this service to others.
      */
    case object Outgoing extends Direction with Write

    /**
      * Responses sent from this service to clients.
      */
    case object Returning extends Direction with Write
  }


  case class Components(
    tagsHeaderName: String,
    tagsMappings: Map[String, String],
    incomingEntries: Map[String, HttpPropagation.EntryReader],
    outgoingEntries: Map[String, HttpPropagation.EntryWriter],
    returningEntries: Map[String, HttpPropagation.EntryWriter]
  )

  object Components {
    private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Components])

    def from(config: Config, classLoading: ClassLoading): Components = {
      def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = {
        val entryReaders = Map.newBuilder[String, ExpectedType]

        mappings.foreach {
          case (contextKey, readerClass) => classLoading.createInstance[ExpectedType](readerClass, Nil) match {
            case Success(readerInstance) => entryReaders += (contextKey -> readerInstance)
            case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to []",
              implicitly[ClassTag[ExpectedType]].runtimeClass.getName, readerClass, exception)
          }
        }

        entryReaders.result()
      }

      val tagsHeaderName = config.getString("tags.header-name")
      val tagsMappings = config.getConfig("tags.mappings").pairs
      val incomingEntries = buildInstances[HttpPropagation.EntryReader](config.getConfig("entries.incoming").pairs)
      val outgoingEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.outgoing").pairs)
      val returningEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.returning").pairs)

      Components(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries, returningEntries)
    }
  }

}