aboutsummaryrefslogblamecommitdiff
path: root/kamon-core/src/main/scala/kamon/context/Codec.scala
blob: 50b7e93dc388b5c79006ef4fd6313cffc43dfdb8 (plain) (tree)
1
2
3
4
5
6
7
8
9
10



                                 
                                   

                               
 

                               
                                                                        











                                                                                                       









































































                                                                                                                     



               
 









                                           

                                       
 

                                                   
 

                                                        
 

                                                     


                  

                          
   
 
package kamon
package context

import com.typesafe.config.Config
import kamon.trace.IdentityProvider
import kamon.util.DynamicAccess
import org.slf4j.LoggerFactory

import scala.collection.mutable

class Codec(identityProvider: IdentityProvider, initialConfig: Config) {
  private val log = LoggerFactory.getLogger(classOf[Codec])

  @volatile private var httpHeaders: Codec.ForContext[TextMap] = new Codec.HttpHeaders(Map.empty)
  //val Binary: Codec.ForContext[ByteBuffer] = _
  reconfigure(initialConfig)


  def HttpHeaders: Codec.ForContext[TextMap] =
    httpHeaders

  def reconfigure(config: Config): Unit = {
    httpHeaders = new Codec.HttpHeaders(readEntryCodecs("kamon.context.encoding.http-headers", config))
  }

  private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codec.ForEntry[T]] = {
    val rootConfig = config.getConfig(rootKey)
    val dynamic = new DynamicAccess(getClass.getClassLoader)
    val entries = Map.newBuilder[String, Codec.ForEntry[T]]

    rootConfig.topLevelKeys.foreach(key => {
      try {
        val fqcn = rootConfig.getString(key)
        entries += ((key, dynamic.createInstanceFor[Codec.ForEntry[T]](fqcn, Nil).get))
      } catch {
        case e: Throwable =>
          log.error(s"Failed to initialize codec for key [$key]", e)
      }
    })

    entries.result()
  }
}

object Codec {

  trait ForContext[T] {
    def encode(context: Context): T
    def decode(carrier: T): Context
  }

  trait ForEntry[T] {
    def encode(context: Context): T
    def decode(carrier: T, context: Context): Context
  }

  final class HttpHeaders(entryCodecs: Map[String, Codec.ForEntry[TextMap]]) extends Codec.ForContext[TextMap] {
    private val log = LoggerFactory.getLogger(classOf[HttpHeaders])

    override def encode(context: Context): TextMap = {
      val encoded = TextMap.Default()

      context.entries.foreach {
        case (key, _) if key.broadcast =>
          entryCodecs.get(key.name) match {
            case Some(codec) =>
              try {
                codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2))
              } catch {
                case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e)
              }

            case None =>
              log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name)
          }
      }

      encoded
    }

    override def decode(carrier: TextMap): Context = {
      var context: Context = Context.Empty

      try {
        context = entryCodecs.foldLeft(context)((ctx, codecEntry) => {
          val (_, codec) = codecEntry
          codec.decode(carrier, ctx)
        })

      } catch {
        case e: Throwable =>
          log.error("Failed to decode context from HttpHeaders", e)
      }

      context
    }
  }
}


trait TextMap {

  def get(key: String): Option[String]

  def put(key: String, value: String): Unit

  def values: Iterator[(String, String)]
}

object TextMap {

  class Default extends TextMap {
    private val storage =
      mutable.Map.empty[String, String]

    override def get(key: String): Option[String] =
      storage.get(key)

    override def put(key: String, value: String): Unit =
      storage.put(key, value)

    override def values: Iterator[(String, String)] =
      storage.toIterator
  }

  object Default {
    def apply(): Default =
      new Default()
  }
}