aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/context/Codec.scala
blob: 50b7e93dc388b5c79006ef4fd6313cffc43dfdb8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package kamon
package context

import com.typesafe.config.Config
import kamon.trace.IdentityProvider
import kamon.util.DynamicAccess
import org.slf4j.LoggerFactory

import scala.collection.mutable

/**
  * Owns the codec used to move a [[Context]] through HTTP headers and rebuilds it from configuration.
  *
  * The entry codecs are read from the `kamon.context.encoding.http-headers` configuration path: once with
  * `initialConfig` while the instance is being constructed and again on every call to [[reconfigure]].
  *
  * @param identityProvider not referenced anywhere in this class body
  * @param initialConfig    configuration applied during construction
  */
class Codec(identityProvider: IdentityProvider, initialConfig: Config) {
  private val log = LoggerFactory.getLogger(classOf[Codec])

  // Starts out without entry codecs and is swapped by the reconfigure call right below. The field is @volatile,
  // it is written by reconfigure and read through the HttpHeaders accessor.
  @volatile private var httpHeaders: Codec.ForContext[TextMap] = new Codec.HttpHeaders(Map.empty)
  //val Binary: Codec.ForContext[ByteBuffer] = _
  reconfigure(initialConfig)

  /** Returns the HTTP headers codec built by the most recent [[reconfigure]] call. */
  def HttpHeaders: Codec.ForContext[TextMap] = httpHeaders

  /**
    * Replaces the HTTP headers codec with one built from `config`.
    *
    * @param config configuration containing the `kamon.context.encoding.http-headers` section
    */
  def reconfigure(config: Config): Unit = {
    val entryCodecs = readEntryCodecs[TextMap]("kamon.context.encoding.http-headers", config)
    httpHeaders = new Codec.HttpHeaders(entryCodecs)
  }

  /**
    * Instantiates one entry codec per top level key found under `rootKey`. The value of each key is read as a
    * class name and instantiated without constructor arguments. Keys whose codec cannot be created are logged
    * and left out of the result.
    *
    * @param rootKey configuration path holding the `key = class name` pairs
    * @param config  configuration to read from
    * @return the successfully created codecs, indexed by their configuration key
    */
  private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codec.ForEntry[T]] = {
    val codecsConfig = config.getConfig(rootKey)
    val instantiator = new DynamicAccess(getClass.getClassLoader)

    codecsConfig.topLevelKeys.foldLeft(Map.empty[String, Codec.ForEntry[T]]) { (codecs, key) =>
      try {
        val className = codecsConfig.getString(key)
        val codec = instantiator.createInstanceFor[Codec.ForEntry[T]](className, Nil).get
        codecs + (key -> codec)
      } catch {
        case cause: Throwable =>
          log.error(s"Failed to initialize codec for key [$key]", cause)
          // A failing entry is skipped, the codecs collected so far are kept.
          codecs
      }
    }
  }
}

/**
  * Codec contracts and the HTTP headers implementation built on top of them.
  */
object Codec {

  /**
    * Converts a whole [[Context]] to and from a carrier.
    *
    * @tparam T type of the carrier
    */
  trait ForContext[T] {

    /** Writes `context` into a new carrier. */
    def encode(context: Context): T

    /** Reads a [[Context]] out of `carrier`. */
    def decode(carrier: T): Context
  }

  /**
    * Converts a single part of a [[Context]] to and from a carrier.
    *
    * @tparam T type of the carrier
    */
  trait ForEntry[T] {

    /** Writes the part of `context` handled by this codec into a new carrier. */
    def encode(context: Context): T

    /**
      * Reads from `carrier` and returns a [[Context]]. [[HttpHeaders.decode]] feeds the returned value as the
      * `context` argument of the next codec, so implementations are presumably expected to return `context`
      * plus whatever they decoded.
      */
    def decode(carrier: T, context: Context): Context
  }

  /**
    * [[ForContext]] implementation that uses a [[TextMap]] as carrier and delegates to one [[ForEntry]] codec
    * per context key.
    *
    * @param entryCodecs entry codecs indexed by the name of the context key they handle
    */
  final class HttpHeaders(entryCodecs: Map[String, Codec.ForEntry[TextMap]]) extends Codec.ForContext[TextMap] {
    private val log = LoggerFactory.getLogger(classOf[HttpHeaders])

    /**
      * Encodes every broadcast entry of `context` using the codec registered under the key's name and merges
      * all produced pairs into a single [[TextMap]]. Entries whose key is not marked as broadcast are ignored.
      * A missing codec or a failing codec is logged and does not interrupt the encoding of the other entries.
      *
      * @param context context to encode
      * @return a new [[TextMap]] with the pairs written by the entry codecs
      */
    override def encode(context: Context): TextMap = {
      val encoded = TextMap.Default()

      context.entries.foreach {
        case (key, _) if key.broadcast =>
          entryCodecs.get(key.name) match {
            case Some(codec) =>
              // NOTE(review): catching Throwable also swallows fatal errors; kept as is to preserve the
              // best-effort behavior, consider scala.util.control.NonFatal.
              try {
                codec.encode(context).values.foreach { case (name, value) => encoded.put(name, value) }
              } catch {
                case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e)
              }

            case None =>
              log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name)
          }

        // Non-broadcast entries must be skipped explicitly: without this case the function passed to foreach is
        // not defined for them and a scala.MatchError would abort the whole encoding.
        case _ => ()
      }

      encoded
    }

    /**
      * Runs every entry codec over `carrier`, passing the context returned by one codec on to the next one.
      * If any codec throws, the failure is logged and the empty context is returned.
      *
      * @param carrier text map to read from
      * @return the context produced by the last codec, or `Context.Empty` on failure
      */
    override def decode(carrier: TextMap): Context = {
      val empty: Context = Context.Empty

      try {
        entryCodecs.foldLeft(empty) { case (ctx, (_, codec)) => codec.decode(carrier, ctx) }
      } catch {
        case e: Throwable =>
          log.error("Failed to decode context from HttpHeaders", e)
          // Partially decoded state is dropped on purpose, matching the all-or-nothing result of the fold.
          empty
      }
    }
  }
}


/**
  * Carrier made of String keys and String values. In this file it is the carrier type used by
  * `Codec.HttpHeaders`.
  */
trait TextMap {

  /**
    * Looks up the value stored under `key`.
    *
    * @param key key to look up
    * @return the value as an `Option`; presumably `None` when the key is absent (confirm against implementations)
    */
  def get(key: String): Option[String]

  /**
    * Stores `value` under `key`.
    *
    * @param key   key to write
    * @param value value to associate with the key
    */
  def put(key: String, value: String): Unit

  /**
    * Iterates over the stored pairs. `Codec.HttpHeaders.encode` treats the first element of each pair as the
    * key and the second as the value.
    */
  def values: Iterator[(String, String)]
}

/** Companion holding the stock [[TextMap]] implementation. */
object TextMap {

  /**
    * [[TextMap]] that keeps its pairs in a private `scala.collection.mutable.Map`. No synchronization is applied
    * around the underlying map.
    */
  class Default extends TextMap {
    private val pairs = mutable.Map.empty[String, String]

    /** Returns the value currently stored under `key`, if any. */
    override def get(key: String): Option[String] = pairs.get(key)

    /** Adds the pair, replacing any value previously stored under `key`. */
    override def put(key: String, value: String): Unit = pairs.update(key, value)

    /** Iterates over the pairs held by the underlying map. */
    override def values: Iterator[(String, String)] = pairs.iterator
  }

  object Default {

    /** Creates a new, empty [[Default]] instance. */
    def apply(): Default = new Default
  }
}