package kamon package context import com.typesafe.config.Config import kamon.trace.IdentityProvider import kamon.util.DynamicAccess import org.slf4j.LoggerFactory import scala.collection.mutable class Codec(identityProvider: IdentityProvider, initialConfig: Config) { private val log = LoggerFactory.getLogger(classOf[Codec]) @volatile private var httpHeaders: Codec.ForContext[TextMap] = new Codec.HttpHeaders(Map.empty) //val Binary: Codec.ForContext[ByteBuffer] = _ reconfigure(initialConfig) def HttpHeaders: Codec.ForContext[TextMap] = httpHeaders def reconfigure(config: Config): Unit = { httpHeaders = new Codec.HttpHeaders(readEntryCodecs("kamon.context.encoding.http-headers", config)) } private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codec.ForEntry[T]] = { val rootConfig = config.getConfig(rootKey) val dynamic = new DynamicAccess(getClass.getClassLoader) val entries = Map.newBuilder[String, Codec.ForEntry[T]] rootConfig.topLevelKeys.foreach(key => { try { val fqcn = rootConfig.getString(key) entries += ((key, dynamic.createInstanceFor[Codec.ForEntry[T]](fqcn, Nil).get)) } catch { case e: Throwable => log.error(s"Failed to initialize codec for key [$key]", e) } }) entries.result() } } object Codec { trait ForContext[T] { def encode(context: Context): T def decode(carrier: T): Context } trait ForEntry[T] { def encode(context: Context): T def decode(carrier: T, context: Context): Context } final class HttpHeaders(entryCodecs: Map[String, Codec.ForEntry[TextMap]]) extends Codec.ForContext[TextMap] { private val log = LoggerFactory.getLogger(classOf[HttpHeaders]) override def encode(context: Context): TextMap = { val encoded = TextMap.Default() context.entries.foreach { case (key, _) if key.broadcast => entryCodecs.get(key.name) match { case Some(codec) => try { codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2)) } catch { case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e) } case None => log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name) } } encoded } override def decode(carrier: TextMap): Context = { var context: Context = Context.Empty try { context = entryCodecs.foldLeft(context)((ctx, codecEntry) => { val (_, codec) = codecEntry codec.decode(carrier, ctx) }) } catch { case e: Throwable => log.error("Failed to decode context from HttpHeaders", e) } context } } } trait TextMap { def get(key: String): Option[String] def put(key: String, value: String): Unit def values: Iterator[(String, String)] } object TextMap { class Default extends TextMap { private val storage = mutable.Map.empty[String, String] override def get(key: String): Option[String] = storage.get(key) override def put(key: String, value: String): Unit = storage.put(key, value) override def values: Iterator[(String, String)] = storage.toIterator } object Default { def apply(): Default = new Default() } }