package kamon.instrumentation import java.time.Duration import kamon.context.Context import kamon.metric.{Counter, Histogram, RangeSampler} import kamon.testkit.{MetricInspection, SpanInspection} import org.scalatest.concurrent.Eventually import org.scalatest.{Matchers, OptionValues, WordSpec} import scala.collection.mutable class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInspection with OptionValues with MetricInspection with Eventually { "the HTTP server instrumentation" when { "configured for context propagation" should { "read context entries and tags from the incoming request" in { val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( "context-tags" -> "tag=value;none=0011223344556677;", "custom-trace-id" -> "0011223344556677" ))) handler.context.tags should contain only( "tag" -> "value", "none" -> "0011223344556677" ) handler.send(fakeResponse(200, mutable.Map.empty), Context.Empty) handler.doneSending(0L) } "use the configured HTTP propagation channel" in { val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( "context-tags" -> "tag=value;none=0011223344556677;", "custom-trace-id" -> "0011223344556677" ))) handler.context.tags should contain only( "tag" -> "value", "none" -> "0011223344556677" ) val span = inspect(handler.span) span.context().traceID.string shouldNot be("0011223344556677") span.tag("http.method").value shouldBe "GET" span.tag("http.url").value shouldBe "http://localhost:8080/" val responseHeaders = mutable.Map.empty[String, String] handler.send(fakeResponse(200, responseHeaders), handler.context.withTag("hello", "world")) handler.doneSending(0L) } } "configured for HTTP server metrics" should { "track the number of open connections" in { openConnections(8081).distribution() httpServer().openConnection() httpServer().openConnection() val snapshotWithOpenConnections = openConnections(8081).distribution() snapshotWithOpenConnections.min shouldBe 0 snapshotWithOpenConnections.max shouldBe 2 httpServer().closeConnection(Duration.ofSeconds(20), 10) httpServer().closeConnection(Duration.ofSeconds(30), 15) eventually { val snapshotWithoutOpenConnections = openConnections(8081).distribution() snapshotWithoutOpenConnections.min shouldBe 0 snapshotWithoutOpenConnections.max shouldBe 0 } } "track the distribution of number of requests handled per each connection" in { connectionUsage(8081).distribution() httpServer().openConnection() httpServer().openConnection() httpServer().closeConnection(Duration.ofSeconds(20), 10) httpServer().closeConnection(Duration.ofSeconds(30), 15) val connectionUsageSnapshot = connectionUsage(8081).distribution() connectionUsageSnapshot.buckets.map(_.value) should contain allOf( 10, 15 ) } "track the distribution of connection lifetime across all connections" in { connectionLifetime(8081).distribution() httpServer().openConnection() httpServer().openConnection() httpServer().closeConnection(Duration.ofSeconds(20), 10) httpServer().closeConnection(Duration.ofSeconds(30), 15) val connectionLifetimeSnapshot = connectionLifetime(8081).distribution() connectionLifetimeSnapshot.buckets.map(_.value) should contain allOf( 19998441472L, // 20 seconds with 1% precision 29930553344L // 30 seconds with 1% precision ) } "track the number of active requests" in { activeRequests(8081).distribution() val handlerOne = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)) val handlerTwo = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)) val snapshotWithActiveRequests = activeRequests(8081).distribution() snapshotWithActiveRequests.min shouldBe 0 snapshotWithActiveRequests.max shouldBe 2 handlerOne.send(fakeResponse(200, mutable.Map.empty), Context.Empty) handlerTwo.send(fakeResponse(200, mutable.Map.empty), Context.Empty) handlerOne.doneSending(0L) handlerTwo.doneSending(0L) eventually { val snapshotWithoutActiveRequests = activeRequests(8081).distribution() snapshotWithoutActiveRequests.min shouldBe 0 snapshotWithoutActiveRequests.max shouldBe 0 } } "track the distribution of sizes on incoming requests" in { requestSize(8081).distribution() httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)).doneReceiving(300) httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)).doneReceiving(400) val requestSizeSnapshot = requestSize(8081).distribution() requestSizeSnapshot.buckets.map(_.value) should contain allOf( 300, 400 ) } "track the distribution of sizes on outgoing responses" in { responseSize(8081).distribution() httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)).doneSending(300) httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)).doneSending(400) val requestSizeSnapshot = responseSize(8081).distribution() requestSizeSnapshot.buckets.map(_.value) should contain allOf( 300, 400 ) } "track the number of responses per status code" in { // resets all counters completedRequests(8081, 100).value() completedRequests(8081, 200).value() completedRequests(8081, 300).value() completedRequests(8081, 400).value() completedRequests(8081, 500).value() val request = fakeRequest("http://localhost:8080/", "/", "GET", Map.empty) httpServer().receive(request).send(fakeResponse(200, mutable.Map.empty), Context.Empty) httpServer().receive(request).send(fakeResponse(302, mutable.Map.empty), Context.Empty) httpServer().receive(request).send(fakeResponse(404, mutable.Map.empty), Context.Empty) httpServer().receive(request).send(fakeResponse(504, mutable.Map.empty), Context.Empty) httpServer().receive(request).send(fakeResponse(110, mutable.Map.empty), Context.Empty) completedRequests(8081, 100).value() shouldBe 1L completedRequests(8081, 200).value() shouldBe 1L completedRequests(8081, 300).value() shouldBe 1L completedRequests(8081, 400).value() shouldBe 1L completedRequests(8081, 500).value() shouldBe 1L } } "configured for distributed tracing" should { "create a span representing the current HTTP operation" in { val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)) handler.send(fakeResponse(200, mutable.Map.empty), handler.context) val span = inspect(handler.span) span.tag("http.method").value shouldBe "GET" span.tag("http.url").value shouldBe "http://localhost:8080/" span.tag("http.status_code").value shouldBe "200" } "adopt a traceID when explicitly provided" in { val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( "context-tags" -> "tag=value;none=0011223344556677;", "x-correlation-id" -> "0011223344556677" ))) handler.span.context().traceID.string shouldBe "0011223344556677" } "record span metrics when enabled" in { val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)) handler.send(fakeResponse(200, mutable.Map.empty), handler.context) val span = inspect(handler.span) span.hasMetricsEnabled() shouldBe true } "not record span metrics when disabled" in { val handler = noSpanMetricsHttpServer() .receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)) handler.send(fakeResponse(200, mutable.Map.empty), handler.context) val span = inspect(handler.span) span.hasMetricsEnabled() shouldBe false } "receive tags from context when available" in { val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( "context-tags" -> "tag=value;none=0011223344556677;peer=superservice;", "custom-trace-id" -> "0011223344556677" ))) val span = inspect(handler.span) span.tag("peer").value shouldBe "superservice" } "write trace identifiers on the responses" in { val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( "x-correlation-id" -> "0011223344556677" ))) val responseHeaders = mutable.Map.empty[String, String] handler.send(fakeResponse(200, responseHeaders), handler.context) responseHeaders.get("x-trace-id").value shouldBe "0011223344556677" responseHeaders.get("x-span-id") shouldBe defined } } "all capabilities are disabled" should { "not read any context from the incoming requests" in { val httpServer = noopHttpServer() val handler = httpServer.receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( "context-tags" -> "tag=value;none=0011223344556677;", "custom-trace-id" -> "0011223344556677" ))) handler.context shouldBe Context.Empty } "not create any span to represent the server request" in { val httpServer = noopHttpServer() val handler = httpServer.receive(fakeRequest("http://localhost:8080/", "/", "GET", Map( "context-tags" -> "tag=value;none=0011223344556677;", "custom-trace-id" -> "0011223344556677" ))) handler.span.isEmpty() shouldBe true } "not record any HTTP server metrics" in { val request = fakeRequest("http://localhost:8080/", "/", "GET", Map.empty) noopHttpServer().receive(request).send(fakeResponse(200, mutable.Map.empty), Context.Empty) noopHttpServer().receive(request).send(fakeResponse(302, mutable.Map.empty), Context.Empty) noopHttpServer().receive(request).send(fakeResponse(404, mutable.Map.empty), Context.Empty) noopHttpServer().receive(request).send(fakeResponse(504, mutable.Map.empty), Context.Empty) noopHttpServer().receive(request).send(fakeResponse(110, mutable.Map.empty), Context.Empty) completedRequests(8083, 100).value() shouldBe 0L completedRequests(8083, 200).value() shouldBe 0L completedRequests(8083, 300).value() shouldBe 0L completedRequests(8083, 400).value() shouldBe 0L completedRequests(8083, 500).value() shouldBe 0L } } } val TestComponent = "http.server" val TestInterface = "0.0.0.0" def httpServer(): HttpServer = HttpServer.from("default", component = TestComponent, interface = TestInterface, port = 8081) def noSpanMetricsHttpServer(): HttpServer = HttpServer.from("no-span-metrics", component = TestComponent, interface = TestInterface, port = 8082) def noopHttpServer(): HttpServer = HttpServer.from("noop", component = TestComponent, interface = TestInterface, port = 8083) def fakeRequest(requestUrl: String, requestPath: String, requestMethod: String, headers: Map[String, String]): HttpMessage.Request = new HttpMessage.Request { override def url: String = requestUrl override def path: String = requestPath override def method: String = requestMethod override def read(header: String): Option[String] = headers.get(header) override def readAll(): Map[String, String] = headers } def fakeResponse(responseStatusCode: Int, headers: mutable.Map[String, String]): HttpMessage.ResponseBuilder[HttpMessage.Response] = new HttpMessage.ResponseBuilder[HttpMessage.Response] { override def statusCode: Int = responseStatusCode override def write(header: String, value: String): Unit = headers.put(header, value) override def build(): HttpMessage.Response = this } def completedRequests(port: Int, statusCode: Int): Counter = { val metrics = HttpServer.Metrics.of(TestComponent, TestInterface, port) statusCode match { case sc if sc >= 100 && sc <= 199 => metrics.requestsInformational case sc if sc >= 200 && sc <= 299 => metrics.requestsSuccessful case sc if sc >= 300 && sc <= 399 => metrics.requestsRedirection case sc if sc >= 400 && sc <= 499 => metrics.requestsClientError case sc if sc >= 500 && sc <= 599 => metrics.requestsServerError } } def openConnections(port: Int): RangeSampler = HttpServer.Metrics.of(TestComponent, TestInterface, port).openConnections def connectionUsage(port: Int): Histogram = HttpServer.Metrics.of(TestComponent, TestInterface, port).connectionUsage def connectionLifetime(port: Int): Histogram = HttpServer.Metrics.of(TestComponent, TestInterface, port).connectionLifetime def activeRequests(port: Int): RangeSampler = HttpServer.Metrics.of(TestComponent, TestInterface, port).activeRequests def requestSize(port: Int): Histogram = HttpServer.Metrics.of(TestComponent, TestInterface, port).requestSize def responseSize(port: Int): Histogram = HttpServer.Metrics.of(TestComponent, TestInterface, port).responseSize }