aboutsummaryrefslogtreecommitdiff
path: root/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala
blob: 5681d300331f45bf940e23dfa7a789954e4d8643 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package kamon.context

import java.io.ByteArrayOutputStream

import com.typesafe.config.ConfigFactory
import kamon.Kamon
import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
import kamon.context.Propagation.{EntryReader, EntryWriter}
import org.scalatest.{Matchers, OptionValues, WordSpec}

import scala.util.Random

class BinaryPropagationSpec extends WordSpec with Matchers with OptionValues {

  "The Binary Context Propagation" should {
    "return an empty context if there is no data to read from" in {
      val context = binaryPropagation.read(ByteStreamReader.of(Array.ofDim[Byte](0)))
      context.isEmpty() shouldBe true
    }

    "not write any data to the medium if the context is empty" in {
      val writer = inspectableByteStreamWriter()
      binaryPropagation.write(Context.Empty, writer)
      writer.size() shouldBe 0
    }

    "handle malformed data in when reading a context" in {
      val randomBytes = Array.ofDim[Byte](42)
      Random.nextBytes(randomBytes)

      val context = binaryPropagation.read(ByteStreamReader.of(randomBytes))
      context.isEmpty() shouldBe true
    }

    "handle read failures in an entry reader" in {
      val context = Context.of(
        BinaryPropagationSpec.StringKey, "string-value",
        BinaryPropagationSpec.FailStringKey, "fail-read"
      )
      val writer = inspectableByteStreamWriter()
      binaryPropagation.write(context, writer)

      val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
      rtContext.tags shouldBe empty
      rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value"
      rtContext.get(BinaryPropagationSpec.FailStringKey) shouldBe null
    }

    "handle write failures in an entry writer" in {
      val context = Context.of(
        BinaryPropagationSpec.StringKey, "string-value",
        BinaryPropagationSpec.FailStringKey, "fail-write"
      )
      val writer = inspectableByteStreamWriter()
      binaryPropagation.write(context, writer)

      val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
      rtContext.tags shouldBe empty
      rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value"
      rtContext.get(BinaryPropagationSpec.FailStringKey) shouldBe null
    }

    "handle write failures in an entry writer when the context is too big" in {
      val context = Context.of(BinaryPropagationSpec.StringKey, "string-value" * 20)
      val writer = inspectableByteStreamWriter()
      binaryPropagation.write(context, writer)

      val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
      rtContext shouldBe empty
    }

    "round trip a Context that only has tags" in {
      val context = Context.of(Map("hello" -> "world", "kamon" -> "rulez"))
      val writer = inspectableByteStreamWriter()
      binaryPropagation.write(context, writer)

      val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
      rtContext.entries shouldBe empty
      rtContext.tags should contain theSameElementsAs (context.tags)
    }

    "round trip a Context that only has entries" in {
      val context = Context.of(BinaryPropagationSpec.StringKey, "string-value", BinaryPropagationSpec.IntegerKey, 42)
      val writer = inspectableByteStreamWriter()
      binaryPropagation.write(context, writer)

      val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))
      rtContext.tags shouldBe empty
      rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value"
      rtContext.get(BinaryPropagationSpec.IntegerKey) shouldBe 0 // there is no entry configuration for the integer key
    }

    "round trip a Context that with tags and entries" in {
      val context = Context.of(Map("hello" -> "world", "kamon" -> "rulez"))
        .withKey(BinaryPropagationSpec.StringKey, "string-value")
        .withKey(BinaryPropagationSpec.IntegerKey, 42)

      val writer = inspectableByteStreamWriter()
      binaryPropagation.write(context, writer)
      val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray))

      rtContext.tags should contain theSameElementsAs (context.tags)
      rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value"
      rtContext.get(BinaryPropagationSpec.IntegerKey) shouldBe 0 // there is no entry configuration for the integer key
    }
  }

  val binaryPropagation = BinaryPropagation.from(
    ConfigFactory.parseString(
      """
        |max-outgoing-size = 64
        |entries.incoming.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec"
        |entries.incoming.failString = "kamon.context.BinaryPropagationSpec$FailStringEntryCodec"
        |entries.outgoing.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec"
        |entries.outgoing.failString = "kamon.context.BinaryPropagationSpec$FailStringEntryCodec"
        |
      """.stripMargin
    ).withFallback(ConfigFactory.load().getConfig("kamon.propagation")), Kamon)


  def inspectableByteStreamWriter() = new ByteArrayOutputStream(32) with ByteStreamWriter

}

object BinaryPropagationSpec {

  val StringKey = Context.key[String]("string", null)
  val FailStringKey = Context.key[String]("failString", null)
  val IntegerKey = Context.key[Int]("integer", 0)

  class StringEntryCodec extends EntryReader[ByteStreamReader] with EntryWriter[ByteStreamWriter] {

    override def read(medium: ByteStreamReader, context: Context): Context = {
      val valueData = medium.readAll()

      if(valueData.length > 0) {
        context.withKey(StringKey, new String(valueData))
      } else context
    }

    override def write(context: Context, medium: ByteStreamWriter): Unit = {
      val value = context.get(StringKey)
      if(value != null) {
        medium.write(value.getBytes)
      }
    }
  }

  class FailStringEntryCodec extends EntryReader[ByteStreamReader] with EntryWriter[ByteStreamWriter] {

    override def read(medium: ByteStreamReader, context: Context): Context = {
      val valueData = medium.readAll()

      if(valueData.length > 0) {
        val stringValue = new String(valueData)
        if(stringValue == "fail-read") {
          sys.error("The fail string entry reader has triggered")
        }

        context.withKey(FailStringKey, stringValue)
      } else context
    }

    override def write(context: Context, medium: ByteStreamWriter): Unit = {
      val value = context.get(FailStringKey)
      if(value != null && value != "fail-write") {
        medium.write(value.getBytes)
      } else {
        medium.write(42) // malformed data on purpose
        sys.error("The fail string entry writer has triggered")
      }
    }
  }
}