From c487c51a54e67944c80cf2aecc63ac8158bf99a6 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 3 Sep 2018 15:37:14 +0200 Subject: wip on the HttpServer instrumentation --- kamon-core/src/main/resources/reference.conf | 119 +++++++------ .../src/main/scala/kamon/context/Context.scala | 2 +- .../main/scala/kamon/context/HttpPropagation.scala | 28 +-- .../scala/kamon/instrumentation/HttpMessage.scala | 27 +++ .../scala/kamon/instrumentation/HttpServer.scala | 192 +++++++++++++++++++++ .../src/main/scala/kamon/trace/SpanCodec.scala | 22 +-- 6 files changed, 315 insertions(+), 75 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala (limited to 'kamon-core/src') diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 279a00a9..8fb2abcc 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -183,12 +183,12 @@ kamon { # Codecs to be used when propagating a Context through a HTTP Headers transport. http-headers-keys { - span = "kamon.trace.SpanCodec$B3" + //span = "kamon.trace.SpanCodec$B3" } # Codecs to be used when propagating a Context through a Binary transport. binary-keys { - span = "kamon.trace.SpanCodec$Colfer" + //span = "kamon.trace.SpanCodec$Colfer" } } } @@ -253,12 +253,29 @@ kamon { http-server { default { + # + # Configuration for HTTP context propagation + # + propagation { + + # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if + # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can + # be created and reported but will not be linked across boundaries nor take trace identifiers from tags). + enabled = yes + + # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default + # configuration for more details on how to configure the detault HTTP context propagation. + channel = "default" + } + + + # # Configuration for HTTP server metrics collection # metrics { # Enables collection of HTTP server metrics - enable = yes + enabled = yes # Tags to include on the HTTP server metrics. The available options are: # - method: HTTP method from the request. @@ -270,13 +287,15 @@ kamon { ] } + + # # Configuration for HTTP request tracing # tracing { # Enables HTTP request tracing. When enabled the instrumentation will create Spans for incoming requests # and finish them when the response is sent back to the clients. - enable = yes + enabled = yes # Select a context tag that provides a custom trace identifier. The custom trace identifier will be used # only if all these conditions are met: @@ -285,58 +304,60 @@ kamon { # - the identifier is valued in accordance to the identity provider. trace-id-tag = "none" - # Metric tags to be automatically included in the HTTP server Spans. The available options are: - # - method: HTTP method from the request. - # - status-code: HTTP status code from the responses. - # - peer: Name of the service issuing the HTTP call. Will use "unknown" if not present. - metric-tags = [ - "method", - "status-code", - "peer" - ] - } - - # Configuration for HTTP context propagation - # - propagation { + # Enables collection of span metrics using the `span.processing-time` metric. + span-metrics = on - # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if - # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can - # be created and reported but will not be linked across boundaries nor take trace identifiers from tags). - enabled = yes + # Select which tags should be included as span and span metric tags. The possible options are: + # - span: the tag is added as a Span tag (i.e. using span.tag(...)) + # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...)) + # - off: the tag is not used. + # + tags { - # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default - # configuration for more details on how to configure the detault HTTP context propagation. - channel = "default" - } + # Use the http.url tag. + url = span + # Use the http.method tag. + method = metric - # Custom mappings between routes and operation names. - # - operations { + # Use the http.status_code tag. + status-code = metric - # Operation name for Spans created on requests that could not be handled by any route in the current - # application. - unhanlded = "unhandled" + # Copy tags from the context into the Spans with the specified purpouse. + from-context { - # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode - # instrumentation is not able to provide a sensible operation name that is free of high cardinality values. - # For example, with the following configuration: - # mappings { - # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile" - # "/events/*/rsvps" = "EventRSVPs" - # } - # - # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have - # the same operation name "/organization/:orgID/user/:userID/profile". - # - # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation - # name "EventRSVPs". - # - # The patterns are expressed as globs and the operation names are free form. - # - mappings { + # The peer tag identifiest the service that is calling the current service. It is added by default with + # the HttpClient instrumentation. + peer = metric + } + } + # Custom mappings between routes and operation names. + operations { + + # Operation name for Spans created on requests that could not be handled by any route in the current + # application. + unhandled = "unhandled" + + # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode + # instrumentation is not able to provide a sensible operation name that is free of high cardinality values. + # For example, with the following configuration: + # mappings { + # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile" + # "/events/*/rsvps" = "EventRSVPs" + # } + # + # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have + # the same operation name "/organization/:orgID/user/:userID/profile". + # + # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation + # name "EventRSVPs". + # + # The patterns are expressed as globs and the operation names are free form. + # + mappings { + + } } } } diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala index 1eed7e14..4d3501db 100644 --- a/kamon-core/src/main/scala/kamon/context/Context.scala +++ b/kamon-core/src/main/scala/kamon/context/Context.scala @@ -19,7 +19,7 @@ package context import java.util.{Map => JavaMap} import scala.collection.JavaConverters._ -class Context private (private[context] val entries: Map[Context.Key[_], Any], private[context] val tags: Map[String, String]) { +class Context private (private[context] val entries: Map[Context.Key[_], Any], val tags: Map[String, String]) { def get[T](key: Context.Key[T]): T = entries.getOrElse(key, key.emptyValue).asInstanceOf[T] diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala index 5b0bdb38..d225aa28 100644 --- a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala +++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala @@ -25,7 +25,7 @@ trait HttpPropagation { * @return The decoded Context instance. If no entries or tags could be read from the HTTP message then an * empty context is returned instead. */ - def read(reader: HttpPropagation.HeaderReader): Context + def readContext(reader: HttpPropagation.HeaderReader): Context /** * Writes the tags and entries from the supplied context using the supplied [[HttpPropagation.HeaderWriter]] @@ -38,7 +38,7 @@ trait HttpPropagation { * @param writer Wrapper on the HTTP message that will carry the context headers. * @param direction Write direction. It can be either Outgoing or Returning. */ - def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit + def writeContext(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit } @@ -59,7 +59,7 @@ object HttpPropagation { * @param context Current context. * @return Either the original context passed in or a modified version of it, including the read entry. */ - def read(reader: HttpPropagation.HeaderReader, context: Context): Context + def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context } /** @@ -75,7 +75,7 @@ object HttpPropagation { * @param writer Wrapper on the HTTP message that will carry the context headers. * @param direction Write direction. It can be either Outgoing or Returning. */ - def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit + def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit } @@ -90,7 +90,7 @@ object HttpPropagation { * @param header HTTP header name * @return The HTTP header value, if present. */ - def read(header: String): Option[String] + def readHeader(header: String): Option[String] } /** @@ -104,7 +104,7 @@ object HttpPropagation { * @param header HTTP header name. * @param value HTTP header value. */ - def write(header: String, value: String): Unit + def writeHeader(header: String, value: String): Unit } @@ -131,12 +131,12 @@ object HttpPropagation { * of a tag key clash. * - Read all context entries using the incoming entries configuration. */ - override def read(reader: HeaderReader): Context = { + override def readContext(reader: HeaderReader): Context = { val tags = Map.newBuilder[String, String] // Tags encoded together in the context tags header. try { - reader.read(components.tagsHeaderName).foreach { contextTagsHeader => + reader.readHeader(components.tagsHeaderName).foreach { contextTagsHeader => contextTagsHeader.split(";").foreach(tagData => { val tagPair = tagData.split("=") if (tagPair.length == 2) { @@ -152,7 +152,7 @@ object HttpPropagation { components.tagsMappings.foreach { case (tagName, httpHeader) => try { - reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue)) + reader.readHeader(httpHeader).foreach(tagValue => tags += (tagName -> tagValue)) } catch { case t: Throwable => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any]) } @@ -163,7 +163,7 @@ object HttpPropagation { case (context, (entryName, entryDecoder)) => var result = context try { - result = entryDecoder.read(reader, context) + result = entryDecoder.readEntry(reader, context) } catch { case t: Throwable => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any]) } @@ -175,7 +175,7 @@ object HttpPropagation { /** * Writes context tags and entries */ - override def write(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = { + override def writeContext(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = { val keys = direction match { case Direction.Outgoing => components.outgoingEntries case Direction.Returning => components.returningEntries @@ -193,21 +193,21 @@ object HttpPropagation { // Write tags with specific mappings or append them to the context tags header. context.tags.foreach { case (tagKey, tagValue) => components.tagsMappings.get(tagKey) match { - case Some(mappedHeader) => writer.write(mappedHeader, tagValue) + case Some(mappedHeader) => writer.writeHeader(mappedHeader, tagValue) case None => appendTag(tagKey, tagValue) } } // Write the context tags header. if(contextTagsHeader.nonEmpty) { - writer.write(components.tagsHeaderName, contextTagsHeader.result()) + writer.writeHeader(components.tagsHeaderName, contextTagsHeader.result()) } // Write entries for the specified direction. keys.foreach { case (entryName, entryWriter) => try { - entryWriter.write(context, writer, direction) + entryWriter.writeEntry(context, writer, direction) } catch { case t: Throwable => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any]) } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala new file mode 100644 index 00000000..b0300546 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala @@ -0,0 +1,27 @@ +package kamon.instrumentation + +import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter} + + +trait HttpRequest extends HeaderReader { + def url: String + def path: String + def method: String +} + +object HttpRequest { + trait Writable[T] extends HttpRequest with HeaderWriter { + def build(): T + } +} + +trait HttpResponse { + def statusCode: Int +} + +object HttpResponse { + trait Writable[T] extends HttpResponse with HeaderWriter { + def build(): T + } +} + diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala new file mode 100644 index 00000000..10dccdbc --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala @@ -0,0 +1,192 @@ +package kamon +package instrumentation + +import com.typesafe.config.Config +import kamon.context.Context +import kamon.context.HttpPropagation.Direction +import kamon.instrumentation.HttpServer.Settings.TagMode +import kamon.trace.Span +import kamon.util.GlobPathFilter + +import scala.collection.JavaConverters._ + +trait HttpServer { + + def handle(request: HttpRequest): HttpServer.Handler + +} + +object HttpServer { + + trait Handler { + + def context: Context + + def span: Span + + def finishRequest(): Unit + + def startResponse[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse + + def endResponse(): Unit + } + + def from(name: String, port: Int, component: String): HttpServer = { + from(name, port, component, Kamon, Kamon) + } + + def from(name: String, port: Int, component: String, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = { + val configKey = "kamon.instrumentation.http-server." + name + new HttpServer.Default(Settings.from(configuration.config().getConfig(configKey)), contextPropagation, port, component) + } + + + class Default(settings: Settings, contextPropagation: ContextPropagation, port: Int, component: String) extends HttpServer { + private val _propagation = contextPropagation.httpPropagation(settings.propagationChannel) + .getOrElse(sys.error(s"Could not find HTTP propagation [${settings.propagationChannel}")) + + override def handle(request: HttpRequest): Handler = { + val incomingContext = if(settings.enableContextPropagation) + _propagation.readContext(request) + else Context.Empty + + val requestSpan = if(settings.enableTracing) + buildServerSpan(incomingContext, request) + else Span.Empty + + val handlerContext = if(requestSpan.nonEmpty()) + incomingContext.withKey(Span.ContextKey, requestSpan) + else incomingContext + + // TODO: Handle HTTP Server Metrics + + + new HttpServer.Handler { + override def context: Context = + handlerContext + + override def span: Span = + requestSpan + + override def finishRequest(): Unit = {} + + override def startResponse[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse = { + if(settings.enableContextPropagation) { + _propagation.writeContext(context, response, Direction.Returning) + } + + response.build() + } + + override def endResponse(): Unit = { + span.finish() + } + } + } + + private def buildServerSpan(context: Context, request: HttpRequest): Span = { + val span = Kamon.buildSpan(operationName(request)) + .withMetricTag("span.kind", "server") + .withMetricTag("component", component) + + def addTag(tag: String, value: String, mode: TagMode): Unit = mode match { + case TagMode.Metric => span.withMetricTag(tag, value) + case TagMode.Span => span.withTag(tag, value) + case TagMode.Off => + } + + addTag("http.url", request.url, settings.urlTagMode) + addTag("http.method", request.method, settings.urlTagMode) + settings.contextTags.foreach { + case (tagName, mode) => context.getTag(tagName).foreach(tagValue => addTag(tagName, tagValue, mode)) + } + + span.start() + } + + private def operationName(request: HttpRequest): String = { + val requestPath = request.path + val customMapping = settings.operationMappings.find { + case (pattern, _) => pattern.accept(requestPath) + }.map(_._2) + + customMapping.getOrElse("http.request") + } + } + + + case class Settings( + enableContextPropagation: Boolean, + propagationChannel: String, + enableServerMetrics: Boolean, + serverMetricsTags: Seq[String], + enableTracing: Boolean, + traceIDTag: Option[String], + enableSpanMetrics: Boolean, + urlTagMode: TagMode, + methodTagMode: TagMode, + statusCodeTagMode: TagMode, + contextTags: Map[String, TagMode], + unhandledOperationName: String, + operationMappings: Map[GlobPathFilter, String] + ) + + object Settings { + + sealed trait TagMode + object TagMode { + case object Metric extends TagMode + case object Span extends TagMode + case object Off extends TagMode + + def from(value: String): TagMode = value.toLowerCase match { + case "metric" => TagMode.Metric + case "span" => TagMode.Span + case _ => TagMode.Off + } + } + + def from(config: Config): Settings = { + + // Context propagation settings + val enablePropagation = config.getBoolean("propagation.enabled") + val propagationChannel = config.getString("propagation.channel") + + // HTTP Server metrics settings + val enableServerMetrics = config.getBoolean("metrics.enabled") + val serverMetricsTags = config.getStringList("metrics.tags").asScala + + // Tracing settings + val enableTracing = config.getBoolean("tracing.enabled") + val traceIdTag = Option(config.getString("tracing.trace-id-tag")).filterNot(_ == "none") + val enableSpanMetrics = config.getBoolean("tracing.span-metrics") + val urlTagMode = TagMode.from(config.getString("tracing.tags.url")) + val methodTagMode = TagMode.from(config.getString("tracing.tags.method")) + val statusCodeTagMode = TagMode.from(config.getString("tracing.tags.status-code")) + val contextTags = config.getConfig("tracing.tags.from-context").pairs.map { + case (tagName, mode) => (tagName, TagMode.from(mode)) + } + + val unhandledOperationName = config.getString("tracing.operations.unhandled") + val operationMappings = config.getConfig("tracing.operations.mappings").pairs.map { + case (pattern, operationName) => (new GlobPathFilter(pattern), operationName) + } + + Settings( + enablePropagation, + propagationChannel, + enableServerMetrics, + serverMetricsTags, + enableTracing, + traceIdTag, + enableSpanMetrics, + urlTagMode, + methodTagMode, + statusCodeTagMode, + contextTags, + unhandledOperationName, + operationMappings + ) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala index 7d707c9f..7439520c 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala @@ -30,24 +30,24 @@ object SpanCodec { class B3 extends HttpPropagation.EntryReader with HttpPropagation.EntryWriter { import B3.Headers - override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { + override def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context = { val identityProvider = Kamon.tracer.identityProvider - val traceID = reader.read(Headers.TraceIdentifier) + val traceID = reader.readHeader(Headers.TraceIdentifier) .map(id => identityProvider.traceIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) - val spanID = reader.read(Headers.SpanIdentifier) + val spanID = reader.readHeader(Headers.SpanIdentifier) .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) { - val parentID = reader.read(Headers.ParentSpanIdentifier) + val parentID = reader.readHeader(Headers.ParentSpanIdentifier) .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) - val flags = reader.read(Headers.Flags) + val flags = reader.readHeader(Headers.Flags) - val samplingDecision = flags.orElse(reader.read(Headers.Sampled)) match { + val samplingDecision = flags.orElse(reader.readHeader(Headers.Sampled)) match { case Some(sampled) if sampled == "1" => SamplingDecision.Sample case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample case _ => SamplingDecision.Unknown @@ -59,19 +59,19 @@ object SpanCodec { } - override def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = { + override def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = { val span = context.get(Span.ContextKey) if(span.nonEmpty()) { val spanContext = span.context() - writer.write(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) - writer.write(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) + writer.writeHeader(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) + writer.writeHeader(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) if(spanContext.parentID != IdentityProvider.NoIdentifier) - writer.write(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) + writer.writeHeader(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision => - writer.write(Headers.Sampled, samplingDecision) + writer.writeHeader(Headers.Sampled, samplingDecision) } } } -- cgit v1.2.3