aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/context/Codec.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/kamon/context/Codec.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/context/Codec.scala130
1 files changed, 130 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/context/Codec.scala b/kamon-core/src/main/scala/kamon/context/Codec.scala
new file mode 100644
index 00000000..50b7e93d
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/Codec.scala
@@ -0,0 +1,130 @@
+package kamon
+package context
+
+import com.typesafe.config.Config
+import kamon.trace.IdentityProvider
+import kamon.util.DynamicAccess
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable
+
+class Codec(identityProvider: IdentityProvider, initialConfig: Config) {
+ private val log = LoggerFactory.getLogger(classOf[Codec])
+
+ @volatile private var httpHeaders: Codec.ForContext[TextMap] = new Codec.HttpHeaders(Map.empty)
+ //val Binary: Codec.ForContext[ByteBuffer] = _
+ reconfigure(initialConfig)
+
+
+ def HttpHeaders: Codec.ForContext[TextMap] =
+ httpHeaders
+
+ def reconfigure(config: Config): Unit = {
+ httpHeaders = new Codec.HttpHeaders(readEntryCodecs("kamon.context.encoding.http-headers", config))
+ }
+
+ private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codec.ForEntry[T]] = {
+ val rootConfig = config.getConfig(rootKey)
+ val dynamic = new DynamicAccess(getClass.getClassLoader)
+ val entries = Map.newBuilder[String, Codec.ForEntry[T]]
+
+ rootConfig.topLevelKeys.foreach(key => {
+ try {
+ val fqcn = rootConfig.getString(key)
+ entries += ((key, dynamic.createInstanceFor[Codec.ForEntry[T]](fqcn, Nil).get))
+ } catch {
+ case e: Throwable =>
+ log.error(s"Failed to initialize codec for key [$key]", e)
+ }
+ })
+
+ entries.result()
+ }
+}
+
+object Codec {
+
+ trait ForContext[T] {
+ def encode(context: Context): T
+ def decode(carrier: T): Context
+ }
+
+ trait ForEntry[T] {
+ def encode(context: Context): T
+ def decode(carrier: T, context: Context): Context
+ }
+
+ final class HttpHeaders(entryCodecs: Map[String, Codec.ForEntry[TextMap]]) extends Codec.ForContext[TextMap] {
+ private val log = LoggerFactory.getLogger(classOf[HttpHeaders])
+
+ override def encode(context: Context): TextMap = {
+ val encoded = TextMap.Default()
+
+ context.entries.foreach {
+ case (key, _) if key.broadcast =>
+ entryCodecs.get(key.name) match {
+ case Some(codec) =>
+ try {
+ codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2))
+ } catch {
+ case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e)
+ }
+
+ case None =>
+ log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name)
+ }
+ }
+
+ encoded
+ }
+
+ override def decode(carrier: TextMap): Context = {
+ var context: Context = Context.Empty
+
+ try {
+ context = entryCodecs.foldLeft(context)((ctx, codecEntry) => {
+ val (_, codec) = codecEntry
+ codec.decode(carrier, ctx)
+ })
+
+ } catch {
+ case e: Throwable =>
+ log.error("Failed to decode context from HttpHeaders", e)
+ }
+
+ context
+ }
+ }
+}
+
+
+trait TextMap {
+
+ def get(key: String): Option[String]
+
+ def put(key: String, value: String): Unit
+
+ def values: Iterator[(String, String)]
+}
+
+object TextMap {
+
+ class Default extends TextMap {
+ private val storage =
+ mutable.Map.empty[String, String]
+
+ override def get(key: String): Option[String] =
+ storage.get(key)
+
+ override def put(key: String, value: String): Unit =
+ storage.put(key, value)
+
+ override def values: Iterator[(String, String)] =
+ storage.toIterator
+ }
+
+ object Default {
+ def apply(): Default =
+ new Default()
+ }
+} \ No newline at end of file