diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/context')
3 files changed, 205 insertions, 67 deletions
diff --git a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala index 75e65c44..296003d5 100644 --- a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala +++ b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala @@ -20,7 +20,9 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, Output import com.typesafe.config.Config import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter} -import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry} +import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry, Tags => ColferTags} +import kamon.context.generated.binary.context.{StringTag => ColferStringTag, LongTag => ColferLongTag, BooleanTag => ColferBooleanTag} +import kamon.tag.{Tag, TagSet} import org.slf4j.LoggerFactory import scala.reflect.ClassTag @@ -152,7 +154,7 @@ object BinaryPropagation { * configured entry readers and writers. */ class Default(settings: Settings) extends Propagation[ByteStreamReader, ByteStreamWriter] { - private val _log = LoggerFactory.getLogger(classOf[BinaryPropagation.Default]) + private val _logger = LoggerFactory.getLogger(classOf[BinaryPropagation.Default]) private val _streamPool = new ThreadLocal[Default.ReusableByteStreamWriter] { override def initialValue(): Default.ReusableByteStreamWriter = new Default.ReusableByteStreamWriter(128) } @@ -170,39 +172,29 @@ object BinaryPropagation { } contextData.failed.foreach { - case NonFatal(t) => _log.warn("Failed to read Context from ByteStreamReader", t) + case NonFatal(t) => _logger.warn("Failed to read Context from ByteStreamReader", t) } contextData.map { colferContext => // Context tags - var tagSectionsCount = colferContext.tags.length - if (tagSectionsCount > 0 && tagSectionsCount % 2 != 0) { - _log.warn("Malformed Context tags found, tags consistency might be compromised") - tagSectionsCount -= 1 + val tagsBuilder = Map.newBuilder[String, Any] + if(colferContext.tags != null) { + colferContext.tags.strings.foreach(t => tagsBuilder += (t.key -> t.value)) + colferContext.tags.longs.foreach(t => tagsBuilder += (t.key -> t.value)) + colferContext.tags.booleans.foreach(t => tagsBuilder += (t.key -> t.value)) } - - val tags = if (tagSectionsCount > 0) { - val tagsBuilder = Map.newBuilder[String, String] - var tagIndex = 0 - while (tagIndex < tagSectionsCount) { - tagsBuilder += (colferContext.tags(tagIndex) -> colferContext.tags(tagIndex + 1)) - tagIndex += 2 - } - tagsBuilder.result() - - } else Map.empty[String, String] - + val tags = TagSet.from(tagsBuilder.result()) // Only reads the entries for which there is a registered reader colferContext.entries.foldLeft(Context.of(tags)) { case (context, entryData) => - settings.incomingEntries.get(entryData.name).map { entryReader => + settings.incomingEntries.get(entryData.key).map { entryReader => var contextWithEntry = context try { - contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.content), context) + contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.value), context) } catch { - case NonFatal(t) => _log.warn("Failed to read entry [{}]", entryData.name.asInstanceOf[Any], t) + case NonFatal(t) => _logger.warn("Failed to read entry [{}]", entryData.key.asInstanceOf[Any], t) } contextWithEntry @@ -218,17 +210,36 @@ object BinaryPropagation { val output = _streamPool.get() val contextOutgoingBuffer = _contextBufferPool.get() - if (context.tags.nonEmpty) { - val tags = Array.ofDim[String](context.tags.size * 2) - var tagIndex = 0 - context.tags.foreach { - case (key, value) => - tags.update(tagIndex, key) - tags.update(tagIndex + 1, value) - tagIndex += 2 + if(context.tags.nonEmpty()) { + val tagsData = new ColferTags() + val strings = Array.newBuilder[ColferStringTag] + val longs = Array.newBuilder[ColferLongTag] + val booleans = Array.newBuilder[ColferBooleanTag] + + context.tags.iterator().foreach { + case t: Tag.String => + val st = new ColferStringTag() + st.setKey(t.key) + st.setValue(t.value) + strings += st + + case t: Tag.Long => + val lt = new ColferLongTag() + lt.setKey(t.key) + lt.setValue(t.value) + longs += lt + + case t: Tag.Boolean => + val bt = new ColferBooleanTag() + bt.setKey(t.key) + bt.setValue(t.value) + booleans += bt } - contextData.tags = tags + tagsData.setStrings(strings.result()) + tagsData.setLongs(longs.result()) + tagsData.setBooleans(booleans.result()) + contextData.setTags(tagsData) } if (context.entries.nonEmpty) { @@ -239,10 +250,10 @@ object BinaryPropagation { output.reset() entryWriter.write(context, output) - colferEntry.name = entryName - colferEntry.content = output.toByteArray() + colferEntry.key = entryName + colferEntry.value = output.toByteArray() } catch { - case NonFatal(t) => _log.warn("Failed to write entry [{}]", entryName.asInstanceOf[Any], t) + case NonFatal(t) => _logger.warn("Failed to write entry [{}]", entryName.asInstanceOf[Any], t) } colferEntry @@ -255,7 +266,7 @@ object BinaryPropagation { val contextSize = contextData.marshal(contextOutgoingBuffer, 0) writer.write(contextOutgoingBuffer, 0, contextSize) } catch { - case NonFatal(t) => _log.warn("Failed to write Context to ByteStreamWriter", t) + case NonFatal(t) => _logger.warn("Failed to write Context to ByteStreamWriter", t) } } } diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala index 2a7a382e..054a7897 100644 --- a/kamon-core/src/main/scala/kamon/context/Context.scala +++ b/kamon-core/src/main/scala/kamon/context/Context.scala @@ -16,32 +16,92 @@ package kamon package context -import java.util.{Map => JavaMap} -import scala.collection.JavaConverters._ - -class Context private (val entries: Map[String, Any], val tags: Map[String, String]) { +import kamon.tag.TagSet + + +/** + * An immutable set of information that is tied to the processing of single operation in a service. A Context instance + * can contain tags and entries. + * + * Context tags are built on top of the TagSet abstraction that ships with Kamon and since Kamon knows exactly what + * types of values can be included in a TagSet it can automatically serialize and deserialize them when going over + * HTTP and/or Binary transports. + * + * Context entries can contain any arbitrary type specified by the user, but require additional configuration and + * implementation of entry readers and writers if you need them to go over HTTP and/or Binary transports. + * + * Context instances are meant to be constructed by using the builder functions on the Context companion object. + * + */ +class Context private (val entries: Map[String, Any], val tags: TagSet) { + /** + * Gets an entry from this Context. If the entry is present it's current value is returned, otherwise the empty value + * from the provided key will be returned. + */ def get[T](key: Context.Key[T]): T = entries.getOrElse(key.name, key.emptyValue).asInstanceOf[T] - def getTag(tagKey: String): Option[String] = - tags.get(tagKey) + /** + * Executes a lookup on the context tags. The actual return type depends on the provided lookup instance. Take a look + * at the built-in lookups available on the Lookups companion object. + */ + def getTag[T](lookup: TagSet.Lookup[T]): T = + tags.get(lookup) + + + /** + * Creates a new Context instance that includes the provided key and value. If the provided key was already + * associated with another value then the previous value will be discarded and overwritten with the provided one. + */ def withKey[T](key: Context.Key[T], value: T): Context = new Context(entries.updated(key.name, value), tags) - def withTag(tagKey: String, tagValue: String): Context = - new Context(entries, tags.updated(tagKey, tagValue)) - def withTags(tags: Map[String, String]): Context = - new Context(entries, this.tags ++ tags) + /** + * Creates a new Context instance that includes the provided tag key and value. If the provided tag key was already + * associated with another value then the previous tag value will be discarded and overwritten with the provided one. + */ + def withTag(key: String, value: String): Context = + new Context(entries, tags.withTag(key, value)) + + + /** + * Creates a new Context instance that includes the provided tag key and value. If the provided tag key was already + * associated with another value then the previous tag value will be discarded and overwritten with the provided one. + */ + def withTag(key: String, value: Long): Context = + new Context(entries, tags.withTag(key, value)) + + + /** + * Creates a new Context instance that includes the provided tag key and value. If the provided tag key was already + * associated with another value then the previous tag value will be discarded and overwritten with the provided one. + */ + def withTag(key: String, value: Boolean): Context = + new Context(entries, tags.withTag(key, value)) + + + /** + * Creates a new Context instance that includes the provided tags. If any of the tags in this instance are associated + * to a key present on the provided tags then the previous values will be discarded and overwritten with the provided + * ones. + */ + def withTags(tags: TagSet): Context = + new Context(entries, this.tags.and(tags)) - def withTags(tags: JavaMap[String, String]): Context = - new Context(entries, this.tags ++ tags.asScala.toMap) + /** + * Returns whether this Context does not have any tags and does not have any entries. + */ def isEmpty(): Boolean = entries.isEmpty && tags.isEmpty + + /** + * Returns whether this Context has any information, either as tags or entries. + */ def nonEmpty(): Boolean = !isEmpty() @@ -49,32 +109,48 @@ class Context private (val entries: Map[String, Any], val tags: Map[String, Stri object Context { - val Empty = new Context(Map.empty, Map.empty) + val Empty = new Context(Map.empty, TagSet.Empty) - def of(tags: JavaMap[String, String]): Context = - new Context(Map.empty, tags.asScala.toMap) - - def of(tags: Map[String, String]): Context = + /** + * Creates a new Context instance with the provided tags and no entries. + */ + def of(tags: TagSet): Context = new Context(Map.empty, tags) + + /** + * Creates a new Context instance with the provided key and no tags. + */ def of[T](key: Context.Key[T], value: T): Context = - new Context(Map(key.name -> value), Map.empty) + new Context(Map(key.name -> value), TagSet.Empty) - def of[T](key: Context.Key[T], value: T, tags: JavaMap[String, String]): Context = - new Context(Map(key.name -> value), tags.asScala.toMap) - def of[T](key: Context.Key[T], value: T, tags: Map[String, String]): Context = + /** + * Creates a new Context instance with a single entry and the provided tags. + */ + def of[T](key: Context.Key[T], value: T, tags: TagSet): Context = new Context(Map(key.name -> value), tags) + + /** + * Creates a new Context instance with two entries and no tags. + */ def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U): Context = - new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), Map.empty) + new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), TagSet.Empty) - def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: JavaMap[String, String]): Context = - new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), tags.asScala.toMap) - def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: Map[String, String]): Context = + /** + * Creates a new Context instance with two entries and the provided tags. + */ + def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: TagSet): Context = new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), tags) + + /** + * Creates a new Context.Key instance that can be used to insert and retrieve values from the context entries. + * Context keys must have a unique name since they will be looked up in transports by their name and the context + * entries are internally stored using their key name as index. + */ def key[T](name: String, emptyValue: T): Context.Key[T] = new Context.Key(name, emptyValue) diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala index 6a15e2a6..fbee75cc 100644 --- a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala +++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala @@ -17,6 +17,7 @@ package kamon package context import com.typesafe.config.Config +import kamon.tag.{Tag, TagSet} import org.slf4j.LoggerFactory import scala.reflect.ClassTag @@ -91,7 +92,7 @@ object HttpPropagation { * 3. Read all context entries using the incoming entries configuration. */ override def read(reader: HeaderReader): Context = { - val tags = Map.newBuilder[String, String] + val tags = Map.newBuilder[String, Any] // Tags encoded together in the context tags header. try { @@ -99,7 +100,7 @@ object HttpPropagation { contextTagsHeader.split(";").foreach(tagData => { val tagPair = tagData.split("=") if (tagPair.length == 2) { - tags += (tagPair(0) -> tagPair(1)) + tags += (tagPair(0) -> parseTagValue(tagPair(1))) } }) } @@ -118,7 +119,7 @@ object HttpPropagation { } // Incoming Entries - settings.incomingEntries.foldLeft(Context.of(tags.result())) { + settings.incomingEntries.foldLeft(Context.of(TagSet.from(tags.result()))) { case (context, (entryName, entryDecoder)) => var result = context try { @@ -145,10 +146,12 @@ object HttpPropagation { } // Write tags with specific mappings or append them to the context tags header. - context.tags.foreach { - case (tagKey, tagValue) => settings.tagsMappings.get(tagKey) match { - case Some(mappedHeader) => writer.write(mappedHeader, tagValue) - case None => appendTag(tagKey, tagValue) + context.tags.iterator().foreach { tag => + val tagKey = tag.key + + settings.tagsMappings.get(tagKey) match { + case Some(mappedHeader) => writer.write(mappedHeader, tagValueWithPrefix(tag)) + case None => appendTag(tagKey, Tag.unwrapValue(tag).toString) } } @@ -167,6 +170,54 @@ object HttpPropagation { } } } + + + private val _longTypePrefix = "l:" + private val _booleanTypePrefix = "b:" + private val _booleanTrue = "true" + private val _booleanFalse = "false" + + /** + * Tries to infer and parse a value into one of the supported tag types: String, Long or Boolean by looking for the + * type indicator prefix on the tag value. If the inference fails it will default to treat the value as a String. + */ + private def parseTagValue(value: String): Any = { + if (value.isEmpty || value.length < 2) // Empty and short values definitely do not have type indicators. + value + else { + if(value.startsWith(_longTypePrefix)) { + // Try to parse the content as a Long value. + val remaining = value.substring(2) + try { + java.lang.Long.parseLong(remaining) + } catch { + case _: Throwable => remaining + } + + } else if(value.startsWith(_booleanTypePrefix)) { + + // Try to parse the content as a Boolean value. + val remaining = value.substring(2) + if(remaining.equals(_booleanTrue)) + true + else if(remaining.equals(_booleanFalse)) + false + else + remaining + + } else value + } + } + + /** + * Returns the actual value to be written in the HTTP transport, with a type prefix if applicable. + */ + private def tagValueWithPrefix(tag: Tag): String = tag match { + case t: Tag.String => t.value + case t: Tag.Boolean => _booleanTypePrefix + t.value.toString + case t: Tag.Long => _longTypePrefix + t.value.toString + } + } case class Settings( |