diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/context/Codecs.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/context/Codecs.scala | 245 |
1 files changed, 245 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/context/Codecs.scala b/kamon-core/src/main/scala/kamon/context/Codecs.scala new file mode 100644 index 00000000..b50e991d --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/Codecs.scala @@ -0,0 +1,245 @@ +/* ========================================================================================= + * Copyright © 2013-2017 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon +package context + +import java.nio.ByteBuffer + +import com.typesafe.config.Config +import kamon.util.DynamicAccess +import org.slf4j.LoggerFactory +import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry} + +import scala.collection.mutable + +class Codecs(initialConfig: Config) { + private val log = LoggerFactory.getLogger(classOf[Codecs]) + @volatile private var httpHeaders: Codecs.ForContext[TextMap] = new Codecs.HttpHeaders(Map.empty) + @volatile private var binary: Codecs.ForContext[ByteBuffer] = new Codecs.Binary(256, Map.empty) + + reconfigure(initialConfig) + + + def HttpHeaders: Codecs.ForContext[TextMap] = + httpHeaders + + def Binary: Codecs.ForContext[ByteBuffer] = + binary + + def reconfigure(config: Config): Unit = { + try { + val codecsConfig = config.getConfig("kamon.context.codecs") + httpHeaders = new Codecs.HttpHeaders(readEntryCodecs("http-headers-keys", codecsConfig)) + binary = new Codecs.Binary(codecsConfig.getBytes("binary-buffer-size"), readEntryCodecs("binary-keys", codecsConfig)) + } catch { + case t: Throwable => log.error("Failed to initialize Context Codecs", t) + } + } + + private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codecs.ForEntry[T]] = { + val rootConfig = config.getConfig(rootKey) + val dynamic = new DynamicAccess(getClass.getClassLoader) + val entries = Map.newBuilder[String, Codecs.ForEntry[T]] + + rootConfig.topLevelKeys.foreach(key => { + try { + val fqcn = rootConfig.getString(key) + entries += ((key, dynamic.createInstanceFor[Codecs.ForEntry[T]](fqcn, Nil).get)) + } catch { + case e: Throwable => + log.error(s"Failed to initialize codec for key [$key]", e) + } + }) + + entries.result() + } +} + +object Codecs { + + trait ForContext[T] { + def encode(context: Context): T + def decode(carrier: T): Context + } + + trait ForEntry[T] { + def encode(context: Context): T + def decode(carrier: T, context: Context): Context + } + + final class HttpHeaders(entryCodecs: Map[String, Codecs.ForEntry[TextMap]]) extends Codecs.ForContext[TextMap] { + private val log = LoggerFactory.getLogger(classOf[HttpHeaders]) + + override def encode(context: Context): TextMap = { + val encoded = TextMap.Default() + + context.entries.foreach { + case (key, _) if key.broadcast => + entryCodecs.get(key.name) match { + case Some(codec) => + try { + codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2)) + } catch { + case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e) + } + + case None => + log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name) + } + + case _ => // All non-broadcast keys should be ignored. + } + + encoded + } + + override def decode(carrier: TextMap): Context = { + var context: Context = Context.Empty + + try { + context = entryCodecs.foldLeft(context)((ctx, codecEntry) => { + val (_, codec) = codecEntry + codec.decode(carrier, ctx) + }) + + } catch { + case e: Throwable => + log.error("Failed to decode context from HttpHeaders", e) + } + + context + } + } + + + final class Binary(bufferSize: Long, entryCodecs: Map[String, Codecs.ForEntry[ByteBuffer]]) extends Codecs.ForContext[ByteBuffer] { + private val log = LoggerFactory.getLogger(classOf[Binary]) + private val binaryBuffer = newThreadLocalBuffer(bufferSize) + private val emptyBuffer = ByteBuffer.allocate(0) + + override def encode(context: Context): ByteBuffer = { + val entries = context.entries + if(entries.isEmpty) + emptyBuffer + else { + var colferEntries: List[ColferEntry] = Nil + entries.foreach { + case (key, _) if key.broadcast => + entryCodecs.get(key.name) match { + case Some(entryCodec) => + try { + val entryData = entryCodec.encode(context) + if(entryData.capacity() > 0) { + val colferEntry = new ColferEntry() + colferEntry.setName(key.name) + colferEntry.setContent(entryData.array()) + colferEntries = colferEntry :: colferEntries + } + } catch { + case throwable: Throwable => + log.error(s"Failed to encode broadcast context key [${key.name}]", throwable) + } + + case None => + log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name) + } + + case _ => // All non-broadcast keys should be ignored. + } + + if(colferEntries.isEmpty) + emptyBuffer + else { + val buffer = binaryBuffer.get() + val colferContext = new ColferContext() + colferContext.setEntries(colferEntries.toArray) + val marshalledSize = colferContext.marshal(buffer, 0) + + val data = Array.ofDim[Byte](marshalledSize) + System.arraycopy(buffer, 0, data, 0, marshalledSize) + ByteBuffer.wrap(data) + } + } + } + + + override def decode(carrier: ByteBuffer): Context = { + if(carrier.capacity() == 0) + Context.Empty + else { + var context: Context = Context.Empty + + try { + val colferContext = new ColferContext() + colferContext.unmarshal(carrier.array(), 0) + + colferContext.entries.foreach(colferEntry => { + entryCodecs.get(colferEntry.getName()) match { + case Some(entryCodec) => + context = entryCodec.decode(ByteBuffer.wrap(colferEntry.content), context) + + case None => + log.error("Failed to decode entry [{}] with Binary context codec. No entry found for the key.", colferEntry.getName()) + } + }) + + } catch { + case e: Throwable => + log.error("Failed to decode context from Binary", e) + } + + context + } + } + + + private def newThreadLocalBuffer(size: Long): ThreadLocal[Array[Byte]] = new ThreadLocal[Array[Byte]] { + override def initialValue(): Array[Byte] = Array.ofDim[Byte](size.toInt) + } + } +} + + +trait TextMap { + + def get(key: String): Option[String] + + def put(key: String, value: String): Unit + + def values: Iterator[(String, String)] +} + +object TextMap { + + class Default extends TextMap { + private val storage = + mutable.Map.empty[String, String] + + override def get(key: String): Option[String] = + storage.get(key) + + override def put(key: String, value: String): Unit = + storage.put(key, value) + + override def values: Iterator[(String, String)] = + storage.toIterator + } + + object Default { + def apply(): Default = + new Default() + } +}
\ No newline at end of file |