From b61c92ea3589450fd097ab79420230b61b458ae4 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 28 Sep 2018 12:08:24 +0200 Subject: cleanup HTTP propagation, introduce a new Binary propagation --- .../src/test/scala/kamon/KamonLifecycleSpec.scala | 2 +- .../kamon/context/BinaryPropagationSpec.scala | 98 ++++++++++++++++++++++ .../scala/kamon/context/ContextCodecSpec.scala | 93 -------------------- .../kamon/context/ContextSerializationSpec.scala | 50 +++++------ .../scala/kamon/context/HttpPropagationSpec.scala | 53 +++++------- .../HttpServerInstrumentationSpec.scala | 5 +- .../test/scala/kamon/trace/B3SpanCodecSpec.scala | 2 +- 7 files changed, 151 insertions(+), 152 deletions(-) create mode 100644 kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala delete mode 100644 kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala (limited to 'kamon-core-tests') diff --git a/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala index b5d0425b..ee80fd2b 100644 --- a/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/KamonLifecycleSpec.scala @@ -12,7 +12,7 @@ import org.scalatest.time.SpanSugar._ class KamonLifecycleSpec extends WordSpec with Matchers with Eventually{ - "the Kamon lifecycle" should { + "the Kamon lifecycle" ignore { "keep the JVM running if reporters are running" in { val process = Runtime.getRuntime.exec(createProcessCommand("kamon.KamonWithRunningReporter")) Thread.sleep(5000) diff --git a/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala new file mode 100644 index 00000000..99e72f59 --- /dev/null +++ b/kamon-core-tests/src/test/scala/kamon/context/BinaryPropagationSpec.scala @@ -0,0 +1,98 @@ +package kamon.context + +import java.io.ByteArrayOutputStream + +import com.typesafe.config.ConfigFactory +import kamon.Kamon +import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter} +import kamon.context.Propagation.{EntryReader, EntryWriter} +import org.scalatest.{Matchers, OptionValues, WordSpec} + +class BinaryPropagationSpec extends WordSpec with Matchers with OptionValues { + + "The Binary Context Propagation" should { + "return an empty context if there is no data to read from" in { + val context = binaryPropagation.read(ByteStreamReader.of(Array.ofDim[Byte](0))) + context.isEmpty() shouldBe true + } + + "not write any data to the medium if the context is empty" in { + val writer = inspectableByteStreamWriter() + binaryPropagation.write(Context.Empty, writer) + writer.size() shouldBe 0 + } + + "round trip a Context that only has tags" in { + val context = Context.of(Map("hello" -> "world", "kamon" -> "rulez")) + val writer = inspectableByteStreamWriter() + binaryPropagation.write(context, writer) + + val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) + rtContext.entries shouldBe empty + rtContext.tags should contain theSameElementsAs (context.tags) + } + + "round trip a Context that only has entries" in { + val context = Context.of(BinaryPropagationSpec.StringKey, "string-value", BinaryPropagationSpec.IntegerKey, 42) + val writer = inspectableByteStreamWriter() + binaryPropagation.write(context, writer) + + val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) + rtContext.tags shouldBe empty + rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value" + rtContext.get(BinaryPropagationSpec.IntegerKey) shouldBe 0 // there is no entry configuration for the integer key + } + + "round trip a Context that with tags and entries" in { + val context = Context.of(Map("hello" -> "world", "kamon" -> "rulez")) + .withKey(BinaryPropagationSpec.StringKey, "string-value") + .withKey(BinaryPropagationSpec.IntegerKey, 42) + + val writer = inspectableByteStreamWriter() + binaryPropagation.write(context, writer) + val rtContext = binaryPropagation.read(ByteStreamReader.of(writer.toByteArray)) + + rtContext.tags should contain theSameElementsAs (context.tags) + rtContext.get(BinaryPropagationSpec.StringKey) shouldBe "string-value" + rtContext.get(BinaryPropagationSpec.IntegerKey) shouldBe 0 // there is no entry configuration for the integer key + } + } + + val binaryPropagation = BinaryPropagation.from( + ConfigFactory.parseString( + """ + | + |entries.incoming.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec" + |entries.outgoing.string = "kamon.context.BinaryPropagationSpec$StringEntryCodec" + | + """.stripMargin + ).withFallback(ConfigFactory.load().getConfig("kamon.propagation")), Kamon) + + + def inspectableByteStreamWriter() = new ByteArrayOutputStream(32) with ByteStreamWriter + +} + +object BinaryPropagationSpec { + + val StringKey = Context.key[String]("string", null) + val IntegerKey = Context.key[Int]("integer", 0) + + class StringEntryCodec extends EntryReader[ByteStreamReader] with EntryWriter[ByteStreamWriter] { + + override def read(medium: ByteStreamReader, context: Context): Context = { + val valueData = medium.readAll() + + if(valueData.length > 0) { + context.withKey(StringKey, new String(valueData)) + } else context + } + + override def write(context: Context, medium: ByteStreamWriter): Unit = { + val value = context.get(StringKey) + if(value != null) { + medium.write(value.getBytes) + } + } + } +} \ No newline at end of file diff --git a/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala deleted file mode 100644 index 6a43bd01..00000000 --- a/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.context - -import kamon.Kamon -import kamon.testkit.ContextTesting -import org.scalatest.{Matchers, OptionValues, WordSpec} - -class ContextCodecSpec extends WordSpec with Matchers with ContextTesting with OptionValues { - "the Context Codec" ignore { - "encoding/decoding to HttpHeaders" should { - "round trip a empty context" in { - val textMap = ContextCodec.HttpHeaders.encode(Context.Empty) - val decodedContext = ContextCodec.HttpHeaders.decode(textMap) - - decodedContext shouldBe Context.Empty - } - - "round trip a context with only local keys" in { - val localOnlyContext = Context.of(StringKey, Some("string-value")) - val textMap = ContextCodec.HttpHeaders.encode(localOnlyContext) - val decodedContext = ContextCodec.HttpHeaders.decode(textMap) - - decodedContext shouldBe Context.Empty - } - - "round trip a context with local and broadcast keys" in { - val initialContext = Context.Empty - .withKey(StringKey, Some("string-value")) - .withKey(StringBroadcastKey, Some("this-should-be-round-tripped")) - - val textMap = ContextCodec.HttpHeaders.encode(initialContext) - val decodedContext = ContextCodec.HttpHeaders.decode(textMap) - - decodedContext.get(StringKey) shouldBe empty - decodedContext.get(StringBroadcastKey).value shouldBe "this-should-be-round-tripped" - } - - "read string broadcast keys using the configured header name" in { - val textMap = TextMap.Default() - textMap.put("X-Request-ID", "123456") - val decodedContext = ContextCodec.HttpHeaders.decode(textMap) - - //decodedContext.get(Key.broadcastString("request-id")).value shouldBe "123456" - } - } - - "encoding/decoding to Binary" should { - "round trip a empty context" in { - val byteBuffer = ContextCodec.Binary.encode(Context.Empty) - - val decodedContext = ContextCodec.Binary.decode(byteBuffer) - - decodedContext shouldBe Context.Empty - } - - "round trip a context with only local keys" in { - val localOnlyContext = Context.of(StringKey, Some("string-value")) - val byteBuffer = ContextCodec.Binary.encode(localOnlyContext) - val decodedContext = ContextCodec.Binary.decode(byteBuffer) - - decodedContext shouldBe Context.Empty - } - - "round trip a context with local and broadcast keys" in { - val initialContext = Context.Empty - .withKey(StringKey, Some("string-value")) - .withKey(StringBroadcastKey, Some("this-should-be-round-tripped")) - - val byteBuffer = ContextCodec.Binary.encode(initialContext) - val decodedContext = ContextCodec.Binary.decode(byteBuffer) - - decodedContext.get(StringKey) shouldBe empty - decodedContext.get(StringBroadcastKey).value shouldBe "this-should-be-round-tripped" - } - } - } - - val ContextCodec = new Codecs(Kamon.config()) -} \ No newline at end of file diff --git a/kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala index 22400fa9..7ffb0838 100644 --- a/kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/context/ContextSerializationSpec.scala @@ -22,29 +22,29 @@ import kamon.testkit.ContextTesting import org.scalatest.{Matchers, OptionValues, WordSpec} class ContextSerializationSpec extends WordSpec with Matchers with ContextTesting with OptionValues { - "the Context is Serializable" ignore { - "empty " in { - val bos = new ByteArrayOutputStream() - val oos = new ObjectOutputStream(bos) - oos.writeObject(Context.Empty) - - val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray)) - val ctx = ois.readObject().asInstanceOf[Context] - ctx shouldBe Context.Empty - } - - "full" in { - val sCtx = Context.of(StringBroadcastKey, Some("disi")) - val bos = new ByteArrayOutputStream() - val oos = new ObjectOutputStream(bos) - oos.writeObject(sCtx) - - val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray)) - val rCtx = ois.readObject().asInstanceOf[Context] - rCtx shouldBe sCtx - } - - } - - val ContextCodec = new Codecs(Kamon.config()) +// "the Context is Serializable" ignore { +// "empty " in { +// val bos = new ByteArrayOutputStream() +// val oos = new ObjectOutputStream(bos) +// oos.writeObject(Context.Empty) +// +// val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray)) +// val ctx = ois.readObject().asInstanceOf[Context] +// ctx shouldBe Context.Empty +// } +// +// "full" in { +// val sCtx = Context.of(StringBroadcastKey, Some("disi")) +// val bos = new ByteArrayOutputStream() +// val oos = new ObjectOutputStream(bos) +// oos.writeObject(sCtx) +// +// val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray)) +// val rCtx = ois.readObject().asInstanceOf[Context] +// rCtx shouldBe sCtx +// } +// +// } +// +// val ContextCodec = new Codecs(Kamon.config()) } \ No newline at end of file diff --git a/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala index 44165b98..ac1250ea 100644 --- a/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala @@ -2,7 +2,8 @@ package kamon.context import com.typesafe.config.ConfigFactory import kamon.Kamon -import kamon.context.HttpPropagation.Direction +import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter} +import kamon.context.Propagation.{EntryReader, EntryWriter} import org.scalatest.{Matchers, OptionValues, WordSpec} import scala.collection.mutable @@ -12,9 +13,8 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { "The HTTP Context Propagation" when { "reading from incoming requests" should { "return an empty context if there are no tags nor keys" in { - val context = httpPropagation.readContext(headerReaderFromMap(Map.empty)) - context.tags shouldBe empty - context.entries shouldBe empty + val context = httpPropagation.read(headerReaderFromMap(Map.empty)) + context.isEmpty() shouldBe true } "read tags from an HTTP message when they are available" in { @@ -22,7 +22,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { "x-content-tags" -> "hello=world;correlation=1234", "x-mapped-tag" -> "value" ) - val context = httpPropagation.readContext(headerReaderFromMap(headers)) + val context = httpPropagation.read(headerReaderFromMap(headers)) context.tags should contain only( "hello" -> "world", "correlation" -> "1234", @@ -32,7 +32,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { "handle errors when reading HTTP headers" in { val headers = Map("fail" -> "") - val context = httpPropagation.readContext(headerReaderFromMap(headers)) + val context = httpPropagation.read(headerReaderFromMap(headers)) context.tags shouldBe empty context.entries shouldBe empty } @@ -44,7 +44,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { "integer-header" -> "123" ) - val context = httpPropagation.readContext(headerReaderFromMap(headers)) + val context = httpPropagation.read(headerReaderFromMap(headers)) context.get(HttpPropagationSpec.StringKey) shouldBe "hey" context.get(HttpPropagationSpec.IntegerKey) shouldBe 123 context.get(HttpPropagationSpec.OptionalKey) shouldBe empty @@ -56,17 +56,9 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { "writing to outgoing requests" should { - propagationWritingTests(Direction.Outgoing) - } - - "writing to returning requests" should { - propagationWritingTests(Direction.Returning) - } - - def propagationWritingTests(direction: Direction.Write) = { "not write anything if the context is empty" in { val headers = mutable.Map.empty[String, String] - httpPropagation.writeContext(Context.Empty, headerWriterFromMap(headers), direction) + httpPropagation.write(Context.Empty, headerWriterFromMap(headers)) headers shouldBe empty } @@ -77,7 +69,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { "mappedTag" -> "value" )) - httpPropagation.writeContext(context, headerWriterFromMap(headers), direction) + httpPropagation.write(context, headerWriterFromMap(headers)) headers should contain only( "x-content-tags" -> "hello=world;", "x-mapped-tag" -> "value" @@ -91,10 +83,10 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { HttpPropagationSpec.IntegerKey, 42, ) - httpPropagation.writeContext(context, headerWriterFromMap(headers), direction) + httpPropagation.write(context, headerWriterFromMap(headers)) headers should contain only( "string-header" -> "out-we-go" - ) + ) } } } @@ -116,23 +108,24 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { |entries.incoming.string = "kamon.context.HttpPropagationSpec$StringEntryCodec" |entries.incoming.integer = "kamon.context.HttpPropagationSpec$IntegerEntryCodec" |entries.outgoing.string = "kamon.context.HttpPropagationSpec$StringEntryCodec" - |entries.returning.string = "kamon.context.HttpPropagationSpec$StringEntryCodec" | """.stripMargin ).withFallback(ConfigFactory.load().getConfig("kamon.propagation")), Kamon) def headerReaderFromMap(map: Map[String, String]): HttpPropagation.HeaderReader = new HttpPropagation.HeaderReader { - override def readHeader(header: String): Option[String] = { + override def read(header: String): Option[String] = { if(map.get("fail").nonEmpty) sys.error("failing on purpose") map.get(header) } + + override def readAll(): Map[String, String] = map } def headerWriterFromMap(map: mutable.Map[String, String]): HttpPropagation.HeaderWriter = new HttpPropagation.HeaderWriter { - override def writeHeader(header: String, value: String): Unit = map.put(header, value) + override def write(header: String, value: String): Unit = map.put(header, value) } } @@ -143,23 +136,23 @@ object HttpPropagationSpec { val OptionalKey = Context.key[Option[String]]("optional", None) - class StringEntryCodec extends HttpPropagation.EntryReader with HttpPropagation.EntryWriter { + class StringEntryCodec extends EntryReader[HeaderReader] with EntryWriter[HeaderWriter] { private val HeaderName = "string-header" - override def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context = { - reader.readHeader(HeaderName) + override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { + reader.read(HeaderName) .map(v => context.withKey(StringKey, v)) .getOrElse(context) } - override def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = { - Option(context.get(StringKey)).foreach(v => writer.writeHeader(HeaderName, v)) + override def write(context: Context, writer: HttpPropagation.HeaderWriter): Unit = { + Option(context.get(StringKey)).foreach(v => writer.write(HeaderName, v)) } } - class IntegerEntryCodec extends HttpPropagation.EntryReader { - override def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context = { - reader.readHeader("integer-header") + class IntegerEntryCodec extends EntryReader[HeaderReader] { + override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { + reader.read("integer-header") .map(v => context.withKey(IntegerKey, v.toInt)) .getOrElse(context) diff --git a/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala index 19e08666..d334184d 100644 --- a/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala @@ -272,13 +272,14 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp override def url: String = requestUrl override def path: String = requestPath override def method: String = requestMethod - override def readHeader(header: String): Option[String] = headers.get(header) + override def read(header: String): Option[String] = headers.get(header) + override def readAll(): Map[String, String] = headers } def fakeResponse(responseStatusCode: Int, headers: mutable.Map[String, String]): HttpResponse.Writable[HttpResponse] = new HttpResponse.Writable[HttpResponse] { override def statusCode: Int = responseStatusCode - override def writeHeader(header: String, value: String): Unit = headers.put(header, value) + override def write(header: String, value: String): Unit = headers.put(header, value) override def build(): HttpResponse = this } diff --git a/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala b/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala index 76e07c31..37a95a68 100644 --- a/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/trace/B3SpanCodecSpec.scala @@ -16,7 +16,7 @@ package kamon.trace -import kamon.context.{Context, TextMap} +import kamon.context.{Context} import kamon.testkit.SpanBuilding import kamon.trace.IdentityProvider.Identifier import kamon.trace.SpanContext.SamplingDecision -- cgit v1.2.3