package kamon.context import java.io.ByteArrayOutputStream import com.typesafe.config.ConfigFactory import kamon.Kamon import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter} import kamon.context.Propagation.{EntryReader, EntryWriter} import org.scalatest.{Matchers, OptionValues, WordSpec} import scala.util.Random class BinaryPropagationSpec extends WordSpec with Matchers with OptionValues { "The Binary Context Propagation" should { "return an empty context if there is no data to read from" in { val context = binaryPropagation.read(ByteStreamReader.of(Array.ofDim[Byte](0))) context.isEmpty() shouldBe true } "not write any data to the medium if the context is empty" in { val writer = inspectableByteStreamWriter() binaryPropagation.write(Context.Empty, writer) writer.size() shouldBe 0 } "handle malformed data in when reading a context" in { val randomBytes = Array.ofDim[Byte](42) Random.nextBytes(randomBytes) val context = binaryPropagation.read(ByteStreamReader.of(randomBytes)) context.isEmpty() shouldBe true } "handle read failures in an entry reader" in { val context = Context.of( BinaryPropagationSpec.StringKey, "string-value", BinaryPropagationSpec.FailStringKey, "fail-read" ) val writer = inspectableByteStreamWriter() binaryPropagation.write(context, writer) val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) rtContext.tags shouldBe empty rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value" rtContext.get(BinaryPropagationSpec.FailStringKey) shouldBe null } "handle write failures in an entry writer" in { val context = Context.of( BinaryPropagationSpec.StringKey, "string-value", BinaryPropagationSpec.FailStringKey, "fail-write" ) val writer = inspectableByteStreamWriter() binaryPropagation.write(context, writer) val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) rtContext.tags shouldBe empty rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value" rtContext.get(BinaryPropagationSpec.FailStringKey) shouldBe null } "handle write failures in an entry writer when the context is too big" in { val context = Context.of(BinaryPropagationSpec.StringKey, "string-value" * 20) val writer = inspectableByteStreamWriter() binaryPropagation.write(context, writer) val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) rtContext shouldBe empty } "round trip a Context that only has tags" in { val context = Context.of(Map("hello" -> "world", "kamon" -> "rulez")) val writer = inspectableByteStreamWriter() binaryPropagation.write(context, writer) val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) rtContext.entries shouldBe empty rtContext.tags should contain theSameElementsAs (context.tags) } "round trip a Context that only has entries" in { val context = Context.of(BinaryPropagationSpec.StringKey, "string-value", BinaryPropagationSpec.IntegerKey, 42) val writer = inspectableByteStreamWriter() binaryPropagation.write(context, writer) val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) rtContext.tags shouldBe empty rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value" rtContext.get(BinaryPropagationSpec.IntegerKey) shouldBe 0 // there is no entry configuration for the integer key } "round trip a Context that with tags and entries" in { val context = Context.of(Map("hello" -> "world", "kamon" -> "rulez")) .withKey(BinaryPropagationSpec.StringKey, "string-value") .withKey(BinaryPropagationSpec.IntegerKey, 42) val writer = inspectableByteStreamWriter() binaryPropagation.write(context, writer) val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) rtContext.tags should contain theSameElementsAs (context.tags) rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value" rtContext.get(BinaryPropagationSpec.IntegerKey) shouldBe 0 // there is no entry configuration for the integer key } } val binaryPropagation = BinaryPropagation.from( ConfigFactory.parseString( """ |max-outgoing-size = 64 |entries.incoming.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec" |entries.incoming.failString = "kamon.context.BinaryPropagationSpec$FailStringEntryCodec" |entries.outgoing.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec" |entries.outgoing.failString = "kamon.context.BinaryPropagationSpec$FailStringEntryCodec" | """.stripMargin ).withFallback(ConfigFactory.load().getConfig("kamon.propagation")), Kamon) def inspectableByteStreamWriter() = new ByteArrayOutputStream(32) with ByteStreamWriter } object BinaryPropagationSpec { val StringKey = Context.key[String]("string", null) val FailStringKey = Context.key[String]("failString", null) val IntegerKey = Context.key[Int]("integer", 0) class StringEntryCodec extends EntryReader[ByteStreamReader] with EntryWriter[ByteStreamWriter] { override def read(medium: ByteStreamReader, context: Context): Context = { val valueData = medium.readAll() if(valueData.length > 0) { context.withKey(StringKey, new String(valueData)) } else context } override def write(context: Context, medium: ByteStreamWriter): Unit = { val value = context.get(StringKey) if(value != null) { medium.write(value.getBytes) } } } class FailStringEntryCodec extends EntryReader[ByteStreamReader] with EntryWriter[ByteStreamWriter] { override def read(medium: ByteStreamReader, context: Context): Context = { val valueData = medium.readAll() if(valueData.length > 0) { val stringValue = new String(valueData) if(stringValue == "fail-read") { sys.error("The fail string entry reader has triggered") } context.withKey(FailStringKey, stringValue) } else context } override def write(context: Context, medium: ByteStreamWriter): Unit = { val value = context.get(FailStringKey) if(value != null && value != "fail-write") { medium.write(value.getBytes) } else { medium.write(42) // malformed data on purpose sys.error("The fail string entry writer has triggered") } } } }