aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/context/Codecs.scala
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-08-21 09:23:07 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-08-21 10:37:08 +0200
commita152a3098b564ed43766a857b32b7c7d7445f9ce (patch)
tree7651f61e598f316ee9dca415c5a5c67ce530bad5 /kamon-core/src/main/scala/kamon/context/Codecs.scala
parent3cb974e5dfd381b9b28ffef9977047cf35242121 (diff)
downloadKamon-a152a3098b564ed43766a857b32b7c7d7445f9ce.tar.gz
Kamon-a152a3098b564ed43766a857b32b7c7d7445f9ce.tar.bz2
Kamon-a152a3098b564ed43766a857b32b7c7d7445f9ce.zip
binary encoding of context and entries
Diffstat (limited to 'kamon-core/src/main/scala/kamon/context/Codecs.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/context/Codecs.scala245
1 files changed, 245 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/context/Codecs.scala b/kamon-core/src/main/scala/kamon/context/Codecs.scala
new file mode 100644
index 00000000..b50e991d
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/Codecs.scala
@@ -0,0 +1,245 @@
+/* =========================================================================================
+ * Copyright © 2013-2017 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon
+package context
+
+import java.nio.ByteBuffer
+
+import com.typesafe.config.Config
+import kamon.util.DynamicAccess
+import org.slf4j.LoggerFactory
+import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry}
+
+import scala.collection.mutable
+
class Codecs(initialConfig: Config) {
  private val log = LoggerFactory.getLogger(classOf[Codecs])

  // Both codecs are replaced atomically on reconfigure(); @volatile guarantees
  // readers always observe a fully-constructed codec instance.
  @volatile private var httpHeaders: Codecs.ForContext[TextMap] = new Codecs.HttpHeaders(Map.empty)
  @volatile private var binary: Codecs.ForContext[ByteBuffer] = new Codecs.Binary(256, Map.empty)

  reconfigure(initialConfig)


  /** Codec used to propagate a Context through text key/value pairs (HTTP headers). */
  def HttpHeaders: Codecs.ForContext[TextMap] =
    httpHeaders

  /** Codec used to propagate a Context through a binary carrier. */
  def Binary: Codecs.ForContext[ByteBuffer] =
    binary

  /**
    * Rebuilds both codecs from the `kamon.context.codecs` section of the provided
    * configuration. If reading the configuration fails the error is logged and the
    * previously installed codecs remain in place.
    */
  def reconfigure(config: Config): Unit = {
    import scala.util.control.NonFatal
    try {
      val codecsConfig = config.getConfig("kamon.context.codecs")
      httpHeaders = new Codecs.HttpHeaders(readEntryCodecs("http-headers-keys", codecsConfig))
      binary = new Codecs.Binary(codecsConfig.getBytes("binary-buffer-size"), readEntryCodecs("binary-keys", codecsConfig))
    } catch {
      // Only swallow non-fatal errors; fatal VM errors (OOM, etc.) must propagate.
      case NonFatal(t) => log.error("Failed to initialize Context Codecs", t)
    }
  }

  /**
    * Instantiates one entry codec per key found under `rootKey`, where each config
    * value is the fully-qualified class name of a [[Codecs.ForEntry]] implementation.
    * Keys whose codec cannot be instantiated are logged and skipped rather than
    * failing the whole map.
    */
  private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codecs.ForEntry[T]] = {
    import scala.util.control.NonFatal
    val rootConfig = config.getConfig(rootKey)
    val dynamic = new DynamicAccess(getClass.getClassLoader)
    val entries = Map.newBuilder[String, Codecs.ForEntry[T]]

    rootConfig.topLevelKeys.foreach(key => {
      try {
        val fqcn = rootConfig.getString(key)
        entries += ((key, dynamic.createInstanceFor[Codecs.ForEntry[T]](fqcn, Nil).get))
      } catch {
        case NonFatal(e) =>
          log.error(s"Failed to initialize codec for key [$key]", e)
      }
    })

    entries.result()
  }
}
+
object Codecs {
  import scala.util.control.NonFatal

  /** Encodes a complete Context into a carrier of type T, and decodes one back from it. */
  trait ForContext[T] {
    def encode(context: Context): T
    def decode(carrier: T): Context
  }

  /**
    * Encodes/decodes a single context entry. `decode` receives the context built so
    * far and returns it with this entry's data added.
    */
  trait ForEntry[T] {
    def encode(context: Context): T
    def decode(carrier: T, context: Context): Context
  }

  /**
    * Context codec backed by text key/value pairs, typically HTTP headers. Only
    * entries whose key is marked as broadcast are propagated; encoding or decoding
    * failures of individual entries are logged and skipped so that one bad codec
    * never drops the whole context.
    */
  final class HttpHeaders(entryCodecs: Map[String, Codecs.ForEntry[TextMap]]) extends Codecs.ForContext[TextMap] {
    private val log = LoggerFactory.getLogger(classOf[HttpHeaders])

    override def encode(context: Context): TextMap = {
      val encoded = TextMap.Default()

      context.entries.foreach {
        case (key, _) if key.broadcast =>
          entryCodecs.get(key.name) match {
            case Some(codec) =>
              try {
                codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2))
              } catch {
                // Non-fatal only: a broken entry codec must not kill the caller.
                case NonFatal(e) => log.error(s"Failed to encode key [${key.name}]", e)
              }

            case None =>
              log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name)
          }

