diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2018-09-28 14:53:00 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2018-09-28 14:53:00 +0200 |
commit | c80a6a4f87828284421b7bea670829a424455f2e (patch) | |
tree | 8808509c6cdc241569db3ae34d7f9d721bed2ad4 /kamon-core/src/main/scala | |
parent | ce1424715f91beda67fd5f4da705d9b096147ca0 (diff) | |
download | Kamon-c80a6a4f87828284421b7bea670829a424455f2e.tar.gz Kamon-c80a6a4f87828284421b7bea670829a424455f2e.tar.bz2 Kamon-c80a6a4f87828284421b7bea670829a424455f2e.zip |
improve error handling on binary propagation
Diffstat (limited to 'kamon-core/src/main/scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala | 100 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala | 1 |
2 files changed, 62 insertions, 39 deletions
diff --git a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala index 847ac452..75e65c44 100644 --- a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala +++ b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala @@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory import scala.reflect.ClassTag import scala.util.control.NonFatal -import scala.util.{Failure, Success} +import scala.util.{Failure, Success, Try} /** @@ -157,44 +157,58 @@ object BinaryPropagation { override def initialValue(): Default.ReusableByteStreamWriter = new Default.ReusableByteStreamWriter(128) } + private val _contextBufferPool = new ThreadLocal[Array[Byte]] { + override def initialValue(): Array[Byte] = Array.ofDim[Byte](settings.maxOutgoingSize) + } + override def read(reader: ByteStreamReader): Context = { if(reader.available() > 0) { - val contextData = new ColferContext() - contextData.unmarshal(reader.readAll(), 0) + val contextData = Try { + val cContext = new ColferContext() + cContext.unmarshal(reader.readAll(), 0) + cContext + } - // Context tags - var tagSectionsCount = contextData.tags.length - if (tagSectionsCount > 0 && tagSectionsCount % 2 != 0) { - _log.warn("Malformed context tags found when trying to read a Context from ByteStreamReader") - tagSectionsCount -= 1 + contextData.failed.foreach { + case NonFatal(t) => _log.warn("Failed to read Context from ByteStreamReader", t) } - val tags = if (tagSectionsCount > 0) { - val tagsBuilder = Map.newBuilder[String, String] - var tagIndex = 0 - while (tagIndex < tagSectionsCount) { - tagsBuilder += (contextData.tags(tagIndex) -> contextData.tags(tagIndex + 1)) - tagIndex += 2 + contextData.map { colferContext => + + // Context tags + var tagSectionsCount = colferContext.tags.length + if (tagSectionsCount > 0 && tagSectionsCount % 2 != 0) { + _log.warn("Malformed Context tags found, tags consistency might be compromised") + tagSectionsCount -= 1 } - tagsBuilder.result() - } else Map.empty[String, String] + val tags = if (tagSectionsCount > 0) { + val tagsBuilder = Map.newBuilder[String, String] + var tagIndex = 0 + while (tagIndex < tagSectionsCount) { + tagsBuilder += (colferContext.tags(tagIndex) -> colferContext.tags(tagIndex + 1)) + tagIndex += 2 + } + tagsBuilder.result() + } else Map.empty[String, String] - // Only reads the entries for which there is a registered reader - contextData.entries.foldLeft(Context.of(tags)) { - case (context, entryData) => - settings.incomingEntries.get(entryData.name).map { entryReader => - var contextWithEntry = context - try { - contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.content), context) - } catch { - case NonFatal(t) => _log.warn("Failed to read entry [{}]", entryData.name.asInstanceOf[Any], t) - } - contextWithEntry - }.getOrElse(context) - } + // Only reads the entries for which there is a registered reader + colferContext.entries.foldLeft(Context.of(tags)) { + case (context, entryData) => + settings.incomingEntries.get(entryData.name).map { entryReader => + var contextWithEntry = context + try { + contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.content), context) + } catch { + case NonFatal(t) => _log.warn("Failed to read entry [{}]", entryData.name.asInstanceOf[Any], t) + } + + contextWithEntry + }.getOrElse(context) + } + } getOrElse(Context.Empty) } else Context.Empty } @@ -202,6 +216,7 @@ object BinaryPropagation { if (context.nonEmpty()) { val contextData = new ColferContext() val output = _streamPool.get() + val contextOutgoingBuffer = _contextBufferPool.get() if (context.tags.nonEmpty) { val tags = Array.ofDim[String](context.tags.size * 2) @@ -219,22 +234,29 @@ object BinaryPropagation { if (context.entries.nonEmpty) { val entries = settings.outgoingEntries.collect { case (entryName, entryWriter) if context.entries.contains(entryName) => - output.reset() - entryWriter.write(context, output) - val colferEntry = new ColferEntry() - colferEntry.name = entryName - colferEntry.content = output.toByteArray() + try { + output.reset() + entryWriter.write(context, output) + + colferEntry.name = entryName + colferEntry.content = output.toByteArray() + } catch { + case NonFatal(t) => _log.warn("Failed to write entry [{}]", entryName.asInstanceOf[Any], t) + } + colferEntry } contextData.entries = entries.toArray } - output.reset() - // TODO: avoid internal allocation of byte[] on the marshal method. Use ReusableByteStreamWriter's underlying buffer. - contextData.marshal(output, null) - writer.write(output.toByteArray) + try { + val contextSize = contextData.marshal(contextOutgoingBuffer, 0) + writer.write(contextOutgoingBuffer, 0, contextSize) + } catch { + case NonFatal(t) => _log.warn("Failed to write Context to ByteStreamWriter", t) + } } } } @@ -246,6 +268,7 @@ object BinaryPropagation { } case class Settings( + maxOutgoingSize: Int, incomingEntries: Map[String, Propagation.EntryReader[ByteStreamReader]], outgoingEntries: Map[String, Propagation.EntryWriter[ByteStreamWriter]] ) @@ -269,6 +292,7 @@ object BinaryPropagation { } Settings( + config.getBytes("max-outgoing-size").toInt, buildInstances[Propagation.EntryReader[ByteStreamReader]](config.getConfig("entries.incoming").pairs), buildInstances[Propagation.EntryWriter[ByteStreamWriter]](config.getConfig("entries.outgoing").pairs) ) diff --git a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala index b83a5ade..dc168347 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala @@ -122,7 +122,6 @@ object SpanPropagation { * */ class Colfer extends Propagation.EntryReader[ByteStreamReader] with Propagation.EntryWriter[ByteStreamWriter] { - val emptyBuffer = ByteBuffer.allocate(0) override def read(medium: ByteStreamReader, context: Context): Context = { if(medium.available() == 0) |