author     Ivan Topolnjak <ivantopo@gmail.com>  2018-09-28 14:53:00 +0200
committer  Ivan Topolnjak <ivantopo@gmail.com>  2018-09-28 14:53:00 +0200
commit     c80a6a4f87828284421b7bea670829a424455f2e
tree       8808509c6cdc241569db3ae34d7f9d721bed2ad4
parent     ce1424715f91beda67fd5f4da705d9b096147ca0
improve error handling on binary propagation
-rw-r--r--  kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala   78
-rw-r--r--  kamon-core/src/main/resources/reference.conf                                  4
-rw-r--r--  kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala             100
-rw-r--r--  kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala                   1
4 files changed, 143 insertions(+), 40 deletions(-)
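
For orientation, the behaviour this commit hardens can be sketched outside the diff. The snippet below is hypothetical: it assumes the standard kamon.propagation.binary.default config path and builds the propagation the same way BinaryPropagationSpec does further down. After the patch, unreadable input yields Context.Empty instead of an exception.

    import com.typesafe.config.ConfigFactory
    import kamon.Kamon
    import kamon.context.BinaryPropagation
    import kamon.context.BinaryPropagation.ByteStreamReader
    import scala.util.Random

    // Assumed config path; the spec below constructs its instance the same way.
    val binaryPropagation = BinaryPropagation.from(
      ConfigFactory.load().getConfig("kamon.propagation.binary.default"), Kamon)

    // Random bytes are (almost certainly) not a valid Colfer message: after this
    // commit, read() logs a warning and falls back to an empty Context.
    val garbage = Array.ofDim[Byte](42)
    Random.nextBytes(garbage)
    binaryPropagation.read(ByteStreamReader.of(garbage)).isEmpty()  // true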
diff --git a/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala
index 99e72f59..5681d300 100644
--- a/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala
+++ b/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala
@@ -8,6 +8,8 @@ import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
import kamon.context.Propagation.{EntryReader, EntryWriter}
import org.scalatest.{Matchers, OptionValues, WordSpec}
+import scala.util.Random
+
class BinaryPropagationSpec extends WordSpec with Matchers with OptionValues {
"The Binary Context Propagation" should {
@@ -22,6 +24,51 @@ class BinaryPropagationSpec extends WordSpec with Matchers with OptionValues {
writer.size() shouldBe 0
}
+ "handle malformed data in when reading a context" in {
+ val randomBytes = Array.ofDim[Byte](42)
+ Random.nextBytes(randomBytes)
+
+ val context = binaryPropagation.read(ByteStreamReader.of(randomBytes))
+ context.isEmpty() shouldBe true
+ }
+
+ "handle read failures in an entry reader" in {
+ val context = Context.of(
+ BinaryPropagationSpec.StringKey, "string-value",
+ BinaryPropagationSpec.FailStringKey, "fail-read"
+ )
+ val writer = inspectableByteStreamWriter()
+ binaryPropagation.write(context, writer)
+
+ val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
+ rtContext.tags shouldBe empty
+ rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value"
+ rtContext.get(BinaryPropagationSpec.FailStringKey) shouldBe null
+ }
+
+ "handle write failures in an entry writer" in {
+ val context = Context.of(
+ BinaryPropagationSpec.StringKey, "string-value",
+ BinaryPropagationSpec.FailStringKey, "fail-write"
+ )
+ val writer = inspectableByteStreamWriter()
+ binaryPropagation.write(context, writer)
+
+ val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
+ rtContext.tags shouldBe empty
+ rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value"
+ rtContext.get(BinaryPropagationSpec.FailStringKey) shouldBe null
+ }
+
+ "handle write failures in an entry writer when the context is too big" in {
+ val context = Context.of(BinaryPropagationSpec.StringKey, "string-value" * 20)
+ val writer = inspectableByteStreamWriter()
+ binaryPropagation.write(context, writer)
+
+ val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
+ rtContext shouldBe empty
+ }
+
"round trip a Context that only has tags" in {
val context = Context.of(Map("hello" -> "world", "kamon" -> "rulez"))
val writer = inspectableByteStreamWriter()
@@ -61,9 +108,11 @@ class BinaryPropagationSpec extends WordSpec with Matchers with OptionValues {
val binaryPropagation = BinaryPropagation.from(
ConfigFactory.parseString(
"""
- |
+ |max-outgoing-size = 64
|entries.incoming.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec"
+ |entries.incoming.failString = "kamon.context.BinaryPropagationSpec$FailStringEntryCodec"
|entries.outgoing.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec"
+ |entries.outgoing.failString = "kamon.context.BinaryPropagationSpec$FailStringEntryCodec"
|
""".stripMargin
).withFallback(ConfigFactory.load().getConfig("kamon.propagation")), Kamon)
@@ -76,6 +125,7 @@ class BinaryPropagationSpec extends WordSpec with Matchers with OptionValues {
object BinaryPropagationSpec {
val StringKey = Context.key[String]("string", null)
+ val FailStringKey = Context.key[String]("failString", null)
val IntegerKey = Context.key[Int]("integer", 0)
class StringEntryCodec extends EntryReader[ByteStreamReader] with EntryWriter[ByteStreamWriter] {
@@ -95,4 +145,30 @@ object BinaryPropagationSpec {
}
}
}
+
+ class FailStringEntryCodec extends EntryReader[ByteStreamReader] with EntryWriter[ByteStreamWriter] {
+
+ override def read(medium: ByteStreamReader, context: Context): Context = {
+ val valueData = medium.readAll()
+
+ if(valueData.length > 0) {
+ val stringValue = new String(valueData)
+ if(stringValue == "fail-read") {
+ sys.error("The fail string entry reader has triggered")
+ }
+
+ context.withKey(FailStringKey, stringValue)
+ } else context
+ }
+
+ override def write(context: Context, medium: ByteStreamWriter): Unit = {
+ val value = context.get(FailStringKey)
+ if(value != null && value != "fail-write") {
+ medium.write(value.getBytes)
+ } else {
+ medium.write(42) // malformed data on purpose
+ sys.error("The fail string entry writer has triggered")
+ }
+ }
+ }
}
\ No newline at end of file
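
For contrast with the deliberately failing codec above, a well-behaved entry codec satisfies the same EntryReader/EntryWriter contract without throwing on valid input. A minimal hypothetical sketch (the spec's own StringEntryCodec plays this role):

    class PlainStringCodec extends EntryReader[ByteStreamReader] with EntryWriter[ByteStreamWriter] {

      // Read whatever bytes the medium carries and attach them as a String entry.
      override def read(medium: ByteStreamReader, context: Context): Context = {
        val data = medium.readAll()
        if(data.length > 0) context.withKey(StringKey, new String(data)) else context
      }

      // Write the entry's bytes only when the key is actually present.
      override def write(context: Context, medium: ByteStreamWriter): Unit = {
        val value = context.get(StringKey)
        if(value != null) medium.write(value.getBytes)
      }
    }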
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index eb374455..2108e302 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -211,6 +211,10 @@ kamon {
#
default {
+ # Maximum outgoing Context size for binary transports. Contexts that surpass this limit will not be written to
+ # the outgoing medium.
+ max-outgoing-size = 2048
+
# Configure which entries should be read from incoming messages and written to outgoing messages.
#
entries {
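
Applications that legitimately carry larger Contexts can raise the cap in their own configuration. A sketch, assuming the standard kamon.propagation.binary.default path:

    kamon.propagation.binary.default {
      # Contexts above this size are dropped with a warning rather than truncated.
      max-outgoing-size = 4096
    }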
diff --git a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
index 847ac452..75e65c44 100644
--- a/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
+++ b/kamon-core/src/main/scala/kamon/context/BinaryPropagation.scala
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory
import scala.reflect.ClassTag
import scala.util.control.NonFatal
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
/**
@@ -157,44 +157,58 @@ object BinaryPropagation {
override def initialValue(): Default.ReusableByteStreamWriter = new Default.ReusableByteStreamWriter(128)
}
+ private val _contextBufferPool = new ThreadLocal[Array[Byte]] {
+ override def initialValue(): Array[Byte] = Array.ofDim[Byte](settings.maxOutgoingSize)
+ }
+
override def read(reader: ByteStreamReader): Context = {
if(reader.available() > 0) {
- val contextData = new ColferContext()
- contextData.unmarshal(reader.readAll(), 0)
+ val contextData = Try {
+ val cContext = new ColferContext()
+ cContext.unmarshal(reader.readAll(), 0)
+ cContext
+ }
- // Context tags
- var tagSectionsCount = contextData.tags.length
- if (tagSectionsCount > 0 && tagSectionsCount % 2 != 0) {
- _log.warn("Malformed context tags found when trying to read a Context from ByteStreamReader")
- tagSectionsCount -= 1
+ contextData.failed.foreach {
+ case NonFatal(t) => _log.warn("Failed to read Context from ByteStreamReader", t)
}
- val tags = if (tagSectionsCount > 0) {
- val tagsBuilder = Map.newBuilder[String, String]
- var tagIndex = 0
- while (tagIndex < tagSectionsCount) {
- tagsBuilder += (contextData.tags(tagIndex) -> contextData.tags(tagIndex + 1))
- tagIndex += 2
+ contextData.map { colferContext =>
+
+ // Context tags
+ var tagSectionsCount = colferContext.tags.length
+ if (tagSectionsCount > 0 && tagSectionsCount % 2 != 0) {
+ _log.warn("Malformed Context tags found, tags consistency might be compromised")
+ tagSectionsCount -= 1
}
- tagsBuilder.result()
- } else Map.empty[String, String]
+ val tags = if (tagSectionsCount > 0) {
+ val tagsBuilder = Map.newBuilder[String, String]
+ var tagIndex = 0
+ while (tagIndex < tagSectionsCount) {
+ tagsBuilder += (colferContext.tags(tagIndex) -> colferContext.tags(tagIndex + 1))
+ tagIndex += 2
+ }
+ tagsBuilder.result()
+ } else Map.empty[String, String]
- // Only reads the entries for which there is a registered reader
- contextData.entries.foldLeft(Context.of(tags)) {
- case (context, entryData) =>
- settings.incomingEntries.get(entryData.name).map { entryReader =>
- var contextWithEntry = context
- try {
- contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.content), context)
- } catch {
- case NonFatal(t) => _log.warn("Failed to read entry [{}]", entryData.name.asInstanceOf[Any], t)
- }
- contextWithEntry
- }.getOrElse(context)
- }
+ // Only reads the entries for which there is a registered reader
+ colferContext.entries.foldLeft(Context.of(tags)) {
+ case (context, entryData) =>
+ settings.incomingEntries.get(entryData.name).map { entryReader =>
+ var contextWithEntry = context
+ try {
+ contextWithEntry = entryReader.read(ByteStreamReader.of(entryData.content), context)
+ } catch {
+ case NonFatal(t) => _log.warn("Failed to read entry [{}]", entryData.name.asInstanceOf[Any], t)
+ }
+
+ contextWithEntry
+ }.getOrElse(context)
+ }
+ } getOrElse(Context.Empty)
} else Context.Empty
}
@@ -202,6 +216,7 @@ object BinaryPropagation {
if (context.nonEmpty()) {
val contextData = new ColferContext()
val output = _streamPool.get()
+ val contextOutgoingBuffer = _contextBufferPool.get()
if (context.tags.nonEmpty) {
val tags = Array.ofDim[String](context.tags.size * 2)
@@ -219,22 +234,29 @@ object BinaryPropagation {
if (context.entries.nonEmpty) {
val entries = settings.outgoingEntries.collect {
case (entryName, entryWriter) if context.entries.contains(entryName) =>
- output.reset()
- entryWriter.write(context, output)
-
val colferEntry = new ColferEntry()
- colferEntry.name = entryName
- colferEntry.content = output.toByteArray()
+ try {
+ output.reset()
+ entryWriter.write(context, output)
+
+ colferEntry.name = entryName
+ colferEntry.content = output.toByteArray()
+ } catch {
+ case NonFatal(t) => _log.warn("Failed to write entry [{}]", entryName.asInstanceOf[Any], t)
+ }
+
colferEntry
}
contextData.entries = entries.toArray
}
- output.reset()
- // TODO: avoid internal allocation of byte[] on the marshal method. Use ReusableByteStreamWriter's underlying buffer.
- contextData.marshal(output, null)
- writer.write(output.toByteArray)
+ try {
+ val contextSize = contextData.marshal(contextOutgoingBuffer, 0)
+ writer.write(contextOutgoingBuffer, 0, contextSize)
+ } catch {
+ case NonFatal(t) => _log.warn("Failed to write Context to ByteStreamWriter", t)
+ }
}
}
}
@@ -246,6 +268,7 @@ object BinaryPropagation {
}
case class Settings(
+ maxOutgoingSize: Int,
incomingEntries: Map[String, Propagation.EntryReader[ByteStreamReader]],
outgoingEntries: Map[String, Propagation.EntryWriter[ByteStreamWriter]]
)
@@ -269,6 +292,7 @@ object BinaryPropagation {
}
Settings(
+ config.getBytes("max-outgoing-size").toInt,
buildInstances[Propagation.EntryReader[ByteStreamReader]](config.getConfig("entries.incoming").pairs),
buildInstances[Propagation.EntryWriter[ByteStreamWriter]](config.getConfig("entries.outgoing").pairs)
)
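
Because the limit is parsed with config.getBytes, HOCON byte-size strings work as well as plain numbers. A standalone sketch using only the Typesafe Config API:

    import com.typesafe.config.ConfigFactory

    val config = ConfigFactory.parseString("max-outgoing-size = 2k")
    // getBytes understands HOCON size strings; "2k" parses as 2048 bytes.
    val maxOutgoingSize: Int = config.getBytes("max-outgoing-size").toInt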
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala
index b83a5ade..dc168347 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala
@@ -122,7 +122,6 @@ object SpanPropagation {
*
*/
class Colfer extends Propagation.EntryReader[ByteStreamReader] with Propagation.EntryWriter[ByteStreamWriter] {
- val emptyBuffer = ByteBuffer.allocate(0)
override def read(medium: ByteStreamReader, context: Context): Context = {
if(medium.available() == 0)