        case _ => // All non-broadcast keys should be ignored.
      }

      encoded
    }

    override def decode(carrier: TextMap): Context = {
      var context: Context = Context.Empty

      try {
        // Each registered entry codec gets a chance to pull its data out of the
        // carrier and fold it into the accumulated context.
        context = entryCodecs.foldLeft(context)((ctx, codecEntry) => {
          val (_, codec) = codecEntry
          codec.decode(carrier, ctx)
        })

      } catch {
        case NonFatal(e) =>
          log.error("Failed to decode context from HttpHeaders", e)
      }

      context
    }
  }


  /**
    * Context codec backed by a binary (Colfer-generated) representation. Marshalling
    * uses a per-thread scratch buffer of `bufferSize` bytes (configured via
    * `kamon.context.codecs.binary-buffer-size`); a context whose marshalled form
    * exceeds that size will fail to encode and be logged.
    */
  final class Binary(bufferSize: Long, entryCodecs: Map[String, Codecs.ForEntry[ByteBuffer]]) extends Codecs.ForContext[ByteBuffer] {
    private val log = LoggerFactory.getLogger(classOf[Binary])
    private val binaryBuffer = newThreadLocalBuffer(bufferSize)
    private val emptyBuffer = ByteBuffer.allocate(0)

    override def encode(context: Context): ByteBuffer = {
      val entries = context.entries
      if(entries.isEmpty)
        emptyBuffer
      else {
        var colferEntries: List[ColferEntry] = Nil
        entries.foreach {
          case (key, _) if key.broadcast =>
            entryCodecs.get(key.name) match {
              case Some(entryCodec) =>
                try {
                  val entryData = entryCodec.encode(context)
                  if(entryData.capacity() > 0) {
                    val colferEntry = new ColferEntry()
                    colferEntry.setName(key.name)
                    colferEntry.setContent(byteArrayOf(entryData))
                    colferEntries = colferEntry :: colferEntries
                  }
                } catch {
                  case NonFatal(throwable) =>
                    log.error(s"Failed to encode broadcast context key [${key.name}]", throwable)
                }

              case None =>
                log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name)
            }

          case _ => // All non-broadcast keys should be ignored.
        }

        if(colferEntries.isEmpty)
          emptyBuffer
        else {
          val buffer = binaryBuffer.get()
          val colferContext = new ColferContext()
          colferContext.setEntries(colferEntries.toArray)
          // Colfer's marshal returns the number of bytes written into `buffer`.
          val marshalledSize = colferContext.marshal(buffer, 0)

          val data = Array.ofDim[Byte](marshalledSize)
          System.arraycopy(buffer, 0, data, 0, marshalledSize)
          ByteBuffer.wrap(data)
        }
      }
    }


    override def decode(carrier: ByteBuffer): Context = {
      if(carrier.capacity() == 0)
        Context.Empty
      else {
        var context: Context = Context.Empty

        try {
          val colferContext = new ColferContext()
          colferContext.unmarshal(byteArrayOf(carrier), 0)

          colferContext.entries.foreach(colferEntry => {
            entryCodecs.get(colferEntry.getName()) match {
              case Some(entryCodec) =>
                context = entryCodec.decode(ByteBuffer.wrap(colferEntry.content), context)

              case None =>
                log.error("Failed to decode entry [{}] with Binary context codec. No entry found for the key.", colferEntry.getName())
            }
          })

        } catch {
          case NonFatal(e) =>
            log.error("Failed to decode context from Binary", e)
        }

        context
      }
    }

    // Returns the full backing array when the buffer exposes one (the common case,
    // since encoded entries are wrapped heap arrays); otherwise copies the readable
    // bytes so that direct, read-only or sliced buffers do not throw
    // UnsupportedOperationException / expose unrelated bytes via array().
    private def byteArrayOf(buffer: ByteBuffer): Array[Byte] =
      if(buffer.hasArray && buffer.arrayOffset() == 0 && buffer.array().length == buffer.capacity())
        buffer.array()
      else {
        val copy = Array.ofDim[Byte](buffer.remaining())
        buffer.duplicate().get(copy)
        copy
      }


    // One scratch array per thread so concurrent encodes never share a buffer.
    private def newThreadLocalBuffer(size: Long): ThreadLocal[Array[Byte]] = new ThreadLocal[Array[Byte]] {
      override def initialValue(): Array[Byte] = Array.ofDim[Byte](size.toInt)
    }
  }
}
+
+
/**
  * Minimal mutable abstraction over a set of text key/value pairs, used as the
  * carrier for the HTTP-headers context codec.
  */
trait TextMap {

  /** Returns the value stored under `key`, if any. */
  def get(key: String): Option[String]

  /** Stores `value` under `key`, replacing any previous value. */
  def put(key: String, value: String): Unit

  /** Iterates over all stored (key, value) pairs; ordering is implementation-defined. */
  def values: Iterator[(String, String)]
}
+
object TextMap {

  /** Default [[TextMap]] implementation backed by an in-memory mutable map. */
  class Default extends TextMap {
    // Plain mutable map; no thread-safety is provided (matches carrier usage).
    private val underlying = mutable.Map.empty[String, String]

    override def get(key: String): Option[String] = underlying.get(key)

    override def put(key: String, value: String): Unit = underlying.update(key, value)

    override def values: Iterator[(String, String)] = underlying.iterator
  }

  object Default {
    /** Creates a fresh, empty [[Default]] text map. */
    def apply(): Default = new Default()
  }
}