From ac43c0476c239a9cf1c20e838b0fd212b20161e1 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Tue, 4 Sep 2018 18:32:36 +0200 Subject: basic testing for the HTTP server instrumentation --- .../scala/kamon/instrumentation/HttpServer.scala | 154 ++++++++++++++++++--- 1 file changed, 133 insertions(+), 21 deletions(-) (limited to 'kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala') diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala index 10dccdbc..88803bad 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala @@ -1,51 +1,142 @@ package kamon package instrumentation +import java.time.Duration + import com.typesafe.config.Config import kamon.context.Context import kamon.context.HttpPropagation.Direction import kamon.instrumentation.HttpServer.Settings.TagMode -import kamon.trace.Span +import kamon.trace.{IdentityProvider, Span} import kamon.util.GlobPathFilter +import org.slf4j.LoggerFactory import scala.collection.JavaConverters._ +/** + * HTTP Server instrumentation handler that takes care of context propagation, distributed tracing and HTTP server + * metrics. Instances can be created by using the [[HttpServer.from]] method with the desired configuration name. All + * configuration for the default HTTP instrumentation is at "kamon.instrumentation.http-server.default". + * + * The default implementation shipping with Kamon provides: + * + * - Context Propagation: Incoming and Returning context propagation as well as incoming context tags. Context + * propagation is further used to enable distributed tracing on top of any instrumented HTTP Server. + * - Distributed Tracing: Automatically join traces initiated by the callers of this service and apply span and metric + * tags from the incoming requests as well as form the incoming context tags. + * - Server Metrics: Basic request processing metrics to understand connection usage, throughput and response code + * counts in the HTTP server. + * + */ trait HttpServer { - def handle(request: HttpRequest): HttpServer.Handler + /** + * Initiate handling of a HTTP request received by this server. The returned RequestHandler contains the Span that + * represents the processing of the incoming HTTP request (if tracing is enabled) and the Context extracted from + * HTTP headers (if context propagation is enabled). + * + * Callers of this method **must** always ensure that the doneReceiving, send and doneSending callbacks are invoked + * for all incoming requests. + * + * @param request A HttpRequest wrapper on the original incoming HTTP request. + * @return The RequestHandler that will follow the lifecycle of the incoming request. + */ + def receive(request: HttpRequest): HttpServer.RequestHandler + + + /** + * Signals that a new HTTP connection has been opened. + */ + def openConnection(): Unit + + + /** + * Signals that a HTTP connection has been closed. If the connection lifetime or the number of handled requests + * cannot be determined the the values [[Duration.ZERO]] and zero can be provided, respectively. No metrics will + * be updated when the values are zero. + * + * @param lifetime For how long did the connection remain open. + * @param handledRequests How many requests where handled by the closed connection. + */ + def closeConnection(lifetime: Duration, handledRequests: Long): Unit } object HttpServer { - trait Handler { - + /** + * Handler associated to the processing of a single request. The instrumentation code using this class is responsible + * of creating a dedicated [[HttpServer.RequestHandler]] instance for each received request should invoking the + * doneReceiving, send and doneSending callbacks when appropriate. + */ + trait RequestHandler { + + /** + * If context propagation is enabled this function returns the incoming context associated wih this request, + * otherwise [[Context.Empty]] is returned. + */ def context: Context + /** + * Span representing the current HTTP server operation. If tracing is disabled this will return an empty span. + */ def span: Span - def finishRequest(): Unit + /** + * Signals that the entire request (headers and body) has been received. + * + * @param receivedBytes Size of the entire HTTP request. + */ + def doneReceiving(receivedBytes: Long): Unit + + /** + * Process a response to be sent back to the client. Since returning keys might need to included in the response + * headers, users of this class must ensure that the returned HttpResponse is used instead of the original one + * passed into this function. + * + * @param response Wraps the HTTP response to be sent back to the client. + * @param context Context that should be used for writing returning keys into the response. + * @return The modified HTTP response that should be sent to clients. + */ + def send[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse + + /** + * Signals that the entire response (headers and body) has been sent to the client. + * + * @param sentBytes Size of the entire HTTP response. + */ + def doneSending(sentBytes: Long): Unit - def startResponse[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse - - def endResponse(): Unit } + def from(name: String, port: Int, component: String): HttpServer = { from(name, port, component, Kamon, Kamon) } def from(name: String, port: Int, component: String, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = { - val configKey = "kamon.instrumentation.http-server." + name - new HttpServer.Default(Settings.from(configuration.config().getConfig(configKey)), contextPropagation, port, component) + val defaultConfiguration = configuration.config().getConfig(DefaultHttpServerConfiguration) + val configWithFallback = if(name == DefaultHttpServer) defaultConfiguration else { + configuration.config().getConfig(HttpServerConfigurationPrefix + "." + name).withFallback(defaultConfiguration) + } + + new HttpServer.Default(Settings.from(configWithFallback), contextPropagation, port, component) } + val HttpServerConfigurationPrefix = "kamon.instrumentation.http-server" + val DefaultHttpServer = "default" + val DefaultHttpServerConfiguration = s"$HttpServerConfigurationPrefix.default" - class Default(settings: Settings, contextPropagation: ContextPropagation, port: Int, component: String) extends HttpServer { + + private class Default(settings: Settings, contextPropagation: ContextPropagation, port: Int, component: String) extends HttpServer { + private val _log = LoggerFactory.getLogger(classOf[Default]) private val _propagation = contextPropagation.httpPropagation(settings.propagationChannel) - .getOrElse(sys.error(s"Could not find HTTP propagation [${settings.propagationChannel}")) + .getOrElse { + _log.warn(s"Could not find HTTP propagation [${settings.propagationChannel}], falling back to the default HTTP propagation") + contextPropagation.defaultHttpPropagation() + } - override def handle(request: HttpRequest): Handler = { + override def receive(request: HttpRequest): RequestHandler = { val incomingContext = if(settings.enableContextPropagation) _propagation.readContext(request) else Context.Empty @@ -61,44 +152,65 @@ object HttpServer { // TODO: Handle HTTP Server Metrics - new HttpServer.Handler { + new HttpServer.RequestHandler { override def context: Context = handlerContext override def span: Span = requestSpan - override def finishRequest(): Unit = {} + override def doneReceiving(receivedBytes: Long): Unit = {} + + override def send[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse = { + def addResponseTag(tag: String, value: String, mode: TagMode): Unit = mode match { + case TagMode.Metric => span.tagMetric(tag, value) + case TagMode.Span => span.tag(tag, value) + case TagMode.Off => + } - override def startResponse[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse = { if(settings.enableContextPropagation) { _propagation.writeContext(context, response, Direction.Returning) } + addResponseTag("http.status_code", response.statusCode.toString, settings.statusCodeTagMode) response.build() } - override def endResponse(): Unit = { + override def doneSending(sentBytes: Long): Unit = { span.finish() } } } + override def openConnection(): Unit = ??? + + override def closeConnection(lifetime: Duration, handledRequests: Long): Unit = ??? + private def buildServerSpan(context: Context, request: HttpRequest): Span = { val span = Kamon.buildSpan(operationName(request)) .withMetricTag("span.kind", "server") .withMetricTag("component", component) - def addTag(tag: String, value: String, mode: TagMode): Unit = mode match { + if(!settings.enableSpanMetrics) + span.disableMetrics() + + + for { traceIdTag <- settings.traceIDTag; customTraceID <- context.getTag(traceIdTag) } { + val identifier = Kamon.identityProvider.traceIdGenerator().from(customTraceID) + if(identifier != IdentityProvider.NoIdentifier) + span.withTraceID(identifier) + } + + def addRequestTag(tag: String, value: String, mode: TagMode): Unit = mode match { case TagMode.Metric => span.withMetricTag(tag, value) case TagMode.Span => span.withTag(tag, value) case TagMode.Off => } - addTag("http.url", request.url, settings.urlTagMode) - addTag("http.method", request.method, settings.urlTagMode) + addRequestTag("http.url", request.url, settings.urlTagMode) + addRequestTag("http.method", request.method, settings.urlTagMode) settings.contextTags.foreach { - case (tagName, mode) => context.getTag(tagName).foreach(tagValue => addTag(tagName, tagValue, mode)) + case (tagName, mode) => context.getTag(tagName).foreach(tagValue => addRequestTag(tagName, tagValue, mode)) } span.start() -- cgit v1.2.3