diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/context')
6 files changed, 643 insertions, 397 deletions
diff --git a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala new file mode 100644 index 00000000..75e65c44 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala @@ -0,0 +1,301 @@ +/* ========================================================================================= + * Copyright © 2013-2018 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon +package context + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream} + +import com.typesafe.config.Config +import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter} +import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry} +import org.slf4j.LoggerFactory + +import scala.reflect.ClassTag +import scala.util.control.NonFatal +import scala.util.{Failure, Success, Try} + + +/** + * Context propagation that uses byte stream abstractions as the transport medium. The Binary propagation uses + * instances of [[ByteStreamReader]] and [[ByteStreamWriter]] to decode and encode Context instances, respectively. + * + * Binary propagation uses the [[ByteStreamReader]] and [[ByteStreamWriter]] abstraction which closely model the APIs + * from [[InputStream]] and [[OutputStream]], but without exposing additional functionality that wouldn't have any + * well defined behavior for Context propagation, e.g. flush or close functions on OutputStreams. + */ +object BinaryPropagation { + + /** + * Represents a readable stream of bytes. This interface closely resembles [[InputStream]], minus the functionality + * that wouldn't have a clearly defined behavior in the context of Context propagation. + */ + trait ByteStreamReader { + /** + * Number of available bytes on the ByteStream. + */ + def available(): Int + + /** + * Reads as many bytes as possible into the target byte array. + * + * @param target Target buffer in which the read bytes will be written. + * @return The number of bytes written into the target buffer. + */ + def read(target: Array[Byte]): Int + + /** + * Reads a specified number of bytes into the target buffer, starting from the offset position. + * + * @param target Target buffer in which read bytes will be written. + * @param offset Offset index in which to start writing bytes on the target buffer. + * @param count Number of bytes to be read. + * @return The number of bytes written into the target buffer. + */ + def read(target: Array[Byte], offset: Int, count: Int): Int + + /** + * Reads all available bytes into a newly created byte array. + * + * @return All bytes read. + */ + def readAll(): Array[Byte] + } + + + object ByteStreamReader { + + /** + * Creates a new [[ByteStreamReader]] that reads data from a byte array. + */ + def of(bytes: Array[Byte]): ByteStreamReader = new ByteArrayInputStream(bytes) with ByteStreamReader { + override def readAll(): Array[Byte] = { + val target = Array.ofDim[Byte](available()) + read(target, 0, available()) + target + } + } + } + + + /** + * Represents a writable stream of bytes. This interface closely resembles [[OutputStream]], minus the functionality + * that wouldn't have a clearly defined behavior in the context of Context propagation. + */ + trait ByteStreamWriter { + + /** + * Writes all bytes into the stream. + */ + def write(bytes: Array[Byte]): Unit + + /** + * Writes a portion of the provided bytes into the stream. + * + * @param bytes Buffer from which data will be selected. + * @param offset Starting index on the buffer. + * @param count Number of bytes to write into the stream. + */ + def write(bytes: Array[Byte], offset: Int, count: Int): Unit + + /** + * Write a single byte into the stream. + */ + def write(byte: Int): Unit + + } + + object ByteStreamWriter { + + /** + * Creates a new [[ByteStreamWriter]] from an OutputStream. + */ + def of(outputStream: OutputStream): ByteStreamWriter = new ByteStreamWriter { + override def write(bytes: Array[Byte]): Unit = + outputStream.write(bytes) + + override def write(bytes: Array[Byte], offset: Int, count: Int): Unit = + outputStream.write(bytes, offset, count) + + override def write(byte: Int): Unit = + outputStream.write(byte) + } + } + + + + /** + * Create a new default Binary Propagation instance from the provided configuration. + * + * @param config Binary Propagation channel configuration + * @return A newly constructed HttpPropagation instance. + */ + def from(config: Config, classLoading: ClassLoading): Propagation[ByteStreamReader, ByteStreamWriter] = { + new BinaryPropagation.Default(Settings.from(config, classLoading)) + } + + /** + * Default Binary propagation in Kamon. This implementation uses Colfer to read and write the context tags and + * entries. Entries are represented as simple pairs of entry name and bytes, which are then processed by the all + * configured entry readers and writers. + */ + class Default(settings: Settings) extends Propagation[ByteStreamReader, ByteStreamWriter] { + private val _log = LoggerFactory.getLogger(classOf[BinaryPropagation.Default]) + private val _streamPool = new ThreadLocal[Default.ReusableByteStreamWriter] { + override def initialValue(): Default.ReusableByteStreamWriter = new Default.ReusableByteStreamWriter(128) + } + + private val _contextBufferPool = new ThreadLocal[Array[Byte]] { + override def initialValue(): Array[Byte] = Array.ofDim[Byte](settings.maxOutgoingSize) + } + + override def read(reader: ByteStreamReader): Context = { + if(reader.available() > 0) { + val contextData = Try { + val cContext = new ColferContext() + cContext.unmarshal(reader.readAll(), 0) + cContext + } + + contextData.failed.foreach { + case NonFatal(t) => _log.warn("Failed to read Context from ByteStreamReader", t) + } + + contextData.map { colferContext => + + // Context tags + var tagSectionsCount = colferContext.tags.length + if (tagSectionsCount > 0 && tagSectionsCount % 2 != 0) { + _log.warn("Malformed Context tags found, tags consistency might be compromised") + tagSectionsCount -= 1 + } + + val tags = if (tagSectionsCount > 0) { + val tagsBuilder = Map.newBuilder[String, String] + var tagIndex = 0 + while (tagIndex < tagSectionsCount) { + tagsBuilder += (colferContext.tags(tagIndex) -> colferContext.tags(tagIndex + 1)) + tagIndex += 2 + } + tagsBuilder.result() + + } else Map.empty[String, String] + + + // Only reads the entries for which there is a registered reader + colferContext.entries.foldLeft(Context.of(tags)) { + case (context, entryData) => + settings.incomingEntries.get(entryData.name).map { entryReader => + var contextWithEntry = context + try { + contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.content), context) + } catch { + case NonFatal(t) => _log.warn("Failed to read entry [{}]", entryData.name.asInstanceOf[Any], t) + } + + contextWithEntry + }.getOrElse(context) + } + } getOrElse(Context.Empty) + } else Context.Empty + } + + override def write(context: Context, writer: ByteStreamWriter): Unit = { + if (context.nonEmpty()) { + val contextData = new ColferContext() + val output = _streamPool.get() + val contextOutgoingBuffer = _contextBufferPool.get() + + if (context.tags.nonEmpty) { + val tags = Array.ofDim[String](context.tags.size * 2) + var tagIndex = 0 + context.tags.foreach { + case (key, value) => + tags.update(tagIndex, key) + tags.update(tagIndex + 1, value) + tagIndex += 2 + } + + contextData.tags = tags + } + + if (context.entries.nonEmpty) { + val entries = settings.outgoingEntries.collect { + case (entryName, entryWriter) if context.entries.contains(entryName) => + val colferEntry = new ColferEntry() + try { + output.reset() + entryWriter.write(context, output) + + colferEntry.name = entryName + colferEntry.content = output.toByteArray() + } catch { + case NonFatal(t) => _log.warn("Failed to write entry [{}]", entryName.asInstanceOf[Any], t) + } + + colferEntry + } + + contextData.entries = entries.toArray + } + + try { + val contextSize = contextData.marshal(contextOutgoingBuffer, 0) + writer.write(contextOutgoingBuffer, 0, contextSize) + } catch { + case NonFatal(t) => _log.warn("Failed to write Context to ByteStreamWriter", t) + } + } + } + } + + object Default { + private class ReusableByteStreamWriter(size: Int) extends ByteArrayOutputStream(size) with ByteStreamWriter { + def underlying(): Array[Byte] = this.buf + } + } + + case class Settings( + maxOutgoingSize: Int, + incomingEntries: Map[String, Propagation.EntryReader[ByteStreamReader]], + outgoingEntries: Map[String, Propagation.EntryWriter[ByteStreamWriter]] + ) + + object Settings { + private val log = LoggerFactory.getLogger(classOf[BinaryPropagation.Settings]) + + def from(config: Config, classLoading: ClassLoading): BinaryPropagation.Settings = { + def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = { + val instanceMap = Map.newBuilder[String, ExpectedType] + + mappings.foreach { + case (contextKey, componentClass) => classLoading.createInstance[ExpectedType](componentClass, Nil) match { + case Success(componentInstance) => instanceMap += (contextKey -> componentInstance) + case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to []", + implicitly[ClassTag[ExpectedType]].runtimeClass.getName, componentClass, exception) + } + } + + instanceMap.result() + } + + Settings( + config.getBytes("max-outgoing-size").toInt, + buildInstances[Propagation.EntryReader[ByteStreamReader]](config.getConfig("entries.incoming").pairs), + buildInstances[Propagation.EntryWriter[ByteStreamWriter]](config.getConfig("entries.outgoing").pairs) + ) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/context/Codecs.scala b/kamon-core/src/main/scala/kamon/context/Codecs.scala deleted file mode 100644 index c5d237a9..00000000 --- a/kamon-core/src/main/scala/kamon/context/Codecs.scala +++ /dev/null @@ -1,295 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon -package context - -import java.nio.ByteBuffer - -import com.typesafe.config.Config -import kamon.util.DynamicAccess -import org.slf4j.LoggerFactory -import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry} - -import scala.collection.mutable - -class Codecs(initialConfig: Config) { - private val log = LoggerFactory.getLogger(classOf[Codecs]) - @volatile private var httpHeaders: Codecs.ForContext[TextMap] = new Codecs.HttpHeaders(Map.empty) - @volatile private var binary: Codecs.ForContext[ByteBuffer] = new Codecs.Binary(256, Map.empty) - - reconfigure(initialConfig) - - - def HttpHeaders: Codecs.ForContext[TextMap] = - httpHeaders - - def Binary: Codecs.ForContext[ByteBuffer] = - binary - - def reconfigure(config: Config): Unit = { - import scala.collection.JavaConverters._ - try { - val codecsConfig = config.getConfig("kamon.context.codecs") - val stringKeys = readStringKeysConfig(codecsConfig.getConfig("string-keys")) - val knownHttpHeaderCodecs = readEntryCodecs[TextMap]("http-headers-keys", codecsConfig) ++ stringHeaderCodecs(stringKeys) - val knownBinaryCodecs = readEntryCodecs[ByteBuffer]("binary-keys", codecsConfig) ++ stringBinaryCodecs(stringKeys) - - httpHeaders = new Codecs.HttpHeaders(knownHttpHeaderCodecs) - binary = new Codecs.Binary(codecsConfig.getBytes("binary-buffer-size"), knownBinaryCodecs) - } catch { - case t: Throwable => log.error("Failed to initialize Context Codecs", t) - } - } - - private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codecs.ForEntry[T]] = { - val rootConfig = config.getConfig(rootKey) - val dynamic = new DynamicAccess(getClass.getClassLoader) - val entries = Map.newBuilder[String, Codecs.ForEntry[T]] - - rootConfig.topLevelKeys.foreach(key => { - try { - val fqcn = rootConfig.getString(key) - entries += ((key, dynamic.createInstanceFor[Codecs.ForEntry[T]](fqcn, Nil).get)) - } catch { - case e: Throwable => - log.error(s"Failed to initialize codec for key [$key]", e) - } - }) - - entries.result() - } - - private def readStringKeysConfig(config: Config): Map[String, String] = - config.topLevelKeys.map(key => (key, config.getString(key))).toMap - - private def stringHeaderCodecs(keys: Map[String, String]): Map[String, Codecs.ForEntry[TextMap]] = - keys.map { case (key, header) => (key, new Codecs.StringHeadersCodec(key, header)) } - - private def stringBinaryCodecs(keys: Map[String, String]): Map[String, Codecs.ForEntry[ByteBuffer]] = - keys.map { case (key, _) => (key, new Codecs.StringBinaryCodec(key)) } -} - -object Codecs { - - trait ForContext[T] { - def encode(context: Context): T - def decode(carrier: T): Context - } - - trait ForEntry[T] { - def encode(context: Context): T - def decode(carrier: T, context: Context): Context - } - - final class HttpHeaders(entryCodecs: Map[String, Codecs.ForEntry[TextMap]]) extends Codecs.ForContext[TextMap] { - private val log = LoggerFactory.getLogger(classOf[HttpHeaders]) - - override def encode(context: Context): TextMap = { - val encoded = TextMap.Default() - - context.entries.foreach { - case (key, _) if key.broadcast => - entryCodecs.get(key.name) match { - case Some(codec) => - try { - codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2)) - } catch { - case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e) - } - - case None => - log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name) - } - - case _ => // All non-broadcast keys should be ignored. - } - - encoded - } - - override def decode(carrier: TextMap): Context = { - var context: Context = Context.Empty - - try { - context = entryCodecs.foldLeft(context)((ctx, codecEntry) => { - val (_, codec) = codecEntry - codec.decode(carrier, ctx) - }) - - } catch { - case e: Throwable => - log.error("Failed to decode context from HttpHeaders", e) - } - - context - } - } - - - final class Binary(bufferSize: Long, entryCodecs: Map[String, Codecs.ForEntry[ByteBuffer]]) extends Codecs.ForContext[ByteBuffer] { - private val log = LoggerFactory.getLogger(classOf[Binary]) - private val binaryBuffer = newThreadLocalBuffer(bufferSize) - private val emptyBuffer = ByteBuffer.allocate(0) - - override def encode(context: Context): ByteBuffer = { - val entries = context.entries - if(entries.isEmpty) - emptyBuffer - else { - var colferEntries: List[ColferEntry] = Nil - entries.foreach { - case (key, _) if key.broadcast => - entryCodecs.get(key.name) match { - case Some(entryCodec) => - try { - val entryData = entryCodec.encode(context) - if(entryData.capacity() > 0) { - val colferEntry = new ColferEntry() - colferEntry.setName(key.name) - colferEntry.setContent(entryData.array()) - colferEntries = colferEntry :: colferEntries - } - } catch { - case throwable: Throwable => - log.error(s"Failed to encode broadcast context key [${key.name}]", throwable) - } - - case None => - log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name) - } - - case _ => // All non-broadcast keys should be ignored. - } - - if(colferEntries.isEmpty) - emptyBuffer - else { - val buffer = binaryBuffer.get() - val colferContext = new ColferContext() - colferContext.setEntries(colferEntries.toArray) - val marshalledSize = colferContext.marshal(buffer, 0) - - val data = Array.ofDim[Byte](marshalledSize) - System.arraycopy(buffer, 0, data, 0, marshalledSize) - ByteBuffer.wrap(data) - } - } - } - - - override def decode(carrier: ByteBuffer): Context = { - if(carrier.capacity() == 0) - Context.Empty - else { - var context: Context = Context.Empty - - try { - val colferContext = new ColferContext() - colferContext.unmarshal(carrier.array(), 0) - - colferContext.entries.foreach(colferEntry => { - entryCodecs.get(colferEntry.getName()) match { - case Some(entryCodec) => - context = entryCodec.decode(ByteBuffer.wrap(colferEntry.content), context) - - case None => - log.error("Failed to decode entry [{}] with Binary context codec. No entry found for the key.", colferEntry.getName()) - } - }) - - } catch { - case e: Throwable => - log.error("Failed to decode context from Binary", e) - } - - context - } - } - - - private def newThreadLocalBuffer(size: Long): ThreadLocal[Array[Byte]] = new ThreadLocal[Array[Byte]] { - override def initialValue(): Array[Byte] = Array.ofDim[Byte](size.toInt) - } - } - - private class StringHeadersCodec(key: String, headerName: String) extends Codecs.ForEntry[TextMap] { - private val contextKey = Key.broadcast[Option[String]](key, None) - - override def encode(context: Context): TextMap = { - val textMap = TextMap.Default() - context.get(contextKey).foreach { value => - textMap.put(headerName, value) - } - - textMap - } - - override def decode(carrier: TextMap, context: Context): Context = { - carrier.get(headerName) match { - case value @ Some(_) => context.withKey(contextKey, value) - case None => context - } - } - } - - private class StringBinaryCodec(key: String) extends Codecs.ForEntry[ByteBuffer] { - val emptyBuffer: ByteBuffer = ByteBuffer.allocate(0) - private val contextKey = Key.broadcast[Option[String]](key, None) - - override def encode(context: Context): ByteBuffer = { - context.get(contextKey) match { - case Some(value) => ByteBuffer.wrap(value.getBytes) - case None => emptyBuffer - } - } - - override def decode(carrier: ByteBuffer, context: Context): Context = { - context.withKey(contextKey, Some(new String(carrier.array()))) - } - } -} - - -trait TextMap { - - def get(key: String): Option[String] - - def put(key: String, value: String): Unit - - def values: Iterator[(String, String)] -} - -object TextMap { - - class Default extends TextMap { - private val storage = - mutable.Map.empty[String, String] - - override def get(key: String): Option[String] = - storage.get(key) - - override def put(key: String, value: String): Unit = - storage.put(key, value) - - override def values: Iterator[(String, String)] = - storage.toIterator - } - - object Default { - def apply(): Default = - new Default() - } -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala index e0b084cb..2a7a382e 100644 --- a/kamon-core/src/main/scala/kamon/context/Context.scala +++ b/kamon-core/src/main/scala/kamon/context/Context.scala @@ -1,5 +1,5 @@ /* ========================================================================================= - * Copyright © 2013-2017 the kamon project <http://kamon.io/> + * Copyright © 2013-2018 the kamon project <http://kamon.io/> * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at @@ -13,90 +13,88 @@ * ========================================================================================= */ -package kamon.context +package kamon +package context -import java.io._ -import java.nio.ByteBuffer +import java.util.{Map => JavaMap} +import scala.collection.JavaConverters._ -import kamon.Kamon +class Context private (val entries: Map[String, Any], val tags: Map[String, String]) { -class Context private (private[context] val entries: Map[Key[_], Any]) extends scala.Serializable { - def get[T](key: Key[T]): T = - entries.getOrElse(key, key.emptyValue).asInstanceOf[T] + def get[T](key: Context.Key[T]): T = + entries.getOrElse(key.name, key.emptyValue).asInstanceOf[T] - def withKey[T](key: Key[T], value: T): Context = - new Context(entries.updated(key, value)) + def getTag(tagKey: String): Option[String] = + tags.get(tagKey) - var _deserializedEntries: Map[Key[_], Any] = Map.empty + def withKey[T](key: Context.Key[T], value: T): Context = + new Context(entries.updated(key.name, value), tags) - @throws[IOException] - private def writeObject(out: ObjectOutputStream): Unit = out.write( - Kamon.contextCodec().Binary.encode(this).array() - ) + def withTag(tagKey: String, tagValue: String): Context = + new Context(entries, tags.updated(tagKey, tagValue)) - @throws[IOException] - @throws[ClassNotFoundException] - private def readObject(in: ObjectInputStream): Unit = { - val buf = new Array[Byte](in.available()) - in.readFully(buf) - _deserializedEntries = Kamon.contextCodec().Binary.decode(ByteBuffer.wrap(buf)).entries - } + def withTags(tags: Map[String, String]): Context = + new Context(entries, this.tags ++ tags) - def readResolve(): AnyRef = new Context(_deserializedEntries) + def withTags(tags: JavaMap[String, String]): Context = + new Context(entries, this.tags ++ tags.asScala.toMap) - override def equals(obj: scala.Any): Boolean = { - obj != null && - obj.isInstanceOf[Context] && - obj.asInstanceOf[Context].entries != null && - obj.asInstanceOf[Context].entries == this.entries - } + def isEmpty(): Boolean = + entries.isEmpty && tags.isEmpty - override def hashCode(): Int = entries.hashCode() + def nonEmpty(): Boolean = + !isEmpty() } object Context { - val Empty = new Context(Map.empty) + val Empty = new Context(Map.empty, Map.empty) - def apply(): Context = - Empty + def of(tags: JavaMap[String, String]): Context = + new Context(Map.empty, tags.asScala.toMap) - def create(): Context = - Empty + def of(tags: Map[String, String]): Context = + new Context(Map.empty, tags) - def apply[T](key: Key[T], value: T): Context = - new Context(Map(key -> value)) + def of[T](key: Context.Key[T], value: T): Context = + new Context(Map(key.name -> value), Map.empty) - def create[T](key: Key[T], value: T): Context = - apply(key, value) - -} - - -sealed abstract class Key[T] { - def name: String - def emptyValue: T - def broadcast: Boolean -} + def of[T](key: Context.Key[T], value: T, tags: JavaMap[String, String]): Context = + new Context(Map(key.name -> value), tags.asScala.toMap) -object Key { + def of[T](key: Context.Key[T], value: T, tags: Map[String, String]): Context = + new Context(Map(key.name -> value), tags) - def local[T](name: String, emptyValue: T): Key[T] = - new Default[T](name, emptyValue, false) + def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U): Context = + new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), Map.empty) - def broadcast[T](name: String, emptyValue: T): Key[T] = - new Default[T](name, emptyValue, true) + def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: JavaMap[String, String]): Context = + new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), tags.asScala.toMap) - def broadcastString(name: String): Key[Option[String]] = - new Default[Option[String]](name, None, true) + def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: Map[String, String]): Context = + new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), tags) + def key[T](name: String, emptyValue: T): Context.Key[T] = + new Context.Key(name, emptyValue) - private class Default[T](val name: String, val emptyValue: T, val broadcast: Boolean) extends Key[T] { + /** + * Encapsulates the type, name and empty value for a context entry. All reads and writes from a context instance + * must be done using a context key, which will ensure the right type is used on both operations. The key's name + * is used when configuring mappings and incoming/outgoing/returning codecs for context propagation across channels. + * + * If you try to read an entry from a context and such entry is not present, the empty value for the key is returned + * instead. + * + * @param name Key name. Must be unique. + * @param emptyValue Value to be returned when reading from a context that doesn't have an entry with this key. + * @tparam ValueType Type of the value to be held on the context with this key. + */ + final class Key[ValueType](val name: String, val emptyValue: ValueType) { override def hashCode(): Int = name.hashCode override def equals(that: Any): Boolean = - that.isInstanceOf[Default[_]] && that.asInstanceOf[Default[_]].name == this.name + that.isInstanceOf[Context.Key[_]] && that.asInstanceOf[Context.Key[_]].name == this.name } }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala new file mode 100644 index 00000000..6a15e2a6 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala @@ -0,0 +1,206 @@ +/* ========================================================================================= + * Copyright © 2013-2018 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon +package context + +import com.typesafe.config.Config +import org.slf4j.LoggerFactory + +import scala.reflect.ClassTag +import scala.util.control.NonFatal +import scala.util.{Failure, Success} + +/** + * Context propagation that uses HTTP headers as the transport medium. HTTP propagation mechanisms read any number of + * HTTP headers from incoming HTTP requests to decode a Context instance and write any number of HTTP headers on + * outgoing requests to transfer a context to remote services. + */ +object HttpPropagation { + + /** + * Wrapper that reads HTTP headers from a HTTP message. + */ + trait HeaderReader { + + /** + * Reads a single HTTP header value. + * + * @param header HTTP header name + * @return The HTTP header value, if present. + */ + def read(header: String): Option[String] + + /** + * Reads all HTTP headers present in the wrapped HTTP message. + * + * @return A map from header name to + */ + def readAll(): Map[String, String] + } + + /** + * Wrapper that writes HTTP headers to a HTTP message. + */ + trait HeaderWriter { + + /** + * Writes a HTTP header into a HTTP message. + * + * @param header HTTP header name. + * @param value HTTP header value. + */ + def write(header: String, value: String): Unit + } + + + + /** + * Create a new default HTTP propagation instance from the provided configuration. + * + * @param config HTTP propagation channel configuration + * @return A newly constructed HttpPropagation instance. + */ + def from(config: Config, classLoading: ClassLoading): Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] = { + new HttpPropagation.Default(Settings.from(config, classLoading)) + } + + /** + * Default HTTP Propagation in Kamon. + */ + class Default(settings: Settings) extends Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] { + private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Default]) + + /** + * Reads context tags and entries on the following order: + * 1. Read all context tags from the context tags header. + * 2. Read all context tags with explicit mappings. This overrides any tag + * from the previous step in case of a tag key clash. + * 3. Read all context entries using the incoming entries configuration. + */ + override def read(reader: HeaderReader): Context = { + val tags = Map.newBuilder[String, String] + + // Tags encoded together in the context tags header. + try { + reader.read(settings.tagsHeaderName).foreach { contextTagsHeader => + contextTagsHeader.split(";").foreach(tagData => { + val tagPair = tagData.split("=") + if (tagPair.length == 2) { + tags += (tagPair(0) -> tagPair(1)) + } + }) + } + } catch { + case NonFatal(t) => log.warn("Failed to read the context tags header", t.asInstanceOf[Any]) + } + + // Tags explicitly mapped on the tags.mappings configuration. + settings.tagsMappings.foreach { + case (tagName, httpHeader) => + try { + reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue)) + } catch { + case NonFatal(t) => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any]) + } + } + + // Incoming Entries + settings.incomingEntries.foldLeft(Context.of(tags.result())) { + case (context, (entryName, entryDecoder)) => + var result = context + try { + result = entryDecoder.read(reader, context) + } catch { + case NonFatal(t) => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any]) + } + + result + } + } + + /** + * Writes context tags and entries + */ + override def write(context: Context, writer: HeaderWriter): Unit = { + val contextTagsHeader = new StringBuilder() + def appendTag(key: String, value: String): Unit = { + contextTagsHeader + .append(key) + .append('=') + .append(value) + .append(';') + } + + // Write tags with specific mappings or append them to the context tags header. + context.tags.foreach { + case (tagKey, tagValue) => settings.tagsMappings.get(tagKey) match { + case Some(mappedHeader) => writer.write(mappedHeader, tagValue) + case None => appendTag(tagKey, tagValue) + } + } + + // Write the context tags header. + if(contextTagsHeader.nonEmpty) { + writer.write(settings.tagsHeaderName, contextTagsHeader.result()) + } + + // Write entries for the specified direction. + settings.outgoingEntries.foreach { + case (entryName, entryWriter) => + try { + entryWriter.write(context, writer) + } catch { + case NonFatal(t) => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any]) + } + } + } + } + + case class Settings( + tagsHeaderName: String, + tagsMappings: Map[String, String], + incomingEntries: Map[String, Propagation.EntryReader[HeaderReader]], + outgoingEntries: Map[String, Propagation.EntryWriter[HeaderWriter]] + ) + + object Settings { + private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Settings]) + + def from(config: Config, classLoading: ClassLoading): Settings = { + def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = { + val instanceMap = Map.newBuilder[String, ExpectedType] + + mappings.foreach { + case (contextKey, componentClass) => classLoading.createInstance[ExpectedType](componentClass, Nil) match { + case Success(componentInstance) => instanceMap += (contextKey -> componentInstance) + case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to []", + implicitly[ClassTag[ExpectedType]].runtimeClass.getName, componentClass, exception) + } + } + + instanceMap.result() + } + + val tagsHeaderName = config.getString("tags.header-name") + val tagsMappings = config.getConfig("tags.mappings").pairs + val incomingEntries = buildInstances[Propagation.EntryReader[HeaderReader]](config.getConfig("entries.incoming").pairs) + val outgoingEntries = buildInstances[Propagation.EntryWriter[HeaderWriter]](config.getConfig("entries.outgoing").pairs) + + Settings(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries) + } + } + +} diff --git a/kamon-core/src/main/scala/kamon/context/Mixin.scala b/kamon-core/src/main/scala/kamon/context/Mixin.scala deleted file mode 100644 index 3445cc31..00000000 --- a/kamon-core/src/main/scala/kamon/context/Mixin.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.context - -import kamon.Kamon - - -/** - * Utility trait that marks objects carrying a reference to a Context instance. - * - */ -trait HasContext { - def context: Context -} - -object HasContext { - private case class Default(context: Context) extends HasContext - - /** - * Construct a HasSpan instance that references the provided Context. - * - */ - def from(context: Context): HasContext = - Default(context) - - /** - * Construct a HasContext instance with the current Kamon from Kamon's default context storage. - * - */ - def fromCurrentContext(): HasContext = - Default(Kamon.currentContext()) -} diff --git a/kamon-core/src/main/scala/kamon/context/Propagation.scala b/kamon-core/src/main/scala/kamon/context/Propagation.scala new file mode 100644 index 00000000..1d973ca9 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/Propagation.scala @@ -0,0 +1,81 @@ +/* ========================================================================================= + * Copyright © 2013-2018 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.context + +/** + * Inter-process Context propagation interface. Implementations are responsible from reading and writing a lossless + * representation of a given Context instance to the appropriate mediums. Out of the box, Kamon ships with two + * implementations: + * + * * HttpPropagation: Uses HTTP headers as the medium to transport the Context data. + * * BinaryPropagation: Uses byte streams as the medium to transport the Context data. + * + */ +trait Propagation[ReaderMedium, WriterMedium] { + + /** + * Attempts to read a Context from the [[ReaderMedium]]. + * + * @param medium An abstraction the reads data from the medium transporting the Context data. + * @return The decoded Context instance or an empty Context if no entries or tags could be read from the medium. + */ + def read(medium: ReaderMedium): Context + + /** + * Attempts to write a Context instance to the [[WriterMedium]]. + * + * @param context Context instance to be written. + * @param medium An abstraction that writes data into the medium that will transport the Context data. + */ + def write(context: Context, medium: WriterMedium): Unit + +} + +object Propagation { + + /** + * Encapsulates logic required to read a single context entry from a medium. Implementations of this trait + * must be aware of the entry they are able to read. + */ + trait EntryReader[Medium] { + + /** + * Tries to read a context entry from the medium. If a context entry is successfully read, implementations + * must return an updated context instance that includes such entry. If no entry could be read simply return the + * context instance that was passed in, unchanged. + * + * @param medium An abstraction the reads data from the medium transporting the Context data. + * @param context Current context. + * @return Either the original context passed in or a modified version of it, including the read entry. + */ + def read(medium: Medium, context: Context): Context + } + + /** + * Encapsulates logic required to write a single context entry to the medium. Implementations of this trait + * must be aware of the entry they are able to write. + */ + trait EntryWriter[Medium] { + + /** + * Tries to write a context entry into the medium. + * + * @param context The context from which entries should be written. + * @param medium An abstraction that writes data into the medium that will transport the Context data. + */ + def write(context: Context, medium: Medium): Unit + } +} |