package kamon package instrumentation import java.time.Duration import com.typesafe.config.Config import kamon.context.Context import kamon.instrumentation.HttpServer.Settings.TagMode import kamon.metric.MeasurementUnit.{time, information} import kamon.trace.{IdentityProvider, Span} import kamon.util.GlobPathFilter import org.slf4j.LoggerFactory import scala.collection.JavaConverters._ /** * HTTP Server instrumentation handler that takes care of context propagation, distributed tracing and HTTP server * metrics. Instances can be created by using the [[HttpServer.from]] method with the desired configuration name. All * configuration for the default HTTP instrumentation is at "kamon.instrumentation.http-server.default". * * The default implementation shipping with Kamon provides: * * - Context Propagation: Incoming and Returning context propagation as well as incoming context tags. Context * propagation is further used to enable distributed tracing on top of any instrumented HTTP Server. * - Distributed Tracing: Automatically join traces initiated by the callers of this service and apply span and metric * tags from the incoming requests as well as form the incoming context tags. * - Server Metrics: Basic request processing metrics to understand connection usage, throughput and response code * counts in the HTTP server. * */ trait HttpServer { /** * Initiate handling of a HTTP request received by this server. The returned RequestHandler contains the Span that * represents the processing of the incoming HTTP request (if tracing is enabled) and the Context extracted from * HTTP headers (if context propagation is enabled). * * Callers of this method **must** always ensure that the doneReceiving, send and doneSending callbacks are invoked * for all incoming requests. * * @param request A HttpRequest wrapper on the original incoming HTTP request. * @return The RequestHandler that will follow the lifecycle of the incoming request. */ def receive(request: HttpMessage.Request): HttpServer.RequestHandler /** * Signals that a new HTTP connection has been opened. */ def openConnection(): Unit /** * Signals that a HTTP connection has been closed. If the connection lifetime or the number of handled requests * cannot be determined the the values [[Duration.ZERO]] and zero can be provided, respectively. No metrics will * be updated when the values are zero. * * @param lifetime For how long did the connection remain open. * @param handledRequests How many requests where handled by the closed connection. */ def closeConnection(lifetime: Duration, handledRequests: Long): Unit /** * Frees resources that might have been acquired to provide the instrumentation. Behavior on HttpServer instances * after calling this function is undefined. */ def shutdown(): Unit } object HttpServer { /** * Handler associated to the processing of a single request. The instrumentation code using this class is responsible * of creating a dedicated [[HttpServer.RequestHandler]] instance for each received request should invoking the * doneReceiving, send and doneSending callbacks when appropriate. */ trait RequestHandler { /** * If context propagation is enabled this function returns the incoming context associated wih this request, * otherwise [[Context.Empty]] is returned. */ def context: Context /** * Span representing the current HTTP server operation. If tracing is disabled this will return an empty span. */ def span: Span /** * Signals that the entire request (headers and body) has been received. * * @param receivedBytes Size of the entire HTTP request. */ def doneReceiving(receivedBytes: Long): Unit /** * Process a response to be sent back to the client. Since returning keys might need to included in the response * headers, users of this class must ensure that the returned HttpResponse is used instead of the original one * passed into this function. * * @param response Wraps the HTTP response to be sent back to the client. * @param context Context that should be used for writing returning keys into the response. * @return The modified HTTP response that should be sent to clients. */ def send[HttpResponse](response: HttpMessage.ResponseBuilder[HttpResponse], context: Context): HttpResponse /** * Signals that the entire response (headers and body) has been sent to the client. * * @param sentBytes Size of the entire HTTP response. */ def doneSending(sentBytes: Long): Unit } /** * Holds all metric instruments required to record metrics from an HTTP server. * * @param interface Interface name or address where the HTTP server is listening. * @param port Port number where the HTTP server is listening. */ class Metrics(component: String, interface: String, port: Int) { import Metrics._ private val _log = LoggerFactory.getLogger(classOf[HttpServer.Metrics]) private val _statusCodeTag = "status_code" private val _serverTags = Map( "component" -> component, "interface" -> interface, "port" -> port.toString ) val requestsInformational = CompletedRequests.refine(statusCodeTag("1xx")) val requestsSuccessful = CompletedRequests.refine(statusCodeTag("2xx")) val requestsRedirection = CompletedRequests.refine(statusCodeTag("3xx")) val requestsClientError = CompletedRequests.refine(statusCodeTag("4xx")) val requestsServerError = CompletedRequests.refine(statusCodeTag("5xx")) val activeRequests = ActiveRequests.refine(_serverTags) val requestSize = RequestSize.refine(_serverTags) val responseSize = ResponseSize.refine(_serverTags) val connectionLifetime = ConnectionLifetime.refine(_serverTags) val connectionUsage = ConnectionUsage.refine(_serverTags) val openConnections = OpenConnections.refine(_serverTags) def countCompletedRequest(statusCode: Int): Unit = { if(statusCode >= 200 && statusCode <= 299) requestsSuccessful.increment() else if(statusCode >= 500 && statusCode <=599) requestsServerError.increment() else if(statusCode >= 400 && statusCode <=499) requestsClientError.increment() else if(statusCode >= 300 && statusCode <=399) requestsRedirection.increment() else if(statusCode >= 100 && statusCode <=199) requestsInformational.increment() else { _log.warn("Unknown HTTP status code {} found when recording HTTP server metrics", statusCode.toString) } } /** * Removes all registered metrics from Kamon. */ def cleanup(): Unit = { CompletedRequests.remove(statusCodeTag("1xx")) CompletedRequests.remove(statusCodeTag("2xx")) CompletedRequests.remove(statusCodeTag("3xx")) CompletedRequests.remove(statusCodeTag("4xx")) CompletedRequests.remove(statusCodeTag("5xx")) ActiveRequests.remove(_serverTags) RequestSize.remove(_serverTags) ResponseSize.remove(_serverTags) ConnectionLifetime.remove(_serverTags) ConnectionUsage.remove(_serverTags) OpenConnections.remove(_serverTags) } private def statusCodeTag(group: String): Map[String, String] = _serverTags + (_statusCodeTag -> group) } object Metrics { def of(component: String, interface: String, port: Int): Metrics = new HttpServer.Metrics(component, interface, port) /** * Number of completed requests per status code. */ val CompletedRequests = Kamon.counter("http.server.requests") /** * Number of requests being processed simultaneously at any point in time. */ val ActiveRequests = Kamon.rangeSampler("http.server.request.active") /** * Request size distribution (including headers and body) for all requests received by the server. */ val RequestSize = Kamon.histogram("http.server.request.size", information.bytes) /** * Response size distribution (including headers and body) for all responses served by the server. */ val ResponseSize = Kamon.histogram("http.server.response.size", information.bytes) /** * Tracks the time elapsed between connection creation and connection close. */ val ConnectionLifetime = Kamon.histogram("http.server.connection.lifetime", time.nanoseconds) /** * Distribution of number of requests handled per connection during their entire lifetime. */ val ConnectionUsage = Kamon.histogram("http.server.connection.usage") /** * Number of open connections. */ val OpenConnections = Kamon.rangeSampler("http.server.connection.open") } def from(name: String, component: String, interface: String, port: Int): HttpServer = { from(name, component, interface, port, Kamon, Kamon) } def from(name: String, component: String, interface: String, port: Int, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = { val defaultConfiguration = configuration.config().getConfig(DefaultHttpServerConfiguration) val configWithFallback = if(name == DefaultHttpServer) defaultConfiguration else { configuration.config().getConfig(HttpServerConfigurationPrefix + "." + name).withFallback(defaultConfiguration) } new HttpServer.Default(Settings.from(configWithFallback), contextPropagation, component, interface, port) } val HttpServerConfigurationPrefix = "kamon.instrumentation.http-server" val DefaultHttpServer = "default" val DefaultHttpServerConfiguration = s"$HttpServerConfigurationPrefix.default" private class Default(settings: Settings, contextPropagation: ContextPropagation, component: String, interface: String, port: Int) extends HttpServer { private val _metrics = if(settings.enableServerMetrics) Some(HttpServer.Metrics.of(component, interface, port)) else None private val _log = LoggerFactory.getLogger(classOf[Default]) private val _propagation = contextPropagation.httpPropagation(settings.propagationChannel) .getOrElse { _log.warn(s"Could not find HTTP propagation [${settings.propagationChannel}], falling back to the default HTTP propagation") contextPropagation.defaultHttpPropagation() } override def receive(request: HttpMessage.Request): RequestHandler = { val incomingContext = if(settings.enableContextPropagation) _propagation.read(request) else Context.Empty val requestSpan = if(settings.enableTracing) buildServerSpan(incomingContext, request) else Span.Empty val handlerContext = if(requestSpan.nonEmpty()) incomingContext.withKey(Span.ContextKey, requestSpan) else incomingContext _metrics.foreach { httpServerMetrics => httpServerMetrics.activeRequests.increment() } new HttpServer.RequestHandler { override def context: Context = handlerContext override def span: Span = requestSpan override def doneReceiving(receivedBytes: Long): Unit = { _metrics.foreach { httpServerMetrics => httpServerMetrics.requestSize.record(receivedBytes) } } override def send[HttpResponse](response: HttpMessage.ResponseBuilder[HttpResponse], context: Context): HttpResponse = { def addResponseTag(tag: String, value: String, mode: TagMode): Unit = mode match { case TagMode.Metric => span.tagMetric(tag, value) case TagMode.Span => span.tag(tag, value) case TagMode.Off => } _metrics.foreach { httpServerMetrics => httpServerMetrics.countCompletedRequest(response.statusCode) } if(span.nonEmpty()) { settings.traceIDResponseHeader.foreach(traceIDHeader => response.write(traceIDHeader, span.context().traceID.string)) settings.spanIDResponseHeader.foreach(spanIDHeader => response.write(spanIDHeader, span.context().spanID.string)) } addResponseTag("http.status_code", response.statusCode.toString, settings.statusCodeTagMode) response.build() } override def doneSending(sentBytes: Long): Unit = { _metrics.foreach { httpServerMetrics => httpServerMetrics.activeRequests.decrement() httpServerMetrics.responseSize.record(sentBytes) } span.finish() } } } override def openConnection(): Unit = { _metrics.foreach { httpServerMetrics => httpServerMetrics.openConnections.increment() } } override def closeConnection(lifetime: Duration, handledRequests: Long): Unit = { _metrics.foreach { httpServerMetrics => httpServerMetrics.openConnections.decrement() httpServerMetrics.connectionLifetime.record(lifetime.toNanos) httpServerMetrics.connectionUsage.record(handledRequests) } } override def shutdown(): Unit = { _metrics.foreach { httpServerMetrics => httpServerMetrics.cleanup() } } private def buildServerSpan(context: Context, request: HttpMessage.Request): Span = { val span = Kamon.buildSpan(operationName(request)) .withMetricTag("span.kind", "server") .withMetricTag("component", component) if(!settings.enableSpanMetrics) span.disableMetrics() for { traceIdTag <- settings.traceIDTag; customTraceID <- context.getTag(traceIdTag) } { val identifier = Kamon.identityProvider.traceIdGenerator().from(customTraceID) if(identifier != IdentityProvider.NoIdentifier) span.withTraceID(identifier) } def addRequestTag(tag: String, value: String, mode: TagMode): Unit = mode match { case TagMode.Metric => span.withMetricTag(tag, value) case TagMode.Span => span.withTag(tag, value) case TagMode.Off => } addRequestTag("http.url", request.url, settings.urlTagMode) addRequestTag("http.method", request.method, settings.urlTagMode) settings.contextTags.foreach { case (tagName, mode) => context.getTag(tagName).foreach(tagValue => addRequestTag(tagName, tagValue, mode)) } span.start() } private def operationName(request: HttpMessage.Request): String = { val requestPath = request.path val customMapping = settings.operationMappings.collectFirst { case (pattern, operationName) if pattern.accept(requestPath) => operationName } customMapping.getOrElse("http.request") } } case class Settings( enableContextPropagation: Boolean, propagationChannel: String, enableServerMetrics: Boolean, enableTracing: Boolean, traceIDTag: Option[String], enableSpanMetrics: Boolean, urlTagMode: TagMode, methodTagMode: TagMode, statusCodeTagMode: TagMode, contextTags: Map[String, TagMode], traceIDResponseHeader: Option[String], spanIDResponseHeader: Option[String], unhandledOperationName: String, operationMappings: Map[GlobPathFilter, String] ) object Settings { sealed trait TagMode object TagMode { case object Metric extends TagMode case object Span extends TagMode case object Off extends TagMode def from(value: String): TagMode = value.toLowerCase match { case "metric" => TagMode.Metric case "span" => TagMode.Span case _ => TagMode.Off } } def from(config: Config): Settings = { def optionalString(value: String): Option[String] = if(value.equalsIgnoreCase("none")) None else Some(value) // Context propagation settings val enablePropagation = config.getBoolean("propagation.enabled") val propagationChannel = config.getString("propagation.channel") // HTTP Server metrics settings val enableServerMetrics = config.getBoolean("metrics.enabled") // Tracing settings val enableTracing = config.getBoolean("tracing.enabled") val traceIdTag = Option(config.getString("tracing.preferred-trace-id-tag")).filterNot(_ == "none") val enableSpanMetrics = config.getBoolean("tracing.span-metrics") val urlTagMode = TagMode.from(config.getString("tracing.tags.url")) val methodTagMode = TagMode.from(config.getString("tracing.tags.method")) val statusCodeTagMode = TagMode.from(config.getString("tracing.tags.status-code")) val contextTags = config.getConfig("tracing.tags.from-context").pairs.map { case (tagName, mode) => (tagName, TagMode.from(mode)) } val traceIDResponseHeader = optionalString(config.getString("tracing.response-headers.trace-id")) val spanIDResponseHeader = optionalString(config.getString("tracing.response-headers.span-id")) val unhandledOperationName = config.getString("tracing.operations.unhandled") val operationMappings = config.getConfig("tracing.operations.mappings").pairs.map { case (pattern, operationName) => (new GlobPathFilter(pattern), operationName) } Settings( enablePropagation, propagationChannel, enableServerMetrics, enableTracing, traceIdTag, enableSpanMetrics, urlTagMode, methodTagMode, statusCodeTagMode, contextTags, traceIDResponseHeader, spanIDResponseHeader, unhandledOperationName, operationMappings ) } } }