diff options
6 files changed, 232 insertions, 139 deletions
diff --git a/kamon-core-tests/src/test/resources/reference.conf b/kamon-core-tests/src/test/resources/reference.conf index d165249e..6615bb7b 100644 --- a/kamon-core-tests/src/test/resources/reference.conf +++ b/kamon-core-tests/src/test/resources/reference.conf @@ -7,117 +7,31 @@ kamon { kamon { - instrumentation { - http-server { - noop { - - # - # Configuration for HTTP context propagation - # - propagation { - - # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if - # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can - # be created and reported but will not be linked across boundaries nor take trace identifiers from tags). - enabled = no - - # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default - # configuration for more details on how to configure the detault HTTP context propagation. - channel = "default" - } - - - # - # Configuration for HTTP server metrics collection - # - metrics { - - # Enables collection of HTTP server metrics - enabled = no - - # Tags to include on the HTTP server metrics. The available options are: - # - method: HTTP method from the request. - # - status-code: HTTP status code from the responses. - # - tags = [ - "method", - "status-code" - ] - } - - - # - # Configuration for HTTP request tracing - # - tracing { - - # Enables HTTP request tracing. When enabled the instrumentation will create Spans for incoming requests - # and finish them when the response is sent back to the clients. - enabled = no - - # Select a context tag that provides a custom trace identifier. The custom trace identifier will be used - # only if all these conditions are met: - # - the context tag is present. - # - there is no parent Span on the incoming context (i.e. this is the first service on the trace). - # - the identifier is valued in accordance to the identity provider. - trace-id-tag = "none" - # Enables collection of span metrics using the `span.processing-time` metric. - span-metrics = on - - # Select which tags should be included as span and span metric tags. The possible options are: - # - span: the tag is added as a Span tag (i.e. using span.tag(...)) - # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...)) - # - off: the tag is not used. - # - tags { - - # Use the http.url tag. - url = span - - # Use the http.method tag. - method = metric - - # Use the http.status_code tag. - status-code = metric - - # Copy tags from the context into the Spans with the specified purpouse. - from-context { - - # The peer tag identifiest the service that is calling the current service. It is added by default with - # the HttpClient instrumentation. - peer = metric - } - } + trace { + sampler = always + } - # Custom mappings between routes and operation names. - operations { + propagation.http.default { + tags.mappings { + "correlation-id" = "x-correlation-id" + } + } - # Operation name for Spans created on requests that could not be handled by any route in the current - # application. - unhandled = "unhandled" + instrumentation { + http-server { + default { + tracing.trace-id-tag = "correlation-id" + } - # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode - # instrumentation is not able to provide a sensible operation name that is free of high cardinality values. - # For example, with the following configuration: - # mappings { - # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile" - # "/events/*/rsvps" = "EventRSVPs" - # } - # - # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have - # the same operation name "/organization/:orgID/user/:userID/profile". - # - # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation - # name "EventRSVPs". - # - # The patterns are expressed as globs and the operation names are free form. - # - mappings { + no-span-metrics { + tracing.span-metrics = off + } - } - } - } + noop { + propagation.enabled = no + metrics.enabled = no + tracing.enabled = no } } } diff --git a/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala index a3662127..e31382a5 100644 --- a/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala @@ -11,8 +11,7 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp "the HTTP server instrumentation" when { "configured for context propagation" should { "read context entries and tags from the incoming request" in { - val httpServer = HttpServer.from("default", port = 8080, "http.server") - val handler = httpServer.handle(fakeRequest("http://localhost:8080/", "/", "GET", Map( + val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( "context-tags" -> "tag=value;none=0011223344556677;", "custom-trace-id" -> "0011223344556677" ))) @@ -24,8 +23,7 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp } "use the configured HTTP propagation channel" in { - val httpServer = HttpServer.from("default", port = 8080, "http.server") - val handler = httpServer.handle(fakeRequest("http://localhost:8080/", "/", "GET", Map( + val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( "context-tags" -> "tag=value;none=0011223344556677;", "custom-trace-id" -> "0011223344556677" ))) @@ -41,15 +39,63 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp span.tag("http.url").value shouldBe "http://localhost:8080/" val responseHeaders = mutable.Map.empty[String, String] - handler.startResponse(fakeResponse(200, responseHeaders), handler.context.withTag("hello", "world")) + handler.send(fakeResponse(200, responseHeaders), handler.context.withTag("hello", "world")) } } - "when all capabilities are disabled" should { + "configured for distributed tracing" should { + "create a span representing the current HTTP operation" in { + val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)) + handler.send(fakeResponse(200, mutable.Map.empty), handler.context) + + val span = inspect(handler.span) + span.tag("http.method").value shouldBe "GET" + span.tag("http.url").value shouldBe "http://localhost:8080/" + span.tag("http.status_code").value shouldBe "200" + } + + "adopt a traceID when explicitly provided" in { + val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( + "context-tags" -> "tag=value;none=0011223344556677;", + "x-correlation-id" -> "0011223344556677" + ))) + + handler.span.context().traceID.string shouldBe "0011223344556677" + } + + "record span metrics when enabled" in { + val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)) + handler.send(fakeResponse(200, mutable.Map.empty), handler.context) + + val span = inspect(handler.span) + span.hasMetricsEnabled() shouldBe true + } + + "not record span metrics when disabled" in { + val handler = HttpServer.from("no-span-metrics", port = 8081, "http.server") + .receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)) + handler.send(fakeResponse(200, mutable.Map.empty), handler.context) + + val span = inspect(handler.span) + span.hasMetricsEnabled() shouldBe false + } + + "receive tags from context when available" in { + val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( + "context-tags" -> "tag=value;none=0011223344556677;peer=superservice;", + "custom-trace-id" -> "0011223344556677" + ))) + + val span = inspect(handler.span) + span.tag("peer").value shouldBe "superservice" + } + } + + "all capabilities are disabled" should { "not read any context from the incoming requests" in { val httpServer = HttpServer.from("noop", port = 8081, "http.server") - val handler = httpServer.handle(fakeRequest("http://localhost:8080/", "/", "GET", Map( + val handler = httpServer.receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( "context-tags" -> "tag=value;none=0011223344556677;", "custom-trace-id" -> "0011223344556677" ))) @@ -59,7 +105,7 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp "not create any span to represent the server request" in { val httpServer = HttpServer.from("noop", port = 8081, "http.server") - val handler = httpServer.handle(fakeRequest("http://localhost:8080/", "/", "GET", Map( + val handler = httpServer.receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( "context-tags" -> "tag=value;none=0011223344556677;", "custom-trace-id" -> "0011223344556677" ))) @@ -71,6 +117,9 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp } } + def httpServer(): HttpServer = HttpServer.from("default", port = 8081, "http.server") + def noopHttpServer(): HttpServer = HttpServer.from("noop", port = 8081, "http.server") + def fakeRequest(requestUrl: String, requestPath: String, requestMethod: String, headers: Map[String, String]): HttpRequest = new HttpRequest { override def url: String = requestUrl diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 8fb2abcc..5984f3c4 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -301,7 +301,7 @@ kamon { # only if all these conditions are met: # - the context tag is present. # - there is no parent Span on the incoming context (i.e. this is the first service on the trace). - # - the identifier is valued in accordance to the identity provider. + # - the identifier is valid in accordance to the identity provider. trace-id-tag = "none" # Enables collection of span metrics using the `span.processing-time` metric. diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala index 10dccdbc..88803bad 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala @@ -1,51 +1,142 @@ package kamon package instrumentation +import java.time.Duration + import com.typesafe.config.Config import kamon.context.Context import kamon.context.HttpPropagation.Direction import kamon.instrumentation.HttpServer.Settings.TagMode -import kamon.trace.Span +import kamon.trace.{IdentityProvider, Span} import kamon.util.GlobPathFilter +import org.slf4j.LoggerFactory import scala.collection.JavaConverters._ +/** + * HTTP Server instrumentation handler that takes care of context propagation, distributed tracing and HTTP server + * metrics. Instances can be created by using the [[HttpServer.from]] method with the desired configuration name. All + * configuration for the default HTTP instrumentation is at "kamon.instrumentation.http-server.default". + * + * The default implementation shipping with Kamon provides: + * + * - Context Propagation: Incoming and Returning context propagation as well as incoming context tags. Context + * propagation is further used to enable distributed tracing on top of any instrumented HTTP Server. + * - Distributed Tracing: Automatically join traces initiated by the callers of this service and apply span and metric + * tags from the incoming requests as well as form the incoming context tags. + * - Server Metrics: Basic request processing metrics to understand connection usage, throughput and response code + * counts in the HTTP server. + * + */ trait HttpServer { - def handle(request: HttpRequest): HttpServer.Handler + /** + * Initiate handling of a HTTP request received by this server. The returned RequestHandler contains the Span that + * represents the processing of the incoming HTTP request (if tracing is enabled) and the Context extracted from + * HTTP headers (if context propagation is enabled). + * + * Callers of this method **must** always ensure that the doneReceiving, send and doneSending callbacks are invoked + * for all incoming requests. + * + * @param request A HttpRequest wrapper on the original incoming HTTP request. + * @return The RequestHandler that will follow the lifecycle of the incoming request. + */ + def receive(request: HttpRequest): HttpServer.RequestHandler + + + /** + * Signals that a new HTTP connection has been opened. + */ + def openConnection(): Unit + + + /** + * Signals that a HTTP connection has been closed. If the connection lifetime or the number of handled requests + * cannot be determined the the values [[Duration.ZERO]] and zero can be provided, respectively. No metrics will + * be updated when the values are zero. + * + * @param lifetime For how long did the connection remain open. + * @param handledRequests How many requests where handled by the closed connection. + */ + def closeConnection(lifetime: Duration, handledRequests: Long): Unit } object HttpServer { - trait Handler { - + /** + * Handler associated to the processing of a single request. The instrumentation code using this class is responsible + * of creating a dedicated [[HttpServer.RequestHandler]] instance for each received request should invoking the + * doneReceiving, send and doneSending callbacks when appropriate. + */ + trait RequestHandler { + + /** + * If context propagation is enabled this function returns the incoming context associated wih this request, + * otherwise [[Context.Empty]] is returned. + */ def context: Context + /** + * Span representing the current HTTP server operation. If tracing is disabled this will return an empty span. + */ def span: Span - def finishRequest(): Unit + /** + * Signals that the entire request (headers and body) has been received. + * + * @param receivedBytes Size of the entire HTTP request. + */ + def doneReceiving(receivedBytes: Long): Unit + + /** + * Process a response to be sent back to the client. Since returning keys might need to included in the response + * headers, users of this class must ensure that the returned HttpResponse is used instead of the original one + * passed into this function. + * + * @param response Wraps the HTTP response to be sent back to the client. + * @param context Context that should be used for writing returning keys into the response. + * @return The modified HTTP response that should be sent to clients. + */ + def send[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse + + /** + * Signals that the entire response (headers and body) has been sent to the client. + * + * @param sentBytes Size of the entire HTTP response. + */ + def doneSending(sentBytes: Long): Unit - def startResponse[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse - - def endResponse(): Unit } + def from(name: String, port: Int, component: String): HttpServer = { from(name, port, component, Kamon, Kamon) } def from(name: String, port: Int, component: String, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = { - val configKey = "kamon.instrumentation.http-server." + name - new HttpServer.Default(Settings.from(configuration.config().getConfig(configKey)), contextPropagation, port, component) + val defaultConfiguration = configuration.config().getConfig(DefaultHttpServerConfiguration) + val configWithFallback = if(name == DefaultHttpServer) defaultConfiguration else { + configuration.config().getConfig(HttpServerConfigurationPrefix + "." + name).withFallback(defaultConfiguration) + } + + new HttpServer.Default(Settings.from(configWithFallback), contextPropagation, port, component) } + val HttpServerConfigurationPrefix = "kamon.instrumentation.http-server" + val DefaultHttpServer = "default" + val DefaultHttpServerConfiguration = s"$HttpServerConfigurationPrefix.default" - class Default(settings: Settings, contextPropagation: ContextPropagation, port: Int, component: String) extends HttpServer { + + private class Default(settings: Settings, contextPropagation: ContextPropagation, port: Int, component: String) extends HttpServer { + private val _log = LoggerFactory.getLogger(classOf[Default]) private val _propagation = contextPropagation.httpPropagation(settings.propagationChannel) - .getOrElse(sys.error(s"Could not find HTTP propagation [${settings.propagationChannel}")) + .getOrElse { + _log.warn(s"Could not find HTTP propagation [${settings.propagationChannel}], falling back to the default HTTP propagation") + contextPropagation.defaultHttpPropagation() + } - override def handle(request: HttpRequest): Handler = { + override def receive(request: HttpRequest): RequestHandler = { val incomingContext = if(settings.enableContextPropagation) _propagation.readContext(request) else Context.Empty @@ -61,44 +152,65 @@ object HttpServer { // TODO: Handle HTTP Server Metrics - new HttpServer.Handler { + new HttpServer.RequestHandler { override def context: Context = handlerContext override def span: Span = requestSpan - override def finishRequest(): Unit = {} + override def doneReceiving(receivedBytes: Long): Unit = {} + + override def send[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse = { + def addResponseTag(tag: String, value: String, mode: TagMode): Unit = mode match { + case TagMode.Metric => span.tagMetric(tag, value) + case TagMode.Span => span.tag(tag, value) + case TagMode.Off => + } - override def startResponse[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse = { if(settings.enableContextPropagation) { _propagation.writeContext(context, response, Direction.Returning) } + addResponseTag("http.status_code", response.statusCode.toString, settings.statusCodeTagMode) response.build() } - override def endResponse(): Unit = { + override def doneSending(sentBytes: Long): Unit = { span.finish() } } } + override def openConnection(): Unit = ??? + + override def closeConnection(lifetime: Duration, handledRequests: Long): Unit = ??? + private def buildServerSpan(context: Context, request: HttpRequest): Span = { val span = Kamon.buildSpan(operationName(request)) .withMetricTag("span.kind", "server") .withMetricTag("component", component) - def addTag(tag: String, value: String, mode: TagMode): Unit = mode match { + if(!settings.enableSpanMetrics) + span.disableMetrics() + + + for { traceIdTag <- settings.traceIDTag; customTraceID <- context.getTag(traceIdTag) } { + val identifier = Kamon.identityProvider.traceIdGenerator().from(customTraceID) + if(identifier != IdentityProvider.NoIdentifier) + span.withTraceID(identifier) + } + + def addRequestTag(tag: String, value: String, mode: TagMode): Unit = mode match { case TagMode.Metric => span.withMetricTag(tag, value) case TagMode.Span => span.withTag(tag, value) case TagMode.Off => } - addTag("http.url", request.url, settings.urlTagMode) - addTag("http.method", request.method, settings.urlTagMode) + addRequestTag("http.url", request.url, settings.urlTagMode) + addRequestTag("http.method", request.method, settings.urlTagMode) settings.contextTags.foreach { - case (tagName, mode) => context.getTag(tagName).foreach(tagValue => addTag(tagName, tagValue, mode)) + case (tagName, mode) => context.getTag(tagName).foreach(tagValue => addRequestTag(tagName, tagValue, mode)) } span.start() diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index 3e857f00..ad7ffbed 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -99,6 +99,7 @@ object Tracer { private var initialMetricTags = Map.empty[String, String] private var useParentFromContext = true private var trackMetrics = true + private var providedTraceID = IdentityProvider.NoIdentifier def asChildOf(parent: Span): SpanBuilder = { if(parent != Span.Empty) this.parentSpan = parent @@ -158,6 +159,11 @@ object Tracer { this } + def withTraceID(identifier: IdentityProvider.Identifier): SpanBuilder = { + this.providedTraceID = identifier + this + } + def start(): Span = { val spanFrom = if(from == Instant.EPOCH) clock.instant() else from @@ -199,13 +205,21 @@ object Tracer { else parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision) - private def newSpanContext(samplingDecision: SamplingDecision): SpanContext = + private def newSpanContext(samplingDecision: SamplingDecision): SpanContext = { + val traceID = + if(providedTraceID != IdentityProvider.NoIdentifier) + providedTraceID + else + tracer._identityProvider.traceIdGenerator().generate() + + SpanContext( - traceID = tracer._identityProvider.traceIdGenerator().generate(), + traceID, spanID = tracer._identityProvider.spanIdGenerator().generate(), parentID = IdentityProvider.NoIdentifier, samplingDecision = samplingDecision ) + } } private final class TracerMetrics(metricLookup: MetricLookup) { diff --git a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala index 36b57f2d..e45124b0 100644 --- a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala +++ b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala @@ -52,13 +52,14 @@ object SpanInspection { def spanTags(): Map[String, Span.TagValue] = spanData.tags - def tag(key: String): Option[String] = + def tag(key: String): Option[String] = { spanTag(key).map { case TagValue.String(string) => string case TagValue.Number(number) => number.toString case TagValue.True => "true" case TagValue.False => "false" } + } def metricTags(): Map[String, String] = getField[Span.Local, Map[String, String]](realSpan, "customMetricTags") @@ -75,6 +76,9 @@ object SpanInspection { def operationName(): String = spanData.operationName + def hasMetricsEnabled(): Boolean = + getField[Span.Local, Boolean](realSpan, "collectMetrics") + private def getField[T, R](target: Any, fieldName: String)(implicit classTag: ClassTag[T]): R = { val toFinishedSpanMethod = classTag.runtimeClass.getDeclaredFields.find(_.getName.contains(fieldName)).get |