aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala100
1 files changed, 62 insertions, 38 deletions
diff --git a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
index 847ac452..75e65c44 100644
--- a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
+++ b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory
import scala.reflect.ClassTag
import scala.util.control.NonFatal
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
/**
@@ -157,44 +157,58 @@ object BinaryPropagation {
override def initialValue(): Default.ReusableByteStreamWriter = new Default.ReusableByteStreamWriter(128)
}
+ private val _contextBufferPool = new ThreadLocal[Array[Byte]] {
+ override def initialValue(): Array[Byte] = Array.ofDim[Byte](settings.maxOutgoingSize)
+ }
+
override def read(reader: ByteStreamReader): Context = {
if(reader.available() > 0) {
- val contextData = new ColferContext()
- contextData.unmarshal(reader.readAll(), 0)
+ val contextData = Try {
+ val cContext = new ColferContext()
+ cContext.unmarshal(reader.readAll(), 0)
+ cContext
+ }
- // Context tags
- var tagSectionsCount = contextData.tags.length
- if (tagSectionsCount > 0 && tagSectionsCount % 2 != 0) {
- _log.warn("Malformed context tags found when trying to read a Context from ByteStreamReader")
- tagSectionsCount -= 1
+ contextData.failed.foreach {
+ case NonFatal(t) => _log.warn("Failed to read Context from ByteStreamReader", t)
}
- val tags = if (tagSectionsCount > 0) {
- val tagsBuilder = Map.newBuilder[String, String]
- var tagIndex = 0
- while (tagIndex < tagSectionsCount) {
- tagsBuilder += (contextData.tags(tagIndex) -> contextData.tags(tagIndex + 1))
- tagIndex += 2
+ contextData.map { colferContext =>
+
+ // Context tags
+ var tagSectionsCount = colferContext.tags.length
+ if (tagSectionsCount > 0 && tagSectionsCount % 2 != 0) {
+ _log.warn("Malformed Context tags found, tags consistency might be compromised")
+ tagSectionsCount -= 1
}
- tagsBuilder.result()
- } else Map.empty[String, String]
+ val tags = if (tagSectionsCount > 0) {
+ val tagsBuilder = Map.newBuilder[String, String]
+ var tagIndex = 0
+ while (tagIndex < tagSectionsCount) {
+ tagsBuilder += (colferContext.tags(tagIndex) -> colferContext.tags(tagIndex + 1))
+ tagIndex += 2
+ }
+ tagsBuilder.result()
+ } else Map.empty[String, String]
- // Only reads the entries for which there is a registered reader
- contextData.entries.foldLeft(Context.of(tags)) {
- case (context, entryData) =>
- settings.incomingEntries.get(entryData.name).map { entryReader =>
- var contextWithEntry = context
- try {
- contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.content), context)
- } catch {
- case NonFatal(t) => _log.warn("Failed to read entry [{}]", entryData.name.asInstanceOf[Any], t)
- }
- contextWithEntry
- }.getOrElse(context)
- }
+ // Only reads the entries for which there is a registered reader
+ colferContext.entries.foldLeft(Context.of(tags)) {
+ case (context, entryData) =>
+ settings.incomingEntries.get(entryData.name).map { entryReader =>
+ var contextWithEntry = context
+ try {
+ contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.content), context)
+ } catch {
+ case NonFatal(t) => _log.warn("Failed to read entry [{}]", entryData.name.asInstanceOf[Any], t)
+ }
+
+ contextWithEntry
+ }.getOrElse(context)
+ }
+ } getOrElse(Context.Empty)
} else Context.Empty
}
@@ -202,6 +216,7 @@ object BinaryPropagation {
if (context.nonEmpty()) {
val contextData = new ColferContext()
val output = _streamPool.get()
+ val contextOutgoingBuffer = _contextBufferPool.get()
if (context.tags.nonEmpty) {
val tags = Array.ofDim[String](context.tags.size * 2)
@@ -219,22 +234,29 @@ object BinaryPropagation {
if (context.entries.nonEmpty) {
val entries = settings.outgoingEntries.collect {
case (entryName, entryWriter) if context.entries.contains(entryName) =>
- output.reset()
- entryWriter.write(context, output)
-
val colferEntry = new ColferEntry()
- colferEntry.name = entryName
- colferEntry.content = output.toByteArray()
+ try {
+ output.reset()
+ entryWriter.write(context, output)
+
+ colferEntry.name = entryName
+ colferEntry.content = output.toByteArray()
+ } catch {
+ case NonFatal(t) => _log.warn("Failed to write entry [{}]", entryName.asInstanceOf[Any], t)
+ }
+
colferEntry
}
contextData.entries = entries.toArray
}
- output.reset()
- // TODO: avoid internal allocation of byte[] on the marshal method. Use ReusableByteStreamWriter's underlying buffer.
- contextData.marshal(output, null)
- writer.write(output.toByteArray)
+ try {
+ val contextSize = contextData.marshal(contextOutgoingBuffer, 0)
+ writer.write(contextOutgoingBuffer, 0, contextSize)
+ } catch {
+ case NonFatal(t) => _log.warn("Failed to write Context to ByteStreamWriter", t)
+ }
}
}
}
@@ -246,6 +268,7 @@ object BinaryPropagation {
}
case class Settings(
+ maxOutgoingSize: Int,
incomingEntries: Map[String, Propagation.EntryReader[ByteStreamReader]],
outgoingEntries: Map[String, Propagation.EntryWriter[ByteStreamWriter]]
)
@@ -269,6 +292,7 @@ object BinaryPropagation {
}
Settings(
+ config.getBytes("max-outgoing-size").toInt,
buildInstances[Propagation.EntryReader[ByteStreamReader]](config.getConfig("entries.incoming").pairs),
buildInstances[Propagation.EntryWriter[ByteStreamWriter]](config.getConfig("entries.outgoing").pairs)
)