From 12d032dec55fbc65fb9db40e181a153f08639cf2 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Sun, 16 Sep 2018 19:34:31 +0200 Subject: implement HTTP server metrics tracking --- .../scala/kamon/instrumentation/HttpServer.scala | 176 +++++++++++++++++++-- 1 file changed, 164 insertions(+), 12 deletions(-) (limited to 'kamon-core') diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala index 88803bad..915118af 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala @@ -7,6 +7,7 @@ import com.typesafe.config.Config import kamon.context.Context import kamon.context.HttpPropagation.Direction import kamon.instrumentation.HttpServer.Settings.TagMode +import kamon.metric.MeasurementUnit.{time, information} import kamon.trace.{IdentityProvider, Span} import kamon.util.GlobPathFilter import org.slf4j.LoggerFactory @@ -60,6 +61,12 @@ trait HttpServer { */ def closeConnection(lifetime: Duration, handledRequests: Long): Unit + /** + * Frees resources that might have been acquired to provide the instrumentation. Behavior on HttpServer instances + * after calling this function is undefined. + */ + def shutdown(): Unit + } object HttpServer { @@ -110,17 +117,129 @@ object HttpServer { } - def from(name: String, port: Int, component: String): HttpServer = { - from(name, port, component, Kamon, Kamon) + /** + * Holds all metric instruments required to record metrics from an HTTP server. + * + * @param interface Interface name or address where the HTTP server is listening. + * @param port Port number where the HTTP server is listening. + */ + class Metrics(component: String, interface: String, port: Int) { + import Metrics._ + private val _log = LoggerFactory.getLogger(classOf[HttpServer.Metrics]) + private val _statusCodeTag = "status_code" + private val _serverTags = Map( + "component" -> component, + "interface" -> interface, + "port" -> port.toString + ) + + val requestsInformational = CompletedRequests.refine(statusCodeTag("1xx")) + val requestsSuccessful = CompletedRequests.refine(statusCodeTag("2xx")) + val requestsRedirection = CompletedRequests.refine(statusCodeTag("3xx")) + val requestsClientError = CompletedRequests.refine(statusCodeTag("4xx")) + val requestsServerError = CompletedRequests.refine(statusCodeTag("5xx")) + + val activeRequests = ActiveRequests.refine(_serverTags) + val requestSize = RequestSize.refine(_serverTags) + val responseSize = ResponseSize.refine(_serverTags) + val connectionLifetime = ConnectionLifetime.refine(_serverTags) + val connectionUsage = ConnectionUsage.refine(_serverTags) + val openConnections = OpenConnections.refine(_serverTags) + + + def countCompletedRequest(statusCode: Int): Unit = { + if(statusCode >= 200 && statusCode <= 299) + requestsSuccessful.increment() + else if(statusCode >= 500 && statusCode <=599) + requestsServerError.increment() + else if(statusCode >= 400 && statusCode <=499) + requestsClientError.increment() + else if(statusCode >= 300 && statusCode <=399) + requestsRedirection.increment() + else if(statusCode >= 100 && statusCode <=199) + requestsInformational.increment() + else { + _log.warn("Unknown HTTP status code {} found when recording HTTP server metrics", statusCode.toString) + } + } + + /** + * Removes all registered metrics from Kamon. + */ + def cleanup(): Unit = { + CompletedRequests.remove(statusCodeTag("1xx")) + CompletedRequests.remove(statusCodeTag("2xx")) + CompletedRequests.remove(statusCodeTag("3xx")) + CompletedRequests.remove(statusCodeTag("4xx")) + CompletedRequests.remove(statusCodeTag("5xx")) + + ActiveRequests.remove(_serverTags) + RequestSize.remove(_serverTags) + ResponseSize.remove(_serverTags) + ConnectionLifetime.remove(_serverTags) + ConnectionUsage.remove(_serverTags) + OpenConnections.remove(_serverTags) + } + + private def statusCodeTag(group: String): Map[String, String] = + _serverTags + (_statusCodeTag -> group) + } - def from(name: String, port: Int, component: String, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = { + + object Metrics { + + def of(component: String, interface: String, port: Int): Metrics = + new HttpServer.Metrics(component, interface, port) + + /** + * Number of completed requests per status code. + */ + val CompletedRequests = Kamon.counter("http.server.requests") + + /** + * Number of requests being processed simultaneously at any point in time. + */ + val ActiveRequests = Kamon.rangeSampler("http.server.request.active") + + /** + * Request size distribution (including headers and body) for all requests received by the server. + */ + val RequestSize = Kamon.histogram("http.server.request.size", information.bytes) + + /** + * Response size distribution (including headers and body) for all responses served by the server. + */ + val ResponseSize = Kamon.histogram("http.server.response.size", information.bytes) + + /** + * Tracks the time elapsed between connection creation and connection close. + */ + val ConnectionLifetime = Kamon.histogram("http.server.connection.lifetime", time.nanoseconds) + + /** + * Distribution of number of requests handled per connection during their entire lifetime. + */ + val ConnectionUsage = Kamon.histogram("http.server.connection.usage") + + /** + * Number of open connections. + */ + val OpenConnections = Kamon.rangeSampler("http.server.connection.open") + } + + + def from(name: String, component: String, interface: String, port: Int): HttpServer = { + from(name, component, interface, port, Kamon, Kamon) + } + + def from(name: String, component: String, interface: String, port: Int, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = { val defaultConfiguration = configuration.config().getConfig(DefaultHttpServerConfiguration) val configWithFallback = if(name == DefaultHttpServer) defaultConfiguration else { configuration.config().getConfig(HttpServerConfigurationPrefix + "." + name).withFallback(defaultConfiguration) } - new HttpServer.Default(Settings.from(configWithFallback), contextPropagation, port, component) + new HttpServer.Default(Settings.from(configWithFallback), contextPropagation, component, interface, port) } val HttpServerConfigurationPrefix = "kamon.instrumentation.http-server" @@ -128,7 +247,8 @@ object HttpServer { val DefaultHttpServerConfiguration = s"$HttpServerConfigurationPrefix.default" - private class Default(settings: Settings, contextPropagation: ContextPropagation, port: Int, component: String) extends HttpServer { + private class Default(settings: Settings, contextPropagation: ContextPropagation, component: String, interface: String, port: Int) extends HttpServer { + private val _metrics = if(settings.enableServerMetrics) Some(HttpServer.Metrics.of(component, interface, port)) else None private val _log = LoggerFactory.getLogger(classOf[Default]) private val _propagation = contextPropagation.httpPropagation(settings.propagationChannel) .getOrElse { @@ -137,6 +257,7 @@ object HttpServer { } override def receive(request: HttpRequest): RequestHandler = { + val incomingContext = if(settings.enableContextPropagation) _propagation.readContext(request) else Context.Empty @@ -149,7 +270,9 @@ object HttpServer { incomingContext.withKey(Span.ContextKey, requestSpan) else incomingContext - // TODO: Handle HTTP Server Metrics + _metrics.foreach { httpServerMetrics => + httpServerMetrics.activeRequests.increment() + } new HttpServer.RequestHandler { @@ -159,7 +282,11 @@ object HttpServer { override def span: Span = requestSpan - override def doneReceiving(receivedBytes: Long): Unit = {} + override def doneReceiving(receivedBytes: Long): Unit = { + _metrics.foreach { httpServerMetrics => + httpServerMetrics.requestSize.record(receivedBytes) + } + } override def send[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse = { def addResponseTag(tag: String, value: String, mode: TagMode): Unit = mode match { @@ -172,19 +299,44 @@ object HttpServer { _propagation.writeContext(context, response, Direction.Returning) } + _metrics.foreach { httpServerMetrics => + httpServerMetrics.countCompletedRequest(response.statusCode) + } + addResponseTag("http.status_code", response.statusCode.toString, settings.statusCodeTagMode) response.build() } override def doneSending(sentBytes: Long): Unit = { + _metrics.foreach { httpServerMetrics => + httpServerMetrics.responseSize.record(sentBytes) + } + span.finish() } } } - override def openConnection(): Unit = ??? + override def openConnection(): Unit = { + _metrics.foreach { httpServerMetrics => + httpServerMetrics.openConnections.increment() + } + } + + override def closeConnection(lifetime: Duration, handledRequests: Long): Unit = { + _metrics.foreach { httpServerMetrics => + httpServerMetrics.openConnections.decrement() + httpServerMetrics.connectionLifetime.record(lifetime.toNanos) + httpServerMetrics.connectionUsage.record(handledRequests) + } + } + + override def shutdown(): Unit = { + _metrics.foreach { httpServerMetrics => + httpServerMetrics.cleanup() + } + } - override def closeConnection(lifetime: Duration, handledRequests: Long): Unit = ??? private def buildServerSpan(context: Context, request: HttpRequest): Span = { val span = Kamon.buildSpan(operationName(request)) @@ -218,9 +370,9 @@ object HttpServer { private def operationName(request: HttpRequest): String = { val requestPath = request.path - val customMapping = settings.operationMappings.find { - case (pattern, _) => pattern.accept(requestPath) - }.map(_._2) + val customMapping = settings.operationMappings.collectFirst { + case (pattern, operationName) if pattern.accept(requestPath) => operationName + } customMapping.getOrElse("http.request") } -- cgit v1.2.3