diff options
3 files changed, 204 insertions, 21 deletions
diff --git a/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala index e31382a5..d469725f 100644 --- a/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala +++ b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala @@ -1,12 +1,13 @@ package kamon.instrumentation import kamon.context.Context -import kamon.testkit.SpanInspection +import kamon.metric.Counter +import kamon.testkit.{MetricInspection, SpanInspection} import org.scalatest.{Matchers, OptionValues, WordSpec} import scala.collection.mutable -class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInspection with OptionValues { +class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInspection with OptionValues with MetricInspection { "the HTTP server instrumentation" when { "configured for context propagation" should { @@ -73,7 +74,7 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp } "not record span metrics when disabled" in { - val handler = HttpServer.from("no-span-metrics", port = 8081, "http.server") + val handler = noSpanMetricsHttpServer() .receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)) handler.send(fakeResponse(200, mutable.Map.empty), handler.context) @@ -94,7 +95,7 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp "all capabilities are disabled" should { "not read any context from the incoming requests" in { - val httpServer = HttpServer.from("noop", port = 8081, "http.server") + val httpServer = noopHttpServer() val handler = httpServer.receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( "context-tags" -> "tag=value;none=0011223344556677;", "custom-trace-id" -> "0011223344556677" @@ -104,7 +105,7 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp } "not create any span to represent the server request" in { - val httpServer = HttpServer.from("noop", port = 8081, "http.server") + val httpServer = noopHttpServer() val handler = httpServer.receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( "context-tags" -> "tag=value;none=0011223344556677;", "custom-trace-id" -> "0011223344556677" @@ -113,12 +114,29 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp handler.span.isEmpty() shouldBe true } - "not record any HTTP server metrics" is (pending) + "not record any HTTP server metrics" in { + val request = fakeRequest("http://localhost:8080/", "/", "GET", Map.empty) + noopHttpServer().receive(request).send(fakeResponse(200, mutable.Map.empty), Context.Empty) + noopHttpServer().receive(request).send(fakeResponse(302, mutable.Map.empty), Context.Empty) + noopHttpServer().receive(request).send(fakeResponse(404, mutable.Map.empty), Context.Empty) + noopHttpServer().receive(request).send(fakeResponse(504, mutable.Map.empty), Context.Empty) + noopHttpServer().receive(request).send(fakeResponse(110, mutable.Map.empty), Context.Empty) + + completedRequests(8083, 100).value() shouldBe 0L + completedRequests(8083, 200).value() shouldBe 0L + completedRequests(8083, 300).value() shouldBe 0L + completedRequests(8083, 400).value() shouldBe 0L + completedRequests(8083, 500).value() shouldBe 0L + } } } - def httpServer(): HttpServer = HttpServer.from("default", port = 8081, "http.server") - def noopHttpServer(): HttpServer = HttpServer.from("noop", port = 8081, "http.server") + val TestComponent = "http.server" + val TestInterface = "0.0.0.0" + + def httpServer(): HttpServer = HttpServer.from("default", component = TestComponent, interface = TestInterface, port = 8081) + def noSpanMetricsHttpServer(): HttpServer = HttpServer.from("no-span-metrics", component = TestComponent, interface = TestInterface, port = 8082) + def noopHttpServer(): HttpServer = HttpServer.from("noop", component = TestComponent, interface = TestInterface, port = 8083) def fakeRequest(requestUrl: String, requestPath: String, requestMethod: String, headers: Map[String, String]): HttpRequest = new HttpRequest { @@ -134,4 +152,17 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp override def writeHeader(header: String, value: String): Unit = headers.put(header, value) override def build(): HttpResponse = this } + + def completedRequests(port: Int, statusCode: Int): Counter = { + val metrics = HttpServer.Metrics.of(TestComponent, TestInterface, port) + + statusCode match { + case sc if sc >= 100 && sc <= 199 => metrics.requestsInformational + case sc if sc >= 200 && sc <= 299 => metrics.requestsSuccessful + case sc if sc >= 300 && sc <= 399 => metrics.requestsRedirection + case sc if sc >= 400 && sc <= 499 => metrics.requestsClientError + case sc if sc >= 500 && sc <= 599 => metrics.requestsServerError + } + } + } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala index 88803bad..915118af 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala @@ -7,6 +7,7 @@ import com.typesafe.config.Config import kamon.context.Context import kamon.context.HttpPropagation.Direction import kamon.instrumentation.HttpServer.Settings.TagMode +import kamon.metric.MeasurementUnit.{time, information} import kamon.trace.{IdentityProvider, Span} import kamon.util.GlobPathFilter import org.slf4j.LoggerFactory @@ -60,6 +61,12 @@ trait HttpServer { */ def closeConnection(lifetime: Duration, handledRequests: Long): Unit + /** + * Frees resources that might have been acquired to provide the instrumentation. Behavior on HttpServer instances + * after calling this function is undefined. + */ + def shutdown(): Unit + } object HttpServer { @@ -110,17 +117,129 @@ object HttpServer { } - def from(name: String, port: Int, component: String): HttpServer = { - from(name, port, component, Kamon, Kamon) + /** + * Holds all metric instruments required to record metrics from an HTTP server. + * + * @param interface Interface name or address where the HTTP server is listening. + * @param port Port number where the HTTP server is listening. + */ + class Metrics(component: String, interface: String, port: Int) { + import Metrics._ + private val _log = LoggerFactory.getLogger(classOf[HttpServer.Metrics]) + private val _statusCodeTag = "status_code" + private val _serverTags = Map( + "component" -> component, + "interface" -> interface, + "port" -> port.toString + ) + + val requestsInformational = CompletedRequests.refine(statusCodeTag("1xx")) + val requestsSuccessful = CompletedRequests.refine(statusCodeTag("2xx")) + val requestsRedirection = CompletedRequests.refine(statusCodeTag("3xx")) + val requestsClientError = CompletedRequests.refine(statusCodeTag("4xx")) + val requestsServerError = CompletedRequests.refine(statusCodeTag("5xx")) + + val activeRequests = ActiveRequests.refine(_serverTags) + val requestSize = RequestSize.refine(_serverTags) + val responseSize = ResponseSize.refine(_serverTags) + val connectionLifetime = ConnectionLifetime.refine(_serverTags) + val connectionUsage = ConnectionUsage.refine(_serverTags) + val openConnections = OpenConnections.refine(_serverTags) + + + def countCompletedRequest(statusCode: Int): Unit = { + if(statusCode >= 200 && statusCode <= 299) + requestsSuccessful.increment() + else if(statusCode >= 500 && statusCode <=599) + requestsServerError.increment() + else if(statusCode >= 400 && statusCode <=499) + requestsClientError.increment() + else if(statusCode >= 300 && statusCode <=399) + requestsRedirection.increment() + else if(statusCode >= 100 && statusCode <=199) + requestsInformational.increment() + else { + _log.warn("Unknown HTTP status code {} found when recording HTTP server metrics", statusCode.toString) + } + } + + /** + * Removes all registered metrics from Kamon. + */ + def cleanup(): Unit = { + CompletedRequests.remove(statusCodeTag("1xx")) + CompletedRequests.remove(statusCodeTag("2xx")) + CompletedRequests.remove(statusCodeTag("3xx")) + CompletedRequests.remove(statusCodeTag("4xx")) + CompletedRequests.remove(statusCodeTag("5xx")) + + ActiveRequests.remove(_serverTags) + RequestSize.remove(_serverTags) + ResponseSize.remove(_serverTags) + ConnectionLifetime.remove(_serverTags) + ConnectionUsage.remove(_serverTags) + OpenConnections.remove(_serverTags) + } + + private def statusCodeTag(group: String): Map[String, String] = + _serverTags + (_statusCodeTag -> group) + } - def from(name: String, port: Int, component: String, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = { + + object Metrics { + + def of(component: String, interface: String, port: Int): Metrics = + new HttpServer.Metrics(component, interface, port) + + /** + * Number of completed requests per status code. + */ + val CompletedRequests = Kamon.counter("http.server.requests") + + /** + * Number of requests being processed simultaneously at any point in time. + */ + val ActiveRequests = Kamon.rangeSampler("http.server.request.active") + + /** + * Request size distribution (including headers and body) for all requests received by the server. + */ + val RequestSize = Kamon.histogram("http.server.request.size", information.bytes) + + /** + * Response size distribution (including headers and body) for all responses served by the server. + */ + val ResponseSize = Kamon.histogram("http.server.response.size", information.bytes) + + /** + * Tracks the time elapsed between connection creation and connection close. + */ + val ConnectionLifetime = Kamon.histogram("http.server.connection.lifetime", time.nanoseconds) + + /** + * Distribution of number of requests handled per connection during their entire lifetime. + */ + val ConnectionUsage = Kamon.histogram("http.server.connection.usage") + + /** + * Number of open connections. + */ + val OpenConnections = Kamon.rangeSampler("http.server.connection.open") + } + + + def from(name: String, component: String, interface: String, port: Int): HttpServer = { + from(name, component, interface, port, Kamon, Kamon) + } + + def from(name: String, component: String, interface: String, port: Int, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = { val defaultConfiguration = configuration.config().getConfig(DefaultHttpServerConfiguration) val configWithFallback = if(name == DefaultHttpServer) defaultConfiguration else { configuration.config().getConfig(HttpServerConfigurationPrefix + "." + name).withFallback(defaultConfiguration) } - new HttpServer.Default(Settings.from(configWithFallback), contextPropagation, port, component) + new HttpServer.Default(Settings.from(configWithFallback), contextPropagation, component, interface, port) } val HttpServerConfigurationPrefix = "kamon.instrumentation.http-server" @@ -128,7 +247,8 @@ object HttpServer { val DefaultHttpServerConfiguration = s"$HttpServerConfigurationPrefix.default" - private class Default(settings: Settings, contextPropagation: ContextPropagation, port: Int, component: String) extends HttpServer { + private class Default(settings: Settings, contextPropagation: ContextPropagation, component: String, interface: String, port: Int) extends HttpServer { + private val _metrics = if(settings.enableServerMetrics) Some(HttpServer.Metrics.of(component, interface, port)) else None private val _log = LoggerFactory.getLogger(classOf[Default]) private val _propagation = contextPropagation.httpPropagation(settings.propagationChannel) .getOrElse { @@ -137,6 +257,7 @@ object HttpServer { } override def receive(request: HttpRequest): RequestHandler = { + val incomingContext = if(settings.enableContextPropagation) _propagation.readContext(request) else Context.Empty @@ -149,7 +270,9 @@ object HttpServer { incomingContext.withKey(Span.ContextKey, requestSpan) else incomingContext - // TODO: Handle HTTP Server Metrics + _metrics.foreach { httpServerMetrics => + httpServerMetrics.activeRequests.increment() + } new HttpServer.RequestHandler { @@ -159,7 +282,11 @@ object HttpServer { override def span: Span = requestSpan - override def doneReceiving(receivedBytes: Long): Unit = {} + override def doneReceiving(receivedBytes: Long): Unit = { + _metrics.foreach { httpServerMetrics => + httpServerMetrics.requestSize.record(receivedBytes) + } + } override def send[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse = { def addResponseTag(tag: String, value: String, mode: TagMode): Unit = mode match { @@ -172,19 +299,44 @@ object HttpServer { _propagation.writeContext(context, response, Direction.Returning) } + _metrics.foreach { httpServerMetrics => + httpServerMetrics.countCompletedRequest(response.statusCode) + } + addResponseTag("http.status_code", response.statusCode.toString, settings.statusCodeTagMode) response.build() } override def doneSending(sentBytes: Long): Unit = { + _metrics.foreach { httpServerMetrics => + httpServerMetrics.responseSize.record(sentBytes) + } + span.finish() } } } - override def openConnection(): Unit = ??? + override def openConnection(): Unit = { + _metrics.foreach { httpServerMetrics => + httpServerMetrics.openConnections.increment() + } + } + + override def closeConnection(lifetime: Duration, handledRequests: Long): Unit = { + _metrics.foreach { httpServerMetrics => + httpServerMetrics.openConnections.decrement() + httpServerMetrics.connectionLifetime.record(lifetime.toNanos) + httpServerMetrics.connectionUsage.record(handledRequests) + } + } + + override def shutdown(): Unit = { + _metrics.foreach { httpServerMetrics => + httpServerMetrics.cleanup() + } + } - override def closeConnection(lifetime: Duration, handledRequests: Long): Unit = ??? private def buildServerSpan(context: Context, request: HttpRequest): Span = { val span = Kamon.buildSpan(operationName(request)) @@ -218,9 +370,9 @@ object HttpServer { private def operationName(request: HttpRequest): String = { val requestPath = request.path - val customMapping = settings.operationMappings.find { - case (pattern, _) => pattern.accept(requestPath) - }.map(_._2) + val customMapping = settings.operationMappings.collectFirst { + case (pattern, operationName) if pattern.accept(requestPath) => operationName + } customMapping.getOrElse("http.request") } diff --git a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala index e45124b0..c28ace64 100644 --- a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala +++ b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala @@ -58,7 +58,7 @@ object SpanInspection { case TagValue.Number(number) => number.toString case TagValue.True => "true" case TagValue.False => "false" - } + } orElse(metricTag(key)) } def metricTags(): Map[String, String] = |