From c487c51a54e67944c80cf2aecc63ac8158bf99a6 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 3 Sep 2018 15:37:14 +0200 Subject: wip on the HttpServer instrumentation --- .../src/main/scala/kamon/context/Context.scala | 2 +- .../main/scala/kamon/context/HttpPropagation.scala | 28 +-- .../scala/kamon/instrumentation/HttpMessage.scala | 27 +++ .../scala/kamon/instrumentation/HttpServer.scala | 192 +++++++++++++++++++++ .../src/main/scala/kamon/trace/SpanCodec.scala | 22 +-- 5 files changed, 245 insertions(+), 26 deletions(-) create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala create mode 100644 kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala (limited to 'kamon-core/src/main/scala') diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala index 1eed7e14..4d3501db 100644 --- a/kamon-core/src/main/scala/kamon/context/Context.scala +++ b/kamon-core/src/main/scala/kamon/context/Context.scala @@ -19,7 +19,7 @@ package context import java.util.{Map => JavaMap} import scala.collection.JavaConverters._ -class Context private (private[context] val entries: Map[Context.Key[_], Any], private[context] val tags: Map[String, String]) { +class Context private (private[context] val entries: Map[Context.Key[_], Any], val tags: Map[String, String]) { def get[T](key: Context.Key[T]): T = entries.getOrElse(key, key.emptyValue).asInstanceOf[T] diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala index 5b0bdb38..d225aa28 100644 --- a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala +++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala @@ -25,7 +25,7 @@ trait HttpPropagation { * @return The decoded Context instance. If no entries or tags could be read from the HTTP message then an * empty context is returned instead. */ - def read(reader: HttpPropagation.HeaderReader): Context + def readContext(reader: HttpPropagation.HeaderReader): Context /** * Writes the tags and entries from the supplied context using the supplied [[HttpPropagation.HeaderWriter]] @@ -38,7 +38,7 @@ trait HttpPropagation { * @param writer Wrapper on the HTTP message that will carry the context headers. * @param direction Write direction. It can be either Outgoing or Returning. */ - def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit + def writeContext(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit } @@ -59,7 +59,7 @@ object HttpPropagation { * @param context Current context. * @return Either the original context passed in or a modified version of it, including the read entry. */ - def read(reader: HttpPropagation.HeaderReader, context: Context): Context + def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context } /** @@ -75,7 +75,7 @@ object HttpPropagation { * @param writer Wrapper on the HTTP message that will carry the context headers. * @param direction Write direction. It can be either Outgoing or Returning. */ - def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit + def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit } @@ -90,7 +90,7 @@ object HttpPropagation { * @param header HTTP header name * @return The HTTP header value, if present. */ - def read(header: String): Option[String] + def readHeader(header: String): Option[String] } /** @@ -104,7 +104,7 @@ object HttpPropagation { * @param header HTTP header name. * @param value HTTP header value. */ - def write(header: String, value: String): Unit + def writeHeader(header: String, value: String): Unit } @@ -131,12 +131,12 @@ object HttpPropagation { * of a tag key clash. * - Read all context entries using the incoming entries configuration. */ - override def read(reader: HeaderReader): Context = { + override def readContext(reader: HeaderReader): Context = { val tags = Map.newBuilder[String, String] // Tags encoded together in the context tags header. try { - reader.read(components.tagsHeaderName).foreach { contextTagsHeader => + reader.readHeader(components.tagsHeaderName).foreach { contextTagsHeader => contextTagsHeader.split(";").foreach(tagData => { val tagPair = tagData.split("=") if (tagPair.length == 2) { @@ -152,7 +152,7 @@ object HttpPropagation { components.tagsMappings.foreach { case (tagName, httpHeader) => try { - reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue)) + reader.readHeader(httpHeader).foreach(tagValue => tags += (tagName -> tagValue)) } catch { case t: Throwable => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any]) } @@ -163,7 +163,7 @@ object HttpPropagation { case (context, (entryName, entryDecoder)) => var result = context try { - result = entryDecoder.read(reader, context) + result = entryDecoder.readEntry(reader, context) } catch { case t: Throwable => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any]) } @@ -175,7 +175,7 @@ object HttpPropagation { /** * Writes context tags and entries */ - override def write(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = { + override def writeContext(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = { val keys = direction match { case Direction.Outgoing => components.outgoingEntries case Direction.Returning => components.returningEntries @@ -193,21 +193,21 @@ object HttpPropagation { // Write tags with specific mappings or append them to the context tags header. context.tags.foreach { case (tagKey, tagValue) => components.tagsMappings.get(tagKey) match { - case Some(mappedHeader) => writer.write(mappedHeader, tagValue) + case Some(mappedHeader) => writer.writeHeader(mappedHeader, tagValue) case None => appendTag(tagKey, tagValue) } } // Write the context tags header. if(contextTagsHeader.nonEmpty) { - writer.write(components.tagsHeaderName, contextTagsHeader.result()) + writer.writeHeader(components.tagsHeaderName, contextTagsHeader.result()) } // Write entries for the specified direction. keys.foreach { case (entryName, entryWriter) => try { - entryWriter.write(context, writer, direction) + entryWriter.writeEntry(context, writer, direction) } catch { case t: Throwable => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any]) } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala new file mode 100644 index 00000000..b0300546 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala @@ -0,0 +1,27 @@ +package kamon.instrumentation + +import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter} + + +trait HttpRequest extends HeaderReader { + def url: String + def path: String + def method: String +} + +object HttpRequest { + trait Writable[T] extends HttpRequest with HeaderWriter { + def build(): T + } +} + +trait HttpResponse { + def statusCode: Int +} + +object HttpResponse { + trait Writable[T] extends HttpResponse with HeaderWriter { + def build(): T + } +} + diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala new file mode 100644 index 00000000..10dccdbc --- /dev/null +++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala @@ -0,0 +1,192 @@ +package kamon +package instrumentation + +import com.typesafe.config.Config +import kamon.context.Context +import kamon.context.HttpPropagation.Direction +import kamon.instrumentation.HttpServer.Settings.TagMode +import kamon.trace.Span +import kamon.util.GlobPathFilter + +import scala.collection.JavaConverters._ + +trait HttpServer { + + def handle(request: HttpRequest): HttpServer.Handler + +} + +object HttpServer { + + trait Handler { + + def context: Context + + def span: Span + + def finishRequest(): Unit + + def startResponse[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse + + def endResponse(): Unit + } + + def from(name: String, port: Int, component: String): HttpServer = { + from(name, port, component, Kamon, Kamon) + } + + def from(name: String, port: Int, component: String, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = { + val configKey = "kamon.instrumentation.http-server." + name + new HttpServer.Default(Settings.from(configuration.config().getConfig(configKey)), contextPropagation, port, component) + } + + + class Default(settings: Settings, contextPropagation: ContextPropagation, port: Int, component: String) extends HttpServer { + private val _propagation = contextPropagation.httpPropagation(settings.propagationChannel) + .getOrElse(sys.error(s"Could not find HTTP propagation [${settings.propagationChannel}")) + + override def handle(request: HttpRequest): Handler = { + val incomingContext = if(settings.enableContextPropagation) + _propagation.readContext(request) + else Context.Empty + + val requestSpan = if(settings.enableTracing) + buildServerSpan(incomingContext, request) + else Span.Empty + + val handlerContext = if(requestSpan.nonEmpty()) + incomingContext.withKey(Span.ContextKey, requestSpan) + else incomingContext + + // TODO: Handle HTTP Server Metrics + + + new HttpServer.Handler { + override def context: Context = + handlerContext + + override def span: Span = + requestSpan + + override def finishRequest(): Unit = {} + + override def startResponse[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse = { + if(settings.enableContextPropagation) { + _propagation.writeContext(context, response, Direction.Returning) + } + + response.build() + } + + override def endResponse(): Unit = { + span.finish() + } + } + } + + private def buildServerSpan(context: Context, request: HttpRequest): Span = { + val span = Kamon.buildSpan(operationName(request)) + .withMetricTag("span.kind", "server") + .withMetricTag("component", component) + + def addTag(tag: String, value: String, mode: TagMode): Unit = mode match { + case TagMode.Metric => span.withMetricTag(tag, value) + case TagMode.Span => span.withTag(tag, value) + case TagMode.Off => + } + + addTag("http.url", request.url, settings.urlTagMode) + addTag("http.method", request.method, settings.urlTagMode) + settings.contextTags.foreach { + case (tagName, mode) => context.getTag(tagName).foreach(tagValue => addTag(tagName, tagValue, mode)) + } + + span.start() + } + + private def operationName(request: HttpRequest): String = { + val requestPath = request.path + val customMapping = settings.operationMappings.find { + case (pattern, _) => pattern.accept(requestPath) + }.map(_._2) + + customMapping.getOrElse("http.request") + } + } + + + case class Settings( + enableContextPropagation: Boolean, + propagationChannel: String, + enableServerMetrics: Boolean, + serverMetricsTags: Seq[String], + enableTracing: Boolean, + traceIDTag: Option[String], + enableSpanMetrics: Boolean, + urlTagMode: TagMode, + methodTagMode: TagMode, + statusCodeTagMode: TagMode, + contextTags: Map[String, TagMode], + unhandledOperationName: String, + operationMappings: Map[GlobPathFilter, String] + ) + + object Settings { + + sealed trait TagMode + object TagMode { + case object Metric extends TagMode + case object Span extends TagMode + case object Off extends TagMode + + def from(value: String): TagMode = value.toLowerCase match { + case "metric" => TagMode.Metric + case "span" => TagMode.Span + case _ => TagMode.Off + } + } + + def from(config: Config): Settings = { + + // Context propagation settings + val enablePropagation = config.getBoolean("propagation.enabled") + val propagationChannel = config.getString("propagation.channel") + + // HTTP Server metrics settings + val enableServerMetrics = config.getBoolean("metrics.enabled") + val serverMetricsTags = config.getStringList("metrics.tags").asScala + + // Tracing settings + val enableTracing = config.getBoolean("tracing.enabled") + val traceIdTag = Option(config.getString("tracing.trace-id-tag")).filterNot(_ == "none") + val enableSpanMetrics = config.getBoolean("tracing.span-metrics") + val urlTagMode = TagMode.from(config.getString("tracing.tags.url")) + val methodTagMode = TagMode.from(config.getString("tracing.tags.method")) + val statusCodeTagMode = TagMode.from(config.getString("tracing.tags.status-code")) + val contextTags = config.getConfig("tracing.tags.from-context").pairs.map { + case (tagName, mode) => (tagName, TagMode.from(mode)) + } + + val unhandledOperationName = config.getString("tracing.operations.unhandled") + val operationMappings = config.getConfig("tracing.operations.mappings").pairs.map { + case (pattern, operationName) => (new GlobPathFilter(pattern), operationName) + } + + Settings( + enablePropagation, + propagationChannel, + enableServerMetrics, + serverMetricsTags, + enableTracing, + traceIdTag, + enableSpanMetrics, + urlTagMode, + methodTagMode, + statusCodeTagMode, + contextTags, + unhandledOperationName, + operationMappings + ) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala index 7d707c9f..7439520c 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala @@ -30,24 +30,24 @@ object SpanCodec { class B3 extends HttpPropagation.EntryReader with HttpPropagation.EntryWriter { import B3.Headers - override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { + override def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context = { val identityProvider = Kamon.tracer.identityProvider - val traceID = reader.read(Headers.TraceIdentifier) + val traceID = reader.readHeader(Headers.TraceIdentifier) .map(id => identityProvider.traceIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) - val spanID = reader.read(Headers.SpanIdentifier) + val spanID = reader.readHeader(Headers.SpanIdentifier) .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) { - val parentID = reader.read(Headers.ParentSpanIdentifier) + val parentID = reader.readHeader(Headers.ParentSpanIdentifier) .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) - val flags = reader.read(Headers.Flags) + val flags = reader.readHeader(Headers.Flags) - val samplingDecision = flags.orElse(reader.read(Headers.Sampled)) match { + val samplingDecision = flags.orElse(reader.readHeader(Headers.Sampled)) match { case Some(sampled) if sampled == "1" => SamplingDecision.Sample case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample case _ => SamplingDecision.Unknown @@ -59,19 +59,19 @@ object SpanCodec { } - override def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = { + override def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = { val span = context.get(Span.ContextKey) if(span.nonEmpty()) { val spanContext = span.context() - writer.write(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) - writer.write(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) + writer.writeHeader(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) + writer.writeHeader(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) if(spanContext.parentID != IdentityProvider.NoIdentifier) - writer.write(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) + writer.writeHeader(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision => - writer.write(Headers.Sampled, samplingDecision) + writer.writeHeader(Headers.Sampled, samplingDecision) } } } -- cgit v1.2.3