aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala81
1 files changed, 46 insertions, 35 deletions
diff --git a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
index 75e65c44..296003d5 100644
--- a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
+++ b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
@@ -20,7 +20,9 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, Output
import com.typesafe.config.Config
import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
-import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry}
+import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry, Tags => ColferTags}
+import kamon.context.generated.binary.context.{StringTag => ColferStringTag, LongTag => ColferLongTag, BooleanTag => ColferBooleanTag}
+import kamon.tag.{Tag, TagSet}
import org.slf4j.LoggerFactory
import scala.reflect.ClassTag
@@ -152,7 +154,7 @@ object BinaryPropagation {
* configured entry readers and writers.
*/
class Default(settings: Settings) extends Propagation[ByteStreamReader, ByteStreamWriter] {
- private val _log = LoggerFactory.getLogger(classOf[BinaryPropagation.Default])
+ private val _logger = LoggerFactory.getLogger(classOf[BinaryPropagation.Default])
private val _streamPool = new ThreadLocal[Default.ReusableByteStreamWriter] {
override def initialValue(): Default.ReusableByteStreamWriter = new Default.ReusableByteStreamWriter(128)
}
@@ -170,39 +172,29 @@ object BinaryPropagation {
}
contextData.failed.foreach {
- case NonFatal(t) => _log.warn("Failed to read Context from ByteStreamReader", t)
+ case NonFatal(t) => _logger.warn("Failed to read Context from ByteStreamReader", t)
}
contextData.map { colferContext =>
// Context tags
- var tagSectionsCount = colferContext.tags.length
- if (tagSectionsCount > 0 && tagSectionsCount % 2 != 0) {
- _log.warn("Malformed Context tags found, tags consistency might be compromised")
- tagSectionsCount -= 1
+ val tagsBuilder = Map.newBuilder[String, Any]
+ if(colferContext.tags != null) {
+ colferContext.tags.strings.foreach(t => tagsBuilder += (t.key -> t.value))
+ colferContext.tags.longs.foreach(t => tagsBuilder += (t.key -> t.value))
+ colferContext.tags.booleans.foreach(t => tagsBuilder += (t.key -> t.value))
}
-
- val tags = if (tagSectionsCount > 0) {
- val tagsBuilder = Map.newBuilder[String, String]
- var tagIndex = 0
- while (tagIndex < tagSectionsCount) {
- tagsBuilder += (colferContext.tags(tagIndex) -> colferContext.tags(tagIndex + 1))
- tagIndex += 2
- }
- tagsBuilder.result()
-
- } else Map.empty[String, String]
-
+ val tags = TagSet.from(tagsBuilder.result())
// Only reads the entries for which there is a registered reader
colferContext.entries.foldLeft(Context.of(tags)) {
case (context, entryData) =>
- settings.incomingEntries.get(entryData.name).map { entryReader =>
+ settings.incomingEntries.get(entryData.key).map { entryReader =>
var contextWithEntry = context
try {
- contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.content), context)
+ contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.value), context)
} catch {
- case NonFatal(t) => _log.warn("Failed to read entry [{}]", entryData.name.asInstanceOf[Any], t)
+ case NonFatal(t) => _logger.warn("Failed to read entry [{}]", entryData.key.asInstanceOf[Any], t)
}
contextWithEntry
@@ -218,17 +210,36 @@ object BinaryPropagation {
val output = _streamPool.get()
val contextOutgoingBuffer = _contextBufferPool.get()
- if (context.tags.nonEmpty) {
- val tags = Array.ofDim[String](context.tags.size * 2)
- var tagIndex = 0
- context.tags.foreach {
- case (key, value) =>
- tags.update(tagIndex, key)
- tags.update(tagIndex + 1, value)
- tagIndex += 2
+ if(context.tags.nonEmpty()) {
+ val tagsData = new ColferTags()
+ val strings = Array.newBuilder[ColferStringTag]
+ val longs = Array.newBuilder[ColferLongTag]
+ val booleans = Array.newBuilder[ColferBooleanTag]
+
+ context.tags.iterator().foreach {
+ case t: Tag.String =>
+ val st = new ColferStringTag()
+ st.setKey(t.key)
+ st.setValue(t.value)
+ strings += st
+
+ case t: Tag.Long =>
+ val lt = new ColferLongTag()
+ lt.setKey(t.key)
+ lt.setValue(t.value)
+ longs += lt
+
+ case t: Tag.Boolean =>
+ val bt = new ColferBooleanTag()
+ bt.setKey(t.key)
+ bt.setValue(t.value)
+ booleans += bt
}
- contextData.tags = tags
+ tagsData.setStrings(strings.result())
+ tagsData.setLongs(longs.result())
+ tagsData.setBooleans(booleans.result())
+ contextData.setTags(tagsData)
}
if (context.entries.nonEmpty) {
@@ -239,10 +250,10 @@ object BinaryPropagation {
output.reset()
entryWriter.write(context, output)
- colferEntry.name = entryName
- colferEntry.content = output.toByteArray()
+ colferEntry.key = entryName
+ colferEntry.value = output.toByteArray()
} catch {
- case NonFatal(t) => _log.warn("Failed to write entry [{}]", entryName.asInstanceOf[Any], t)
+ case NonFatal(t) => _logger.warn("Failed to write entry [{}]", entryName.asInstanceOf[Any], t)
}
colferEntry
@@ -255,7 +266,7 @@ object BinaryPropagation {
val contextSize = contextData.marshal(contextOutgoingBuffer, 0)
writer.write(contextOutgoingBuffer, 0, contextSize)
} catch {
- case NonFatal(t) => _log.warn("Failed to write Context to ByteStreamWriter", t)
+ case NonFatal(t) => _logger.warn("Failed to write Context to ByteStreamWriter", t)
}
}
}