aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2018-09-16 19:34:31 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2018-09-16 19:34:31 +0200
commit12d032dec55fbc65fb9db40e181a153f08639cf2 (patch)
tree81b6d5b82713c792a17a43e7fed4093217a33795
parentac43c0476c239a9cf1c20e838b0fd212b20161e1 (diff)
downloadKamon-12d032dec55fbc65fb9db40e181a153f08639cf2.tar.gz
Kamon-12d032dec55fbc65fb9db40e181a153f08639cf2.tar.bz2
Kamon-12d032dec55fbc65fb9db40e181a153f08639cf2.zip
implement HTTP server metrics tracking
-rw-r--r--kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala47
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala176
-rw-r--r--kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala2
3 files changed, 204 insertions, 21 deletions
diff --git a/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala
index e31382a5..d469725f 100644
--- a/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala
+++ b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala
@@ -1,12 +1,13 @@
package kamon.instrumentation
import kamon.context.Context
-import kamon.testkit.SpanInspection
+import kamon.metric.Counter
+import kamon.testkit.{MetricInspection, SpanInspection}
import org.scalatest.{Matchers, OptionValues, WordSpec}
import scala.collection.mutable
-class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInspection with OptionValues {
+class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInspection with OptionValues with MetricInspection {
"the HTTP server instrumentation" when {
"configured for context propagation" should {
@@ -73,7 +74,7 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp
}
"not record span metrics when disabled" in {
- val handler = HttpServer.from("no-span-metrics", port = 8081, "http.server")
+ val handler = noSpanMetricsHttpServer()
.receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty))
handler.send(fakeResponse(200, mutable.Map.empty), handler.context)
@@ -94,7 +95,7 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp
"all capabilities are disabled" should {
"not read any context from the incoming requests" in {
- val httpServer = HttpServer.from("noop", port = 8081, "http.server")
+ val httpServer = noopHttpServer()
val handler = httpServer.receive(fakeRequest("http://localhost:8080/", "/", "GET", Map(
"context-tags" -> "tag=value;none=0011223344556677;",
"custom-trace-id" -> "0011223344556677"
@@ -104,7 +105,7 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp
}
"not create any span to represent the server request" in {
- val httpServer = HttpServer.from("noop", port = 8081, "http.server")
+ val httpServer = noopHttpServer()
val handler = httpServer.receive(fakeRequest("http://localhost:8080/", "/", "GET", Map(
"context-tags" -> "tag=value;none=0011223344556677;",
"custom-trace-id" -> "0011223344556677"
@@ -113,12 +114,29 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp
handler.span.isEmpty() shouldBe true
}
- "not record any HTTP server metrics" is (pending)
+ "not record any HTTP server metrics" in {
+ val request = fakeRequest("http://localhost:8080/", "/", "GET", Map.empty)
+ noopHttpServer().receive(request).send(fakeResponse(200, mutable.Map.empty), Context.Empty)
+ noopHttpServer().receive(request).send(fakeResponse(302, mutable.Map.empty), Context.Empty)
+ noopHttpServer().receive(request).send(fakeResponse(404, mutable.Map.empty), Context.Empty)
+ noopHttpServer().receive(request).send(fakeResponse(504, mutable.Map.empty), Context.Empty)
+ noopHttpServer().receive(request).send(fakeResponse(110, mutable.Map.empty), Context.Empty)
+
+ completedRequests(8083, 100).value() shouldBe 0L
+ completedRequests(8083, 200).value() shouldBe 0L
+ completedRequests(8083, 300).value() shouldBe 0L
+ completedRequests(8083, 400).value() shouldBe 0L
+ completedRequests(8083, 500).value() shouldBe 0L
+ }
}
}
- def httpServer(): HttpServer = HttpServer.from("default", port = 8081, "http.server")
- def noopHttpServer(): HttpServer = HttpServer.from("noop", port = 8081, "http.server")
+ val TestComponent = "http.server"
+ val TestInterface = "0.0.0.0"
+
+ def httpServer(): HttpServer = HttpServer.from("default", component = TestComponent, interface = TestInterface, port = 8081)
+ def noSpanMetricsHttpServer(): HttpServer = HttpServer.from("no-span-metrics", component = TestComponent, interface = TestInterface, port = 8082)
+ def noopHttpServer(): HttpServer = HttpServer.from("noop", component = TestComponent, interface = TestInterface, port = 8083)
def fakeRequest(requestUrl: String, requestPath: String, requestMethod: String, headers: Map[String, String]): HttpRequest =
new HttpRequest {
@@ -134,4 +152,17 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp
override def writeHeader(header: String, value: String): Unit = headers.put(header, value)
override def build(): HttpResponse = this
}
+
+ def completedRequests(port: Int, statusCode: Int): Counter = {
+ val metrics = HttpServer.Metrics.of(TestComponent, TestInterface, port)
+
+ statusCode match {
+ case sc if sc >= 100 && sc <= 199 => metrics.requestsInformational
+ case sc if sc >= 200 && sc <= 299 => metrics.requestsSuccessful
+ case sc if sc >= 300 && sc <= 399 => metrics.requestsRedirection
+ case sc if sc >= 400 && sc <= 499 => metrics.requestsClientError
+ case sc if sc >= 500 && sc <= 599 => metrics.requestsServerError
+ }
+ }
+
}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
index 88803bad..915118af 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
@@ -7,6 +7,7 @@ import com.typesafe.config.Config
import kamon.context.Context
import kamon.context.HttpPropagation.Direction
import kamon.instrumentation.HttpServer.Settings.TagMode
+import kamon.metric.MeasurementUnit.{time, information}
import kamon.trace.{IdentityProvider, Span}
import kamon.util.GlobPathFilter
import org.slf4j.LoggerFactory
@@ -60,6 +61,12 @@ trait HttpServer {
*/
def closeConnection(lifetime: Duration, handledRequests: Long): Unit
+ /**
+ * Frees resources that might have been acquired to provide the instrumentation. Behavior on HttpServer instances
+ * after calling this function is undefined.
+ */
+ def shutdown(): Unit
+
}
object HttpServer {
@@ -110,17 +117,129 @@ object HttpServer {
}
- def from(name: String, port: Int, component: String): HttpServer = {
- from(name, port, component, Kamon, Kamon)
+ /**
+ * Holds all metric instruments required to record metrics from an HTTP server.
+ *
+ * @param interface Interface name or address where the HTTP server is listening.
+ * @param port Port number where the HTTP server is listening.
+ */
+ class Metrics(component: String, interface: String, port: Int) {
+ import Metrics._
+ private val _log = LoggerFactory.getLogger(classOf[HttpServer.Metrics])
+ private val _statusCodeTag = "status_code"
+ private val _serverTags = Map(
+ "component" -> component,
+ "interface" -> interface,
+ "port" -> port.toString
+ )
+
+ val requestsInformational = CompletedRequests.refine(statusCodeTag("1xx"))
+ val requestsSuccessful = CompletedRequests.refine(statusCodeTag("2xx"))
+ val requestsRedirection = CompletedRequests.refine(statusCodeTag("3xx"))
+ val requestsClientError = CompletedRequests.refine(statusCodeTag("4xx"))
+ val requestsServerError = CompletedRequests.refine(statusCodeTag("5xx"))
+
+ val activeRequests = ActiveRequests.refine(_serverTags)
+ val requestSize = RequestSize.refine(_serverTags)
+ val responseSize = ResponseSize.refine(_serverTags)
+ val connectionLifetime = ConnectionLifetime.refine(_serverTags)
+ val connectionUsage = ConnectionUsage.refine(_serverTags)
+ val openConnections = OpenConnections.refine(_serverTags)
+
+
+ def countCompletedRequest(statusCode: Int): Unit = {
+ if(statusCode >= 200 && statusCode <= 299)
+ requestsSuccessful.increment()
+ else if(statusCode >= 500 && statusCode <=599)
+ requestsServerError.increment()
+ else if(statusCode >= 400 && statusCode <=499)
+ requestsClientError.increment()
+ else if(statusCode >= 300 && statusCode <=399)
+ requestsRedirection.increment()
+ else if(statusCode >= 100 && statusCode <=199)
+ requestsInformational.increment()
+ else {
+ _log.warn("Unknown HTTP status code {} found when recording HTTP server metrics", statusCode.toString)
+ }
+ }
+
+ /**
+ * Removes all registered metrics from Kamon.
+ */
+ def cleanup(): Unit = {
+ CompletedRequests.remove(statusCodeTag("1xx"))
+ CompletedRequests.remove(statusCodeTag("2xx"))
+ CompletedRequests.remove(statusCodeTag("3xx"))
+ CompletedRequests.remove(statusCodeTag("4xx"))
+ CompletedRequests.remove(statusCodeTag("5xx"))
+
+ ActiveRequests.remove(_serverTags)
+ RequestSize.remove(_serverTags)
+ ResponseSize.remove(_serverTags)
+ ConnectionLifetime.remove(_serverTags)
+ ConnectionUsage.remove(_serverTags)
+ OpenConnections.remove(_serverTags)
+ }
+
+ private def statusCodeTag(group: String): Map[String, String] =
+ _serverTags + (_statusCodeTag -> group)
+
}
- def from(name: String, port: Int, component: String, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = {
+
+ object Metrics {
+
+ def of(component: String, interface: String, port: Int): Metrics =
+ new HttpServer.Metrics(component, interface, port)
+
+ /**
+ * Number of completed requests per status code.
+ */
+ val CompletedRequests = Kamon.counter("http.server.requests")
+
+ /**
+ * Number of requests being processed simultaneously at any point in time.
+ */
+ val ActiveRequests = Kamon.rangeSampler("http.server.request.active")
+
+ /**
+ * Request size distribution (including headers and body) for all requests received by the server.
+ */
+ val RequestSize = Kamon.histogram("http.server.request.size", information.bytes)
+
+ /**
+ * Response size distribution (including headers and body) for all responses served by the server.
+ */
+ val ResponseSize = Kamon.histogram("http.server.response.size", information.bytes)
+
+ /**
+ * Tracks the time elapsed between connection creation and connection close.
+ */
+ val ConnectionLifetime = Kamon.histogram("http.server.connection.lifetime", time.nanoseconds)
+
+ /**
+ * Distribution of number of requests handled per connection during their entire lifetime.
+ */
+ val ConnectionUsage = Kamon.histogram("http.server.connection.usage")
+
+ /**
+ * Number of open connections.
+ */
+ val OpenConnections = Kamon.rangeSampler("http.server.connection.open")
+ }
+
+
+ def from(name: String, component: String, interface: String, port: Int): HttpServer = {
+ from(name, component, interface, port, Kamon, Kamon)
+ }
+
+ def from(name: String, component: String, interface: String, port: Int, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = {
val defaultConfiguration = configuration.config().getConfig(DefaultHttpServerConfiguration)
val configWithFallback = if(name == DefaultHttpServer) defaultConfiguration else {
configuration.config().getConfig(HttpServerConfigurationPrefix + "." + name).withFallback(defaultConfiguration)
}
- new HttpServer.Default(Settings.from(configWithFallback), contextPropagation, port, component)
+ new HttpServer.Default(Settings.from(configWithFallback), contextPropagation, component, interface, port)
}
val HttpServerConfigurationPrefix = "kamon.instrumentation.http-server"
@@ -128,7 +247,8 @@ object HttpServer {
val DefaultHttpServerConfiguration = s"$HttpServerConfigurationPrefix.default"
- private class Default(settings: Settings, contextPropagation: ContextPropagation, port: Int, component: String) extends HttpServer {
+ private class Default(settings: Settings, contextPropagation: ContextPropagation, component: String, interface: String, port: Int) extends HttpServer {
+ private val _metrics = if(settings.enableServerMetrics) Some(HttpServer.Metrics.of(component, interface, port)) else None
private val _log = LoggerFactory.getLogger(classOf[Default])
private val _propagation = contextPropagation.httpPropagation(settings.propagationChannel)
.getOrElse {
@@ -137,6 +257,7 @@ object HttpServer {
}
override def receive(request: HttpRequest): RequestHandler = {
+
val incomingContext = if(settings.enableContextPropagation)
_propagation.readContext(request)
else Context.Empty
@@ -149,7 +270,9 @@ object HttpServer {
incomingContext.withKey(Span.ContextKey, requestSpan)
else incomingContext
- // TODO: Handle HTTP Server Metrics
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.activeRequests.increment()
+ }
new HttpServer.RequestHandler {
@@ -159,7 +282,11 @@ object HttpServer {
override def span: Span =
requestSpan
- override def doneReceiving(receivedBytes: Long): Unit = {}
+ override def doneReceiving(receivedBytes: Long): Unit = {
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.requestSize.record(receivedBytes)
+ }
+ }
override def send[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse = {
def addResponseTag(tag: String, value: String, mode: TagMode): Unit = mode match {
@@ -172,19 +299,44 @@ object HttpServer {
_propagation.writeContext(context, response, Direction.Returning)
}
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.countCompletedRequest(response.statusCode)
+ }
+
addResponseTag("http.status_code", response.statusCode.toString, settings.statusCodeTagMode)
response.build()
}
override def doneSending(sentBytes: Long): Unit = {
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.responseSize.record(sentBytes)
+ }
+
span.finish()
}
}
}
- override def openConnection(): Unit = ???
+ override def openConnection(): Unit = {
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.openConnections.increment()
+ }
+ }
+
+ override def closeConnection(lifetime: Duration, handledRequests: Long): Unit = {
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.openConnections.decrement()
+ httpServerMetrics.connectionLifetime.record(lifetime.toNanos)
+ httpServerMetrics.connectionUsage.record(handledRequests)
+ }
+ }
+
+ override def shutdown(): Unit = {
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.cleanup()
+ }
+ }
- override def closeConnection(lifetime: Duration, handledRequests: Long): Unit = ???
private def buildServerSpan(context: Context, request: HttpRequest): Span = {
val span = Kamon.buildSpan(operationName(request))
@@ -218,9 +370,9 @@ object HttpServer {
private def operationName(request: HttpRequest): String = {
val requestPath = request.path
- val customMapping = settings.operationMappings.find {
- case (pattern, _) => pattern.accept(requestPath)
- }.map(_._2)
+ val customMapping = settings.operationMappings.collectFirst {
+ case (pattern, operationName) if pattern.accept(requestPath) => operationName
+ }
customMapping.getOrElse("http.request")
}
diff --git a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala
index e45124b0..c28ace64 100644
--- a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala
+++ b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala
@@ -58,7 +58,7 @@ object SpanInspection {
case TagValue.Number(number) => number.toString
case TagValue.True => "true"
case TagValue.False => "false"
- }
+ } orElse(metricTag(key))
}
def metricTags(): Map[String, String] =