1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
package kamon
package context
import com.typesafe.config.Config
import kamon.trace.IdentityProvider
import kamon.util.DynamicAccess
import org.slf4j.LoggerFactory
import scala.collection.mutable
class Codec(initialConfig: Config) {
private val log = LoggerFactory.getLogger(classOf[Codec])
@volatile private var httpHeaders: Codec.ForContext[TextMap] = new Codec.HttpHeaders(Map.empty)
//val Binary: Codec.ForContext[ByteBuffer] = _
reconfigure(initialConfig)
def HttpHeaders: Codec.ForContext[TextMap] =
httpHeaders
def reconfigure(config: Config): Unit = {
httpHeaders = new Codec.HttpHeaders(readEntryCodecs("kamon.context.encoding.http-headers", config))
}
private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codec.ForEntry[T]] = {
val rootConfig = config.getConfig(rootKey)
val dynamic = new DynamicAccess(getClass.getClassLoader)
val entries = Map.newBuilder[String, Codec.ForEntry[T]]
rootConfig.topLevelKeys.foreach(key => {
try {
val fqcn = rootConfig.getString(key)
entries += ((key, dynamic.createInstanceFor[Codec.ForEntry[T]](fqcn, Nil).get))
} catch {
case e: Throwable =>
log.error(s"Failed to initialize codec for key [$key]", e)
}
})
entries.result()
}
}
object Codec {
trait ForContext[T] {
def encode(context: Context): T
def decode(carrier: T): Context
}
trait ForEntry[T] {
def encode(context: Context): T
def decode(carrier: T, context: Context): Context
}
final class HttpHeaders(entryCodecs: Map[String, Codec.ForEntry[TextMap]]) extends Codec.ForContext[TextMap] {
private val log = LoggerFactory.getLogger(classOf[HttpHeaders])
override def encode(context: Context): TextMap = {
val encoded = TextMap.Default()
context.entries.foreach {
case (key, _) if key.broadcast =>
entryCodecs.get(key.name) match {
case Some(codec) =>
try {
codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2))
} catch {
case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e)
}
case None =>
log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name)
}
}
encoded
}
override def decode(carrier: TextMap): Context = {
var context: Context = Context.Empty
try {
context = entryCodecs.foldLeft(context)((ctx, codecEntry) => {
val (_, codec) = codecEntry
codec.decode(carrier, ctx)
})
} catch {
case e: Throwable =>
log.error("Failed to decode context from HttpHeaders", e)
}
context
}
}
}
trait TextMap {
def get(key: String): Option[String]
def put(key: String, value: String): Unit
def values: Iterator[(String, String)]
}
object TextMap {
class Default extends TextMap {
private val storage =
mutable.Map.empty[String, String]
override def get(key: String): Option[String] =
storage.get(key)
override def put(key: String, value: String): Unit =
storage.put(key, value)
override def values: Iterator[(String, String)] =
storage.toIterator
}
object Default {
def apply(): Default =
new Default()
}
}
|