aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/context
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/kamon/context')
-rw-r--r--kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala81
-rw-r--r--kamon-core/src/main/scala/kamon/context/Context.scala126
-rw-r--r--kamon-core/src/main/scala/kamon/context/HttpPropagation.scala65
3 files changed, 205 insertions, 67 deletions
diff --git a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
index 75e65c44..296003d5 100644
--- a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
+++ b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
@@ -20,7 +20,9 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, Output
import com.typesafe.config.Config
import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
-import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry}
+import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry, Tags => ColferTags}
+import kamon.context.generated.binary.context.{StringTag => ColferStringTag, LongTag => ColferLongTag, BooleanTag => ColferBooleanTag}
+import kamon.tag.{Tag, TagSet}
import org.slf4j.LoggerFactory
import scala.reflect.ClassTag
@@ -152,7 +154,7 @@ object BinaryPropagation {
* configured entry readers and writers.
*/
class Default(settings: Settings) extends Propagation[ByteStreamReader, ByteStreamWriter] {
- private val _log = LoggerFactory.getLogger(classOf[BinaryPropagation.Default])
+ private val _logger = LoggerFactory.getLogger(classOf[BinaryPropagation.Default])
private val _streamPool = new ThreadLocal[Default.ReusableByteStreamWriter] {
override def initialValue(): Default.ReusableByteStreamWriter = new Default.ReusableByteStreamWriter(128)
}
@@ -170,39 +172,29 @@ object BinaryPropagation {
}
contextData.failed.foreach {
- case NonFatal(t) => _log.warn("Failed to read Context from ByteStreamReader", t)
+ case NonFatal(t) => _logger.warn("Failed to read Context from ByteStreamReader", t)
}
contextData.map { colferContext =>
// Context tags
- var tagSectionsCount = colferContext.tags.length
- if (tagSectionsCount > 0 && tagSectionsCount % 2 != 0) {
- _log.warn("Malformed Context tags found, tags consistency might be compromised")
- tagSectionsCount -= 1
+ val tagsBuilder = Map.newBuilder[String, Any]
+ if(colferContext.tags != null) {
+ colferContext.tags.strings.foreach(t => tagsBuilder += (t.key -> t.value))
+ colferContext.tags.longs.foreach(t => tagsBuilder += (t.key -> t.value))
+ colferContext.tags.booleans.foreach(t => tagsBuilder += (t.key -> t.value))
}
-
- val tags = if (tagSectionsCount > 0) {
- val tagsBuilder = Map.newBuilder[String, String]
- var tagIndex = 0
- while (tagIndex < tagSectionsCount) {
- tagsBuilder += (colferContext.tags(tagIndex) -> colferContext.tags(tagIndex + 1))
- tagIndex += 2
- }
- tagsBuilder.result()
-
- } else Map.empty[String, String]
-
+ val tags = TagSet.from(tagsBuilder.result())
// Only reads the entries for which there is a registered reader
colferContext.entries.foldLeft(Context.of(tags)) {
case (context, entryData) =>
- settings.incomingEntries.get(entryData.name).map { entryReader =>
+ settings.incomingEntries.get(entryData.key).map { entryReader =>
var contextWithEntry = context
try {
- contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.content), context)
+ contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.value), context)
} catch {
- case NonFatal(t) => _log.warn("Failed to read entry [{}]", entryData.name.asInstanceOf[Any], t)
+ case NonFatal(t) => _logger.warn("Failed to read entry [{}]", entryData.key.asInstanceOf[Any], t)
}
contextWithEntry
@@ -218,17 +210,36 @@ object BinaryPropagation {
val output = _streamPool.get()
val contextOutgoingBuffer = _contextBufferPool.get()
- if (context.tags.nonEmpty) {
- val tags = Array.ofDim[String](context.tags.size * 2)
- var tagIndex = 0
- context.tags.foreach {
- case (key, value) =>
- tags.update(tagIndex, key)
- tags.update(tagIndex + 1, value)
- tagIndex += 2
+ if(context.tags.nonEmpty()) {
+ val tagsData = new ColferTags()
+ val strings = Array.newBuilder[ColferStringTag]
+ val longs = Array.newBuilder[ColferLongTag]
+ val booleans = Array.newBuilder[ColferBooleanTag]
+
+ context.tags.iterator().foreach {
+ case t: Tag.String =>
+ val st = new ColferStringTag()
+ st.setKey(t.key)
+ st.setValue(t.value)
+ strings += st
+
+ case t: Tag.Long =>
+ val lt = new ColferLongTag()
+ lt.setKey(t.key)
+ lt.setValue(t.value)
+ longs += lt
+
+ case t: Tag.Boolean =>
+ val bt = new ColferBooleanTag()
+ bt.setKey(t.key)
+ bt.setValue(t.value)
+ booleans += bt
}
- contextData.tags = tags
+ tagsData.setStrings(strings.result())
+ tagsData.setLongs(longs.result())
+ tagsData.setBooleans(booleans.result())
+ contextData.setTags(tagsData)
}
if (context.entries.nonEmpty) {
@@ -239,10 +250,10 @@ object BinaryPropagation {
output.reset()
entryWriter.write(context, output)
- colferEntry.name = entryName
- colferEntry.content = output.toByteArray()
+ colferEntry.key = entryName
+ colferEntry.value = output.toByteArray()
} catch {
- case NonFatal(t) => _log.warn("Failed to write entry [{}]", entryName.asInstanceOf[Any], t)
+ case NonFatal(t) => _logger.warn("Failed to write entry [{}]", entryName.asInstanceOf[Any], t)
}
colferEntry
@@ -255,7 +266,7 @@ object BinaryPropagation {
val contextSize = contextData.marshal(contextOutgoingBuffer, 0)
writer.write(contextOutgoingBuffer, 0, contextSize)
} catch {
- case NonFatal(t) => _log.warn("Failed to write Context to ByteStreamWriter", t)
+ case NonFatal(t) => _logger.warn("Failed to write Context to ByteStreamWriter", t)
}
}
}
diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala
index 2a7a382e..054a7897 100644
--- a/kamon-core/src/main/scala/kamon/context/Context.scala
+++ b/kamon-core/src/main/scala/kamon/context/Context.scala
@@ -16,32 +16,92 @@
package kamon
package context
-import java.util.{Map => JavaMap}
-import scala.collection.JavaConverters._
-
-class Context private (val entries: Map[String, Any], val tags: Map[String, String]) {
+import kamon.tag.TagSet
+
+
+/**
+ * An immutable set of information that is tied to the processing of single operation in a service. A Context instance
+ * can contain tags and entries.
+ *
+ * Context tags are built on top of the TagSet abstraction that ships with Kamon and since Kamon knows exactly what
+ * types of values can be included in a TagSet it can automatically serialize and deserialize them when going over
+ * HTTP and/or Binary transports.
+ *
+ * Context entries can contain any arbitrary type specified by the user, but require additional configuration and
+ * implementation of entry readers and writers if you need them to go over HTTP and/or Binary transports.
+ *
+ * Context instances are meant to be constructed by using the builder functions on the Context companion object.
+ *
+ */
+class Context private (val entries: Map[String, Any], val tags: TagSet) {
+ /**
+ * Gets an entry from this Context. If the entry is present it's current value is returned, otherwise the empty value
+ * from the provided key will be returned.
+ */
def get[T](key: Context.Key[T]): T =
entries.getOrElse(key.name, key.emptyValue).asInstanceOf[T]
- def getTag(tagKey: String): Option[String] =
- tags.get(tagKey)
+ /**
+ * Executes a lookup on the context tags. The actual return type depends on the provided lookup instance. Take a look
+ * at the built-in lookups available on the Lookups companion object.
+ */
+ def getTag[T](lookup: TagSet.Lookup[T]): T =
+ tags.get(lookup)
+
+
+ /**
+ * Creates a new Context instance that includes the provided key and value. If the provided key was already
+ * associated with another value then the previous value will be discarded and overwritten with the provided one.
+ */
def withKey[T](key: Context.Key[T], value: T): Context =
new Context(entries.updated(key.name, value), tags)
- def withTag(tagKey: String, tagValue: String): Context =
- new Context(entries, tags.updated(tagKey, tagValue))
- def withTags(tags: Map[String, String]): Context =
- new Context(entries, this.tags ++ tags)
+ /**
+ * Creates a new Context instance that includes the provided tag key and value. If the provided tag key was already
+ * associated with another value then the previous tag value will be discarded and overwritten with the provided one.
+ */
+ def withTag(key: String, value: String): Context =
+ new Context(entries, tags.withTag(key, value))
+
+
+ /**
+ * Creates a new Context instance that includes the provided tag key and value. If the provided tag key was already
+ * associated with another value then the previous tag value will be discarded and overwritten with the provided one.
+ */
+ def withTag(key: String, value: Long): Context =
+ new Context(entries, tags.withTag(key, value))
+
+
+ /**
+ * Creates a new Context instance that includes the provided tag key and value. If the provided tag key was already
+ * associated with another value then the previous tag value will be discarded and overwritten with the provided one.
+ */
+ def withTag(key: String, value: Boolean): Context =
+ new Context(entries, tags.withTag(key, value))
+
+
+ /**
+ * Creates a new Context instance that includes the provided tags. If any of the tags in this instance are associated
+ * to a key present on the provided tags then the previous values will be discarded and overwritten with the provided
+ * ones.
+ */
+ def withTags(tags: TagSet): Context =
+ new Context(entries, this.tags.and(tags))
- def withTags(tags: JavaMap[String, String]): Context =
- new Context(entries, this.tags ++ tags.asScala.toMap)
+ /**
+ * Returns whether this Context does not have any tags and does not have any entries.
+ */
def isEmpty(): Boolean =
entries.isEmpty && tags.isEmpty
+
+ /**
+ * Returns whether this Context has any information, either as tags or entries.
+ */
def nonEmpty(): Boolean =
!isEmpty()
@@ -49,32 +109,48 @@ class Context private (val entries: Map[String, Any], val tags: Map[String, Stri
object Context {
- val Empty = new Context(Map.empty, Map.empty)
+ val Empty = new Context(Map.empty, TagSet.Empty)
- def of(tags: JavaMap[String, String]): Context =
- new Context(Map.empty, tags.asScala.toMap)
-
- def of(tags: Map[String, String]): Context =
+ /**
+ * Creates a new Context instance with the provided tags and no entries.
+ */
+ def of(tags: TagSet): Context =
new Context(Map.empty, tags)
+
+ /**
+ * Creates a new Context instance with the provided key and no tags.
+ */
def of[T](key: Context.Key[T], value: T): Context =
- new Context(Map(key.name -> value), Map.empty)
+ new Context(Map(key.name -> value), TagSet.Empty)
- def of[T](key: Context.Key[T], value: T, tags: JavaMap[String, String]): Context =
- new Context(Map(key.name -> value), tags.asScala.toMap)
- def of[T](key: Context.Key[T], value: T, tags: Map[String, String]): Context =
+ /**
+ * Creates a new Context instance with a single entry and the provided tags.
+ */
+ def of[T](key: Context.Key[T], value: T, tags: TagSet): Context =
new Context(Map(key.name -> value), tags)
+
+ /**
+ * Creates a new Context instance with two entries and no tags.
+ */
def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U): Context =
- new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), Map.empty)
+ new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), TagSet.Empty)
- def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: JavaMap[String, String]): Context =
- new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), tags.asScala.toMap)
- def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: Map[String, String]): Context =
+ /**
+ * Creates a new Context instance with two entries and the provided tags.
+ */
+ def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: TagSet): Context =
new Context(Map(keyOne.name -> valueOne, keyTwo.name -> valueTwo), tags)
+
+ /**
+ * Creates a new Context.Key instance that can be used to insert and retrieve values from the context entries.
+ * Context keys must have a unique name since they will be looked up in transports by their name and the context
+ * entries are internally stored using their key name as index.
+ */
def key[T](name: String, emptyValue: T): Context.Key[T] =
new Context.Key(name, emptyValue)
diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
index 6a15e2a6..fbee75cc 100644
--- a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
+++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
@@ -17,6 +17,7 @@ package kamon
package context
import com.typesafe.config.Config
+import kamon.tag.{Tag, TagSet}
import org.slf4j.LoggerFactory
import scala.reflect.ClassTag
@@ -91,7 +92,7 @@ object HttpPropagation {
* 3. Read all context entries using the incoming entries configuration.
*/
override def read(reader: HeaderReader): Context = {
- val tags = Map.newBuilder[String, String]
+ val tags = Map.newBuilder[String, Any]
// Tags encoded together in the context tags header.
try {
@@ -99,7 +100,7 @@ object HttpPropagation {
contextTagsHeader.split(";").foreach(tagData => {
val tagPair = tagData.split("=")
if (tagPair.length == 2) {
- tags += (tagPair(0) -> tagPair(1))
+ tags += (tagPair(0) -> parseTagValue(tagPair(1)))
}
})
}
@@ -118,7 +119,7 @@ object HttpPropagation {
}
// Incoming Entries
- settings.incomingEntries.foldLeft(Context.of(tags.result())) {
+ settings.incomingEntries.foldLeft(Context.of(TagSet.from(tags.result()))) {
case (context, (entryName, entryDecoder)) =>
var result = context
try {
@@ -145,10 +146,12 @@ object HttpPropagation {
}
// Write tags with specific mappings or append them to the context tags header.
- context.tags.foreach {
- case (tagKey, tagValue) => settings.tagsMappings.get(tagKey) match {
- case Some(mappedHeader) => writer.write(mappedHeader, tagValue)
- case None => appendTag(tagKey, tagValue)
+ context.tags.iterator().foreach { tag =>
+ val tagKey = tag.key
+
+ settings.tagsMappings.get(tagKey) match {
+ case Some(mappedHeader) => writer.write(mappedHeader, tagValueWithPrefix(tag))
+ case None => appendTag(tagKey, Tag.unwrapValue(tag).toString)
}
}
@@ -167,6 +170,54 @@ object HttpPropagation {
}
}
}
+
+
+ private val _longTypePrefix = "l:"
+ private val _booleanTypePrefix = "b:"
+ private val _booleanTrue = "true"
+ private val _booleanFalse = "false"
+
+ /**
+ * Tries to infer and parse a value into one of the supported tag types: String, Long or Boolean by looking for the
+ * type indicator prefix on the tag value. If the inference fails it will default to treat the value as a String.
+ */
+ private def parseTagValue(value: String): Any = {
+ if (value.isEmpty || value.length < 2) // Empty and short values definitely do not have type indicators.
+ value
+ else {
+ if(value.startsWith(_longTypePrefix)) {
+ // Try to parse the content as a Long value.
+ val remaining = value.substring(2)
+ try {
+ java.lang.Long.parseLong(remaining)
+ } catch {
+ case _: Throwable => remaining
+ }
+
+ } else if(value.startsWith(_booleanTypePrefix)) {
+
+ // Try to parse the content as a Boolean value.
+ val remaining = value.substring(2)
+ if(remaining.equals(_booleanTrue))
+ true
+ else if(remaining.equals(_booleanFalse))
+ false
+ else
+ remaining
+
+ } else value
+ }
+ }
+
+ /**
+ * Returns the actual value to be written in the HTTP transport, with a type prefix if applicable.
+ */
+ private def tagValueWithPrefix(tag: Tag): String = tag match {
+ case t: Tag.String => t.value
+ case t: Tag.Boolean => _booleanTypePrefix + t.value.toString
+ case t: Tag.Long => _longTypePrefix + t.value.toString
+ }
+
}
case class Settings(