From c80a6a4f87828284421b7bea670829a424455f2e Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 28 Sep 2018 14:53:00 +0200 Subject: improve error handling on binary propagation --- .../kamon/context/BinaryPropagationSpec.scala | 78 +++++++++++++++- kamon-core/src/main/resources/reference.conf | 4 + .../scala/kamon/context/BinaryPropagation.scala | 100 +++++++++++++-------- .../main/scala/kamon/trace/SpanPropagation.scala | 1 - 4 files changed, 143 insertions(+), 40 deletions(-) diff --git a/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala index 99e72f59..5681d300 100644 --- a/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala @@ -8,6 +8,8 @@ import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter} import kamon.context.Propagation.{EntryReader, EntryWriter} import org.scalatest.{Matchers, OptionValues, WordSpec} +import scala.util.Random + class BinaryPropagationSpec extends WordSpec with Matchers with OptionValues { "The Binary Context Propagation" should { @@ -22,6 +24,51 @@ class BinaryPropagationSpec extends WordSpec with Matchers with OptionValues { writer.size() shouldBe 0 } + "handle malformed data in when reading a context" in { + val randomBytes = Array.ofDim[Byte](42) + Random.nextBytes(randomBytes) + + val context = binaryPropagation.read(ByteStreamReader.of(randomBytes)) + context.isEmpty() shouldBe true + } + + "handle read failures in an entry reader" in { + val context = Context.of( + BinaryPropagationSpec.StringKey, "string-value", + BinaryPropagationSpec.FailStringKey, "fail-read" + ) + val writer = inspectableByteStreamWriter() + binaryPropagation.write(context, writer) + + val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) + rtContext.tags shouldBe empty + rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value" + rtContext.get(BinaryPropagationSpec.FailStringKey) shouldBe null + } + + "handle write failures in an entry writer" in { + val context = Context.of( + BinaryPropagationSpec.StringKey, "string-value", + BinaryPropagationSpec.FailStringKey, "fail-write" + ) + val writer = inspectableByteStreamWriter() + binaryPropagation.write(context, writer) + + val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) + rtContext.tags shouldBe empty + rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value" + rtContext.get(BinaryPropagationSpec.FailStringKey) shouldBe null + } + + "handle write failures in an entry writer when the context is too big" in { + val context = Context.of(BinaryPropagationSpec.StringKey, "string-value" * 20) + val writer = inspectableByteStreamWriter() + binaryPropagation.write(context, writer) + + val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) + rtContext shouldBe empty + } + "round trip a Context that only has tags" in { val context = Context.of(Map("hello" -> "world", "kamon" -> "rulez")) val writer = inspectableByteStreamWriter() @@ -61,9 +108,11 @@ class BinaryPropagationSpec extends WordSpec with Matchers with OptionValues { val binaryPropagation = BinaryPropagation.from( ConfigFactory.parseString( """ - | + |max-outgoing-size = 64 |entries.incoming.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec" + |entries.incoming.failString = "kamon.context.BinaryPropagationSpec$FailStringEntryCodec" |entries.outgoing.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec" + |entries.outgoing.failString = "kamon.context.BinaryPropagationSpec$FailStringEntryCodec" | """.stripMargin ).withFallback(ConfigFactory.load().getConfig("kamon.propagation")), Kamon) @@ -76,6 +125,7 @@ class BinaryPropagationSpec extends WordSpec with Matchers with OptionValues { object BinaryPropagationSpec { val StringKey = Context.key[String]("string", null) + val FailStringKey = Context.key[String]("failString", null) val IntegerKey = Context.key[Int]("integer", 0) class StringEntryCodec extends EntryReader[ByteStreamReader] with EntryWriter[ByteStreamWriter] { @@ -95,4 +145,30 @@ object BinaryPropagationSpec { } } } + + class FailStringEntryCodec extends EntryReader[ByteStreamReader] with EntryWriter[ByteStreamWriter] { + + override def read(medium: ByteStreamReader, context: Context): Context = { + val valueData = medium.readAll() + + if(valueData.length > 0) { + val stringValue = new String(valueData) + if(stringValue == "fail-read") { + sys.error("The fail string entry reader has triggered") + } + + context.withKey(FailStringKey, stringValue) + } else context + } + + override def write(context: Context, medium: ByteStreamWriter): Unit = { + val value = context.get(FailStringKey) + if(value != null && value != "fail-write") { + medium.write(value.getBytes) + } else { + medium.write(42) // malformed data on purpose + sys.error("The fail string entry writer has triggered") + } + } + } } \ No newline at end of file diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index eb374455..2108e302 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -211,6 +211,10 @@ kamon { # default { + # Maximum outgoing Context size for binary transports. Contexts that surpass this limit will not be written to + # the outgoing medium. + max-outgoing-size = 2048 + # Configure which entries should be read from incoming messages and writen to outgoing messages. # entries { diff --git a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala index 847ac452..75e65c44 100644 --- a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala +++ b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala @@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory import scala.reflect.ClassTag import scala.util.control.NonFatal -import scala.util.{Failure, Success} +import scala.util.{Failure, Success, Try} /** @@ -157,44 +157,58 @@ object BinaryPropagation { override def initialValue(): Default.ReusableByteStreamWriter = new Default.ReusableByteStreamWriter(128) } + private val _contextBufferPool = new ThreadLocal[Array[Byte]] { + override def initialValue(): Array[Byte] = Array.ofDim[Byte](settings.maxOutgoingSize) + } + override def read(reader: ByteStreamReader): Context = { if(reader.available() > 0) { - val contextData = new ColferContext() - contextData.unmarshal(reader.readAll(), 0) + val contextData = Try { + val cContext = new ColferContext() + cContext.unmarshal(reader.readAll(), 0) + cContext + } - // Context tags - var tagSectionsCount = contextData.tags.length - if (tagSectionsCount > 0 && tagSectionsCount % 2 != 0) { - _log.warn("Malformed context tags found when trying to read a Context from ByteStreamReader") - tagSectionsCount -= 1 + contextData.failed.foreach { + case NonFatal(t) => _log.warn("Failed to read Context from ByteStreamReader", t) } - val tags = if (tagSectionsCount > 0) { - val tagsBuilder = Map.newBuilder[String, String] - var tagIndex = 0 - while (tagIndex < tagSectionsCount) { - tagsBuilder += (contextData.tags(tagIndex) -> contextData.tags(tagIndex + 1)) - tagIndex += 2 + contextData.map { colferContext => + + // Context tags + var tagSectionsCount = colferContext.tags.length + if (tagSectionsCount > 0 && tagSectionsCount % 2 != 0) { + _log.warn("Malformed Context tags found, tags consistency might be compromised") + tagSectionsCount -= 1 } - tagsBuilder.result() - } else Map.empty[String, String] + val tags = if (tagSectionsCount > 0) { + val tagsBuilder = Map.newBuilder[String, String] + var tagIndex = 0 + while (tagIndex < tagSectionsCount) { + tagsBuilder += (colferContext.tags(tagIndex) -> colferContext.tags(tagIndex + 1)) + tagIndex += 2 + } + tagsBuilder.result() + } else Map.empty[String, String] - // Only reads the entries for which there is a registered reader - contextData.entries.foldLeft(Context.of(tags)) { - case (context, entryData) => - settings.incomingEntries.get(entryData.name).map { entryReader => - var contextWithEntry = context - try { - contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.content), context) - } catch { - case NonFatal(t) => _log.warn("Failed to read entry [{}]", entryData.name.asInstanceOf[Any], t) - } - contextWithEntry - }.getOrElse(context) - } + // Only reads the entries for which there is a registered reader + colferContext.entries.foldLeft(Context.of(tags)) { + case (context, entryData) => + settings.incomingEntries.get(entryData.name).map { entryReader => + var contextWithEntry = context + try { + contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.content), context) + } catch { + case NonFatal(t) => _log.warn("Failed to read entry [{}]", entryData.name.asInstanceOf[Any], t) + } + + contextWithEntry + }.getOrElse(context) + } + } getOrElse(Context.Empty) } else Context.Empty } @@ -202,6 +216,7 @@ object BinaryPropagation { if (context.nonEmpty()) { val contextData = new ColferContext() val output = _streamPool.get() + val contextOutgoingBuffer = _contextBufferPool.get() if (context.tags.nonEmpty) { val tags = Array.ofDim[String](context.tags.size * 2) @@ -219,22 +234,29 @@ object BinaryPropagation { if (context.entries.nonEmpty) { val entries = settings.outgoingEntries.collect { case (entryName, entryWriter) if context.entries.contains(entryName) => - output.reset() - entryWriter.write(context, output) - val colferEntry = new ColferEntry() - colferEntry.name = entryName - colferEntry.content = output.toByteArray() + try { + output.reset() + entryWriter.write(context, output) + + colferEntry.name = entryName + colferEntry.content = output.toByteArray() + } catch { + case NonFatal(t) => _log.warn("Failed to write entry [{}]", entryName.asInstanceOf[Any], t) + } + colferEntry } contextData.entries = entries.toArray } - output.reset() - // TODO: avoid internal allocation of byte[] on the marshal method. Use ReusableByteStreamWriter's underlying buffer. - contextData.marshal(output, null) - writer.write(output.toByteArray) + try { + val contextSize = contextData.marshal(contextOutgoingBuffer, 0) + writer.write(contextOutgoingBuffer, 0, contextSize) + } catch { + case NonFatal(t) => _log.warn("Failed to write Context to ByteStreamWriter", t) + } } } } @@ -246,6 +268,7 @@ object BinaryPropagation { } case class Settings( + maxOutgoingSize: Int, incomingEntries: Map[String, Propagation.EntryReader[ByteStreamReader]], outgoingEntries: Map[String, Propagation.EntryWriter[ByteStreamWriter]] ) @@ -269,6 +292,7 @@ object BinaryPropagation { } Settings( + config.getBytes("max-outgoing-size").toInt, buildInstances[Propagation.EntryReader[ByteStreamReader]](config.getConfig("entries.incoming").pairs), buildInstances[Propagation.EntryWriter[ByteStreamWriter]](config.getConfig("entries.outgoing").pairs) ) diff --git a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala index b83a5ade..dc168347 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala @@ -122,7 +122,6 @@ object SpanPropagation { * */ class Colfer extends Propagation.EntryReader[ByteStreamReader] with Propagation.EntryWriter[ByteStreamWriter] { - val emptyBuffer = ByteBuffer.allocate(0) override def read(medium: ByteStreamReader, context: Context): Context = { if(medium.available() == 0) -- cgit v1.2.3