From c487c51a54e67944c80cf2aecc63ac8158bf99a6 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 3 Sep 2018 15:37:14 +0200 Subject: wip on the HttpServer instrumentation --- kamon-core-tests/src/test/resources/reference.conf | 119 +++++++++++++ .../scala/kamon/context/HttpPropagationSpec.scala | 30 ++-- .../HttpServerInstrumentationSpec.scala | 88 ++++++++++ kamon-core/src/main/resources/reference.conf | 119 +++++++------ .../src/main/scala/kamon/context/Context.scala | 2 +- .../main/scala/kamon/context/HttpPropagation.scala | 28 +-- .../scala/kamon/instrumentation/HttpMessage.scala | 27 +++ .../scala/kamon/instrumentation/HttpServer.scala | 192 +++++++++++++++++++++ .../src/main/scala/kamon/trace/SpanCodec.scala | 22 +-- .../main/scala/kamon/testkit/ContextTesting.scala | 3 + .../main/scala/kamon/testkit/SpanInspection.scala | 13 +- 11 files changed, 552 insertions(+), 91 deletions(-) create mode 100644 kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala diff --git a/kamon-core-tests/src/test/resources/reference.conf b/kamon-core-tests/src/test/resources/reference.conf index 0d7ae9e2..d165249e 100644 --- a/kamon-core-tests/src/test/resources/reference.conf +++ b/kamon-core-tests/src/test/resources/reference.conf @@ -2,4 +2,123 @@ kamon { context.codecs.string-keys { request-id = "X-Request-ID" } +} + + + +kamon { + instrumentation { + http-server { + noop { + + # + # Configuration for HTTP context propagation + # + propagation { + + # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if + # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can + # be created and reported but will not be linked across boundaries nor take trace identifiers from tags). + enabled = no + + # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default + # configuration for more details on how to configure the detault HTTP context propagation. + channel = "default" + } + + + # + # Configuration for HTTP server metrics collection + # + metrics { + + # Enables collection of HTTP server metrics + enabled = no + + # Tags to include on the HTTP server metrics. The available options are: + # - method: HTTP method from the request. + # - status-code: HTTP status code from the responses. + # + tags = [ + "method", + "status-code" + ] + } + + + # + # Configuration for HTTP request tracing + # + tracing { + + # Enables HTTP request tracing. When enabled the instrumentation will create Spans for incoming requests + # and finish them when the response is sent back to the clients. + enabled = no + + # Select a context tag that provides a custom trace identifier. The custom trace identifier will be used + # only if all these conditions are met: + # - the context tag is present. + # - there is no parent Span on the incoming context (i.e. this is the first service on the trace). + # - the identifier is valued in accordance to the identity provider. + trace-id-tag = "none" + + # Enables collection of span metrics using the `span.processing-time` metric. + span-metrics = on + + # Select which tags should be included as span and span metric tags. The possible options are: + # - span: the tag is added as a Span tag (i.e. using span.tag(...)) + # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...)) + # - off: the tag is not used. + # + tags { + + # Use the http.url tag. + url = span + + # Use the http.method tag. + method = metric + + # Use the http.status_code tag. + status-code = metric + + # Copy tags from the context into the Spans with the specified purpouse. + from-context { + + # The peer tag identifiest the service that is calling the current service. It is added by default with + # the HttpClient instrumentation. + peer = metric + } + } + + # Custom mappings between routes and operation names. + operations { + + # Operation name for Spans created on requests that could not be handled by any route in the current + # application. + unhandled = "unhandled" + + # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode + # instrumentation is not able to provide a sensible operation name that is free of high cardinality values. + # For example, with the following configuration: + # mappings { + # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile" + # "/events/*/rsvps" = "EventRSVPs" + # } + # + # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have + # the same operation name "/organization/:orgID/user/:userID/profile". + # + # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation + # name "EventRSVPs". + # + # The patterns are expressed as globs and the operation names are free form. + # + mappings { + + } + } + } + } + } + } } \ No newline at end of file diff --git a/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala index 08d0b691..44165b98 100644 --- a/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala @@ -12,7 +12,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { "The HTTP Context Propagation" when { "reading from incoming requests" should { "return an empty context if there are no tags nor keys" in { - val context = httpPropagation.read(headerReaderFromMap(Map.empty)) + val context = httpPropagation.readContext(headerReaderFromMap(Map.empty)) context.tags shouldBe empty context.entries shouldBe empty } @@ -22,7 +22,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { "x-content-tags" -> "hello=world;correlation=1234", "x-mapped-tag" -> "value" ) - val context = httpPropagation.read(headerReaderFromMap(headers)) + val context = httpPropagation.readContext(headerReaderFromMap(headers)) context.tags should contain only( "hello" -> "world", "correlation" -> "1234", @@ -32,7 +32,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { "handle errors when reading HTTP headers" in { val headers = Map("fail" -> "") - val context = httpPropagation.read(headerReaderFromMap(headers)) + val context = httpPropagation.readContext(headerReaderFromMap(headers)) context.tags shouldBe empty context.entries shouldBe empty } @@ -44,7 +44,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { "integer-header" -> "123" ) - val context = httpPropagation.read(headerReaderFromMap(headers)) + val context = httpPropagation.readContext(headerReaderFromMap(headers)) context.get(HttpPropagationSpec.StringKey) shouldBe "hey" context.get(HttpPropagationSpec.IntegerKey) shouldBe 123 context.get(HttpPropagationSpec.OptionalKey) shouldBe empty @@ -66,7 +66,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { def propagationWritingTests(direction: Direction.Write) = { "not write anything if the context is empty" in { val headers = mutable.Map.empty[String, String] - httpPropagation.write(Context.Empty, headerWriterFromMap(headers), direction) + httpPropagation.writeContext(Context.Empty, headerWriterFromMap(headers), direction) headers shouldBe empty } @@ -77,7 +77,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { "mappedTag" -> "value" )) - httpPropagation.write(context, headerWriterFromMap(headers), direction) + httpPropagation.writeContext(context, headerWriterFromMap(headers), direction) headers should contain only( "x-content-tags" -> "hello=world;", "x-mapped-tag" -> "value" @@ -91,7 +91,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { HttpPropagationSpec.IntegerKey, 42, ) - httpPropagation.write(context, headerWriterFromMap(headers), direction) + httpPropagation.writeContext(context, headerWriterFromMap(headers), direction) headers should contain only( "string-header" -> "out-we-go" ) @@ -123,7 +123,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { def headerReaderFromMap(map: Map[String, String]): HttpPropagation.HeaderReader = new HttpPropagation.HeaderReader { - override def read(header: String): Option[String] = { + override def readHeader(header: String): Option[String] = { if(map.get("fail").nonEmpty) sys.error("failing on purpose") @@ -132,7 +132,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues { } def headerWriterFromMap(map: mutable.Map[String, String]): HttpPropagation.HeaderWriter = new HttpPropagation.HeaderWriter { - override def write(header: String, value: String): Unit = map.put(header, value) + override def writeHeader(header: String, value: String): Unit = map.put(header, value) } } @@ -146,20 +146,20 @@ object HttpPropagationSpec { class StringEntryCodec extends HttpPropagation.EntryReader with HttpPropagation.EntryWriter { private val HeaderName = "string-header" - override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { - reader.read(HeaderName) + override def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context = { + reader.readHeader(HeaderName) .map(v => context.withKey(StringKey, v)) .getOrElse(context) } - override def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = { - Option(context.get(StringKey)).foreach(v => writer.write(HeaderName, v)) + override def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = { + Option(context.get(StringKey)).foreach(v => writer.writeHeader(HeaderName, v)) } } class IntegerEntryCodec extends HttpPropagation.EntryReader { - override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { - reader.read("integer-header") + override def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context = { + reader.readHeader("integer-header") .map(v => context.withKey(IntegerKey, v.toInt)) .getOrElse(context) diff --git a/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala new file mode 100644 index 00000000..a3662127 --- /dev/null +++ b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala @@ -0,0 +1,88 @@ +package kamon.instrumentation + +import kamon.context.Context +import kamon.testkit.SpanInspection +import org.scalatest.{Matchers, OptionValues, WordSpec} + +import scala.collection.mutable + +class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInspection with OptionValues { + + "the HTTP server instrumentation" when { + "configured for context propagation" should { + "read context entries and tags from the incoming request" in { + val httpServer = HttpServer.from("default", port = 8080, "http.server") + val handler = httpServer.handle(fakeRequest("http://localhost:8080/", "/", "GET", Map( + "context-tags" -> "tag=value;none=0011223344556677;", + "custom-trace-id" -> "0011223344556677" + ))) + + handler.context.tags should contain only( + "tag" -> "value", + "none" -> "0011223344556677" + ) + } + + "use the configured HTTP propagation channel" in { + val httpServer = HttpServer.from("default", port = 8080, "http.server") + val handler = httpServer.handle(fakeRequest("http://localhost:8080/", "/", "GET", Map( + "context-tags" -> "tag=value;none=0011223344556677;", + "custom-trace-id" -> "0011223344556677" + ))) + + handler.context.tags should contain only( + "tag" -> "value", + "none" -> "0011223344556677" + ) + + val span = inspect(handler.span) + span.context().traceID.string shouldNot be("0011223344556677") + span.tag("http.method").value shouldBe "GET" + span.tag("http.url").value shouldBe "http://localhost:8080/" + + val responseHeaders = mutable.Map.empty[String, String] + handler.startResponse(fakeResponse(200, responseHeaders), handler.context.withTag("hello", "world")) + + } + } + + "when all capabilities are disabled" should { + "not read any context from the incoming requests" in { + val httpServer = HttpServer.from("noop", port = 8081, "http.server") + val handler = httpServer.handle(fakeRequest("http://localhost:8080/", "/", "GET", Map( + "context-tags" -> "tag=value;none=0011223344556677;", + "custom-trace-id" -> "0011223344556677" + ))) + + handler.context shouldBe Context.Empty + } + + "not create any span to represent the server request" in { + val httpServer = HttpServer.from("noop", port = 8081, "http.server") + val handler = httpServer.handle(fakeRequest("http://localhost:8080/", "/", "GET", Map( + "context-tags" -> "tag=value;none=0011223344556677;", + "custom-trace-id" -> "0011223344556677" + ))) + + handler.span.isEmpty() shouldBe true + } + + "not record any HTTP server metrics" is (pending) + } + } + + def fakeRequest(requestUrl: String, requestPath: String, requestMethod: String, headers: Map[String, String]): HttpRequest = + new HttpRequest { + override def url: String = requestUrl + override def path: String = requestPath + override def method: String = requestMethod + override def readHeader(header: String): Option[String] = headers.get(header) + } + + def fakeResponse(responseStatusCode: Int, headers: mutable.Map[String, String]): HttpResponse.Writable[HttpResponse] = + new HttpResponse.Writable[HttpResponse] { + override def statusCode: Int = responseStatusCode + override def writeHeader(header: String, value: String): Unit = headers.put(header, value) + override def build(): HttpResponse = this + } +} diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 279a00a9..8fb2abcc 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -183,12 +183,12 @@ kamon { # Codecs to be used when propagating a Context through a HTTP Headers transport. http-headers-keys { - span = "kamon.trace.SpanCodec$B3" + //span = "kamon.trace.SpanCodec$B3" } # Codecs to be used when propagating a Context through a Binary transport. binary-keys { - span = "kamon.trace.SpanCodec$Colfer" + //span = "kamon.trace.SpanCodec$Colfer" } } } @@ -253,12 +253,29 @@ kamon { http-server { default { + # + # Configuration for HTTP context propagation + # + propagation { + + # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if + # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can + # be created and reported but will not be linked across boundaries nor take trace identifiers from tags). + enabled = yes + + # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default + # configuration for more details on how to configure the detault HTTP context propagation. + channel = "default" + } + + + # # Configuration for HTTP server metrics collection # metrics { # Enables collection of HTTP server metrics - enable = yes + enabled = yes # Tags to include on the HTTP server metrics. The available options are: # - method: HTTP method from the request. @@ -270,13 +287,15 @@ kamon { ] } + + # # Configuration for HTTP request tracing # tracing { # Enables HTTP request tracing. When enabled the instrumentation will create Spans for incoming requests # and finish them when the response is sent back to the clients. - enable = yes + enabled = yes # Select a context tag that provides a custom trace identifier. The custom trace identifier will be used # only if all these conditions are met: @@ -285,58 +304,60 @@ kamon { # - the identifier is valued in accordance to the identity provider. trace-id-tag = "none" - # Metric tags to be automatically included in the HTTP server Spans. The available options are: - # - method: HTTP method from the request. - # - status-code: HTTP status code from the responses. - # - peer: Name of the service issuing the HTTP call. Will use "unknown" if not present. - metric-tags = [ - "method", - "status-code", - "peer" - ] - } - - # Configuration for HTTP context propagation - # - propagation { + # Enables collection of span metrics using the `span.processing-time` metric. + span-metrics = on - # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if - # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can - # be created and reported but will not be linked across boundaries nor take trace identifiers from tags). - enabled = yes + # Select which tags should be included as span and span metric tags. The possible options are: + # - span: the tag is added as a Span tag (i.e. using span.tag(...)) + # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...)) + # - off: the tag is not used. + # + tags { - # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default - # configuration for more details on how to configure the detault HTTP context propagation. - channel = "default" - } + # Use the http.url tag. + url = span + # Use the http.method tag. + method = metric - # Custom mappings between routes and operation names. - # - operations { + # Use the http.status_code tag. + status-code = metric - # Operation name for Spans created on requests that could not be handled by any route in the current - # application. - unhanlded = "unhandled" + # Copy tags from the context into the Spans with the specified purpouse. + from-context { - # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode - # instrumentation is not able to provide a sensible operation name that is free of high cardinality values. - # For example, with the following configuration: - # mappings { - # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile" - # "/events/*/rsvps" = "EventRSVPs" - # } - # - # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have - # the same operation name "/organization/:orgID/user/:userID/profile". - # - # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation - # name "EventRSVPs". - # - # The patterns are expressed as globs and the operation names are free form. - # - mappings { + # The peer tag identifiest the service that is calling the current service. It is added by default with + # the HttpClient instrumentation. + peer = metric + } + } + # Custom mappings between routes and operation names. + operations { + + # Operation name for Spans created on requests that could not be handled by any route in the current + # application. + unhandled = "unhandled" + + # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode + # instrumentation is not able to provide a sensible operation name that is free of high cardinality values. + # For example, with the following configuration: + # mappings { + # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile" + # "/events/*/rsvps" = "EventRSVPs" + # } + # + # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have + # the same operation name "/organization/:orgID/user/:userID/profile". + # + # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation + # name "EventRSVPs". + # + # The patterns are expressed as globs and the operation names are free form. + # + mappings { + + } } } } diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala index 1eed7e14..4d3501db 100644 --- a/kamon-core/src/main/scala/kamon/context/Context.scala +++ b/kamon-core/src/main/scala/kamon/context/Context.scala @@ -19,7 +19,7 @@ package context import java.util.{Map => JavaMap} import scala.collection.JavaConverters._ -class Context private (private[context] val entries: Map[Context.Key[_], Any], private[context] val tags: Map[String, String]) { +class Context private (private[context] val entries: Map[Context.Key[_], Any], val tags: Map[String, String]) { def get[T](key: Context.Key[T]): T = entries.getOrElse(key, key.emptyValue).asInstanceOf[T] diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala index 5b0bdb38..d225aa28 100644 --- a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala +++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala @@ -25,7 +25,7 @@ trait HttpPropagation { * @return The decoded Context instance. If no entries or tags could be read from the HTTP message then an * empty context is returned instead. */ - def read(reader: HttpPropagation.HeaderReader): Context + def readContext(reader: HttpPropagation.HeaderReader): Context /** * Writes the tags and entries from the supplied context using the supplied [[HttpPropagation.HeaderWriter]] @@ -38,7 +38,7 @@ trait HttpPropagation { * @param writer Wrapper on the HTTP message that will carry the context headers. * @param direction Write direction. It can be either Outgoing or Returning. */ - def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit + def writeContext(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit } @@ -59,7 +59,7 @@ object HttpPropagation { * @param context Current context. * @return Either the original context passed in or a modified version of it, including the read entry. */ - def read(reader: HttpPropagation.HeaderReader, context: Context): Context + def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context } /** @@ -75,7 +75,7 @@ object HttpPropagation { * @param writer Wrapper on the HTTP message that will carry the context headers. * @param direction Write direction. It can be either Outgoing or Returning. */ - def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit + def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit } @@ -90,7 +90,7 @@ object HttpPropagation { * @param header HTTP header name * @return The HTTP header value, if present. */ - def read(header: String): Option[String] + def readHeader(header: String): Option[String] } /** @@ -104,7 +104,7 @@ object HttpPropagation { * @param header HTTP header name. * @param value HTTP header value. */ - def write(header: String, value: String): Unit + def writeHeader(header: String, value: String): Unit } @@ -131,12 +131,12 @@ object HttpPropagation { * of a tag key clash. * - Read all context entries using the incoming entries configuration. */ - override def read(reader: HeaderReader): Context = { + override def readContext(reader: HeaderReader): Context = { val tags = Map.newBuilder[String, String] // Tags encoded together in the context tags header. try { - reader.read(components.tagsHeaderName).foreach { contextTagsHeader => + reader.readHeader(components.tagsHeaderName).foreach { contextTagsHeader => contextTagsHeader.split(";").foreach(tagData => { val tagPair = tagData.split("=") if (tagPair.length == 2) { @@ -152,7 +152,7 @@ object HttpPropagation { components.tagsMappings.foreach { case (tagName, httpHeader) => try { - reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue)) + reader.readHeader(httpHeader).foreach(tagValue => tags += (tagName -> tagValue)) } catch { case t: Throwable => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any]) } @@ -163,7 +163,7 @@ object HttpPropagation { case (context, (entryName, entryDecoder)) => var result = context try { - result = entryDecoder.read(reader, context) + result = entryDecoder.readEntry(reader, context) } catch { case t: Throwable => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any]) } @@ -175,7 +175,7 @@ object HttpPropagation { /** * Writes context tags and entries */ - override def write(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = { + override def writeContext(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = { val keys = direction match { case Direction.Outgoing => components.outgoingEntries case Direction.Returning => components.returningEntries @@ -193,21 +193,21 @@ object HttpPropagation { // Write tags with specific mappings or append them to the context tags header. context.tags.foreach { case (tagKey, tagValue) => components.tagsMappings.get(tagKey) match { - case Some(mappedHeader) => writer.write(mappedHeader, tagValue) + case Some(mappedHeader) => writer.writeHeader(mappedHeader, tagValue) case None => appendTag(tagKey, tagValue) } } // Write the context tags header. if(contextTagsHeader.nonEmpty) { - writer.write(components.tagsHeaderName, contextTagsHeader.result()) + writer.writeHeader(components.tagsHeaderName, contextTagsHeader.result()) } // Write entries for the specified direction. keys.foreach { case (entryName, entryWriter) => try { - entryWriter.write(context, writer, direction) + entryWriter.writeEntry(context, writer, direction) } catch { case t: Throwable => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any]) } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala new file mode 100644 index 00000000..b0300546 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala @@ -0,0 +1,27 @@ +package kamon.instrumentation + +import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter} + + +trait HttpRequest extends HeaderReader { + def url: String + def path: String + def method: String +} + +object HttpRequest { + trait Writable[T] extends HttpRequest with HeaderWriter { + def build(): T + } +} + +trait HttpResponse { + def statusCode: Int +} + +object HttpResponse { + trait Writable[T] extends HttpResponse with HeaderWriter { + def build(): T + } +} + diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala new file mode 100644 index 00000000..10dccdbc --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala @@ -0,0 +1,192 @@ +package kamon +package instrumentation + +import com.typesafe.config.Config +import kamon.context.Context +import kamon.context.HttpPropagation.Direction +import kamon.instrumentation.HttpServer.Settings.TagMode +import kamon.trace.Span +import kamon.util.GlobPathFilter + +import scala.collection.JavaConverters._ + +trait HttpServer { + + def handle(request: HttpRequest): HttpServer.Handler + +} + +object HttpServer { + + trait Handler { + + def context: Context + + def span: Span + + def finishRequest(): Unit + + def startResponse[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse + + def endResponse(): Unit + } + + def from(name: String, port: Int, component: String): HttpServer = { + from(name, port, component, Kamon, Kamon) + } + + def from(name: String, port: Int, component: String, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = { + val configKey = "kamon.instrumentation.http-server." + name + new HttpServer.Default(Settings.from(configuration.config().getConfig(configKey)), contextPropagation, port, component) + } + + + class Default(settings: Settings, contextPropagation: ContextPropagation, port: Int, component: String) extends HttpServer { + private val _propagation = contextPropagation.httpPropagation(settings.propagationChannel) + .getOrElse(sys.error(s"Could not find HTTP propagation [${settings.propagationChannel}")) + + override def handle(request: HttpRequest): Handler = { + val incomingContext = if(settings.enableContextPropagation) + _propagation.readContext(request) + else Context.Empty + + val requestSpan = if(settings.enableTracing) + buildServerSpan(incomingContext, request) + else Span.Empty + + val handlerContext = if(requestSpan.nonEmpty()) + incomingContext.withKey(Span.ContextKey, requestSpan) + else incomingContext + + // TODO: Handle HTTP Server Metrics + + + new HttpServer.Handler { + override def context: Context = + handlerContext + + override def span: Span = + requestSpan + + override def finishRequest(): Unit = {} + + override def startResponse[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse = { + if(settings.enableContextPropagation) { + _propagation.writeContext(context, response, Direction.Returning) + } + + response.build() + } + + override def endResponse(): Unit = { + span.finish() + } + } + } + + private def buildServerSpan(context: Context, request: HttpRequest): Span = { + val span = Kamon.buildSpan(operationName(request)) + .withMetricTag("span.kind", "server") + .withMetricTag("component", component) + + def addTag(tag: String, value: String, mode: TagMode): Unit = mode match { + case TagMode.Metric => span.withMetricTag(tag, value) + case TagMode.Span => span.withTag(tag, value) + case TagMode.Off => + } + + addTag("http.url", request.url, settings.urlTagMode) + addTag("http.method", request.method, settings.urlTagMode) + settings.contextTags.foreach { + case (tagName, mode) => context.getTag(tagName).foreach(tagValue => addTag(tagName, tagValue, mode)) + } + + span.start() + } + + private def operationName(request: HttpRequest): String = { + val requestPath = request.path + val customMapping = settings.operationMappings.find { + case (pattern, _) => pattern.accept(requestPath) + }.map(_._2) + + customMapping.getOrElse("http.request") + } + } + + + case class Settings( + enableContextPropagation: Boolean, + propagationChannel: String, + enableServerMetrics: Boolean, + serverMetricsTags: Seq[String], + enableTracing: Boolean, + traceIDTag: Option[String], + enableSpanMetrics: Boolean, + urlTagMode: TagMode, + methodTagMode: TagMode, + statusCodeTagMode: TagMode, + contextTags: Map[String, TagMode], + unhandledOperationName: String, + operationMappings: Map[GlobPathFilter, String] + ) + + object Settings { + + sealed trait TagMode + object TagMode { + case object Metric extends TagMode + case object Span extends TagMode + case object Off extends TagMode + + def from(value: String): TagMode = value.toLowerCase match { + case "metric" => TagMode.Metric + case "span" => TagMode.Span + case _ => TagMode.Off + } + } + + def from(config: Config): Settings = { + + // Context propagation settings + val enablePropagation = config.getBoolean("propagation.enabled") + val propagationChannel = config.getString("propagation.channel") + + // HTTP Server metrics settings + val enableServerMetrics = config.getBoolean("metrics.enabled") + val serverMetricsTags = config.getStringList("metrics.tags").asScala + + // Tracing settings + val enableTracing = config.getBoolean("tracing.enabled") + val traceIdTag = Option(config.getString("tracing.trace-id-tag")).filterNot(_ == "none") + val enableSpanMetrics = config.getBoolean("tracing.span-metrics") + val urlTagMode = TagMode.from(config.getString("tracing.tags.url")) + val methodTagMode = TagMode.from(config.getString("tracing.tags.method")) + val statusCodeTagMode = TagMode.from(config.getString("tracing.tags.status-code")) + val contextTags = config.getConfig("tracing.tags.from-context").pairs.map { + case (tagName, mode) => (tagName, TagMode.from(mode)) + } + + val unhandledOperationName = config.getString("tracing.operations.unhandled") + val operationMappings = config.getConfig("tracing.operations.mappings").pairs.map { + case (pattern, operationName) => (new GlobPathFilter(pattern), operationName) + } + + Settings( + enablePropagation, + propagationChannel, + enableServerMetrics, + serverMetricsTags, + enableTracing, + traceIdTag, + enableSpanMetrics, + urlTagMode, + methodTagMode, + statusCodeTagMode, + contextTags, + unhandledOperationName, + operationMappings + ) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala index 7d707c9f..7439520c 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala @@ -30,24 +30,24 @@ object SpanCodec { class B3 extends HttpPropagation.EntryReader with HttpPropagation.EntryWriter { import B3.Headers - override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { + override def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context = { val identityProvider = Kamon.tracer.identityProvider - val traceID = reader.read(Headers.TraceIdentifier) + val traceID = reader.readHeader(Headers.TraceIdentifier) .map(id => identityProvider.traceIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) - val spanID = reader.read(Headers.SpanIdentifier) + val spanID = reader.readHeader(Headers.SpanIdentifier) .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) { - val parentID = reader.read(Headers.ParentSpanIdentifier) + val parentID = reader.readHeader(Headers.ParentSpanIdentifier) .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) - val flags = reader.read(Headers.Flags) + val flags = reader.readHeader(Headers.Flags) - val samplingDecision = flags.orElse(reader.read(Headers.Sampled)) match { + val samplingDecision = flags.orElse(reader.readHeader(Headers.Sampled)) match { case Some(sampled) if sampled == "1" => SamplingDecision.Sample case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample case _ => SamplingDecision.Unknown @@ -59,19 +59,19 @@ object SpanCodec { } - override def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = { + override def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = { val span = context.get(Span.ContextKey) if(span.nonEmpty()) { val spanContext = span.context() - writer.write(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) - writer.write(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) + writer.writeHeader(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) + writer.writeHeader(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) if(spanContext.parentID != IdentityProvider.NoIdentifier) - writer.write(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) + writer.writeHeader(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision => - writer.write(Headers.Sampled, samplingDecision) + writer.writeHeader(Headers.Sampled, samplingDecision) } } } diff --git a/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala b/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala index 24e8390a..d36d0df2 100644 --- a/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala +++ b/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala @@ -23,6 +23,9 @@ trait ContextTesting { def contextWithLocal(value: String): Context = Context.of(StringKey, Some(value)) + + + } object ContextTesting extends ContextTesting diff --git a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala index fbfdc7c3..36b57f2d 100644 --- a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala +++ b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala @@ -19,7 +19,7 @@ import java.time.Instant import kamon.Kamon import kamon.trace.{Span, SpanContext} -import kamon.trace.Span.FinishedSpan +import kamon.trace.Span.{FinishedSpan, TagValue} import scala.reflect.ClassTag import scala.util.Try @@ -52,9 +52,20 @@ object SpanInspection { def spanTags(): Map[String, Span.TagValue] = spanData.tags + def tag(key: String): Option[String] = + spanTag(key).map { + case TagValue.String(string) => string + case TagValue.Number(number) => number.toString + case TagValue.True => "true" + case TagValue.False => "false" + } + def metricTags(): Map[String, String] = getField[Span.Local, Map[String, String]](realSpan, "customMetricTags") + def metricTag(key: String): Option[String] = + metricTags().get(key) + def from(): Instant = getField[Span.Local, Instant](realSpan, "from") -- cgit v1.2.3