aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/instrumentation
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/kamon/instrumentation')
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala66
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala453
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/Mixin.scala36
3 files changed, 555 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala
new file mode 100644
index 00000000..b141331b
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala
@@ -0,0 +1,66 @@
+package kamon.instrumentation
+
+import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter}
+
+/**
+ * Base abstractions over HTTP messages.
+ */
+object HttpMessage {
+
+ /**
+ * Wrapper for HTTP Request messages.
+ */
+ trait Request extends HeaderReader {
+
+ /**
+ * Request URL.
+ */
+ def url: String
+
+ /**
+ * Full request path. Does not include the query.
+ */
+ def path: String
+
+ /**
+ * HTTP Method.
+ */
+ def method: String
+ }
+
+ /**
+ * Wrapper for HTTP response messages.
+ */
+ trait Response {
+
+ /**
+ * Status code on the response message.
+ */
+ def statusCode: Int
+ }
+
+ /**
+ * A HTTP message builder on which header values can be written and a complete HTTP message can be build from.
+ * Implementations will typically wrap a HTTP message model from an instrumented framework and either accumulate
+ * all header writes until a call to to .build() is made and a new HTTP message is constructed merging the previous
+ * and accumulated headers (on immutable HTTP models) or directly write the headers on the underlying HTTP message
+ * (on mutable HTTP models).
+ */
+ trait Builder[Message] extends HeaderWriter {
+
+ /**
+ * Returns a version a version of the HTTP message container all headers that have been written to the builder.
+ */
+ def build(): Message
+ }
+
+ /**
+ * Builder for HTTP Request messages.
+ */
+ trait RequestBuilder[Message] extends Request with Builder[Message]
+
+ /**
+ * Builder for HTTP Response messages.
+ */
+ trait ResponseBuilder[Message] extends Response with Builder[Message]
+}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
new file mode 100644
index 00000000..72828424
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
@@ -0,0 +1,453 @@
+package kamon
+package instrumentation
+
+import java.time.Duration
+
+import com.typesafe.config.Config
+import kamon.context.Context
+import kamon.instrumentation.HttpServer.Settings.TagMode
+import kamon.metric.MeasurementUnit.{time, information}
+import kamon.trace.{IdentityProvider, Span}
+import kamon.util.GlobPathFilter
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+/**
+ * HTTP Server instrumentation handler that takes care of context propagation, distributed tracing and HTTP server
+ * metrics. Instances can be created by using the [[HttpServer.from]] method with the desired configuration name. All
+ * configuration for the default HTTP instrumentation is at "kamon.instrumentation.http-server.default".
+ *
+ * The default implementation shipping with Kamon provides:
+ *
+ * - Context Propagation: Incoming and Returning context propagation as well as incoming context tags. Context
+ * propagation is further used to enable distributed tracing on top of any instrumented HTTP Server.
+ * - Distributed Tracing: Automatically join traces initiated by the callers of this service and apply span and metric
+ * tags from the incoming requests as well as form the incoming context tags.
+ * - Server Metrics: Basic request processing metrics to understand connection usage, throughput and response code
+ * counts in the HTTP server.
+ *
+ */
+trait HttpServer {
+
+ /**
+ * Initiate handling of a HTTP request received by this server. The returned RequestHandler contains the Span that
+ * represents the processing of the incoming HTTP request (if tracing is enabled) and the Context extracted from
+ * HTTP headers (if context propagation is enabled).
+ *
+ * Callers of this method **must** always ensure that the doneReceiving, send and doneSending callbacks are invoked
+ * for all incoming requests.
+ *
+ * @param request A HttpRequest wrapper on the original incoming HTTP request.
+ * @return The RequestHandler that will follow the lifecycle of the incoming request.
+ */
+ def receive(request: HttpMessage.Request): HttpServer.RequestHandler
+
+
+ /**
+ * Signals that a new HTTP connection has been opened.
+ */
+ def openConnection(): Unit
+
+
+ /**
+ * Signals that a HTTP connection has been closed. If the connection lifetime or the number of handled requests
+ * cannot be determined the the values [[Duration.ZERO]] and zero can be provided, respectively. No metrics will
+ * be updated when the values are zero.
+ *
+ * @param lifetime For how long did the connection remain open.
+ * @param handledRequests How many requests where handled by the closed connection.
+ */
+ def closeConnection(lifetime: Duration, handledRequests: Long): Unit
+
+ /**
+ * Frees resources that might have been acquired to provide the instrumentation. Behavior on HttpServer instances
+ * after calling this function is undefined.
+ */
+ def shutdown(): Unit
+
+}
+
+object HttpServer {
+
+ /**
+ * Handler associated to the processing of a single request. The instrumentation code using this class is responsible
+ * of creating a dedicated [[HttpServer.RequestHandler]] instance for each received request should invoking the
+ * doneReceiving, send and doneSending callbacks when appropriate.
+ */
+ trait RequestHandler {
+
+ /**
+ * If context propagation is enabled this function returns the incoming context associated wih this request,
+ * otherwise [[Context.Empty]] is returned.
+ */
+ def context: Context
+
+ /**
+ * Span representing the current HTTP server operation. If tracing is disabled this will return an empty span.
+ */
+ def span: Span
+
+ /**
+ * Signals that the entire request (headers and body) has been received.
+ *
+ * @param receivedBytes Size of the entire HTTP request.
+ */
+ def doneReceiving(receivedBytes: Long): Unit
+
+ /**
+ * Process a response to be sent back to the client. Since returning keys might need to included in the response
+ * headers, users of this class must ensure that the returned HttpResponse is used instead of the original one
+ * passed into this function.
+ *
+ * @param response Wraps the HTTP response to be sent back to the client.
+ * @param context Context that should be used for writing returning keys into the response.
+ * @return The modified HTTP response that should be sent to clients.
+ */
+ def send[HttpResponse](response: HttpMessage.ResponseBuilder[HttpResponse], context: Context): HttpResponse
+
+ /**
+ * Signals that the entire response (headers and body) has been sent to the client.
+ *
+ * @param sentBytes Size of the entire HTTP response.
+ */
+ def doneSending(sentBytes: Long): Unit
+
+ }
+
+
+ /**
+ * Holds all metric instruments required to record metrics from an HTTP server.
+ *
+ * @param interface Interface name or address where the HTTP server is listening.
+ * @param port Port number where the HTTP server is listening.
+ */
+ class Metrics(component: String, interface: String, port: Int) {
+ import Metrics._
+ private val _log = LoggerFactory.getLogger(classOf[HttpServer.Metrics])
+ private val _statusCodeTag = "status_code"
+ private val _serverTags = Map(
+ "component" -> component,
+ "interface" -> interface,
+ "port" -> port.toString
+ )
+
+ val requestsInformational = CompletedRequests.refine(statusCodeTag("1xx"))
+ val requestsSuccessful = CompletedRequests.refine(statusCodeTag("2xx"))
+ val requestsRedirection = CompletedRequests.refine(statusCodeTag("3xx"))
+ val requestsClientError = CompletedRequests.refine(statusCodeTag("4xx"))
+ val requestsServerError = CompletedRequests.refine(statusCodeTag("5xx"))
+
+ val activeRequests = ActiveRequests.refine(_serverTags)
+ val requestSize = RequestSize.refine(_serverTags)
+ val responseSize = ResponseSize.refine(_serverTags)
+ val connectionLifetime = ConnectionLifetime.refine(_serverTags)
+ val connectionUsage = ConnectionUsage.refine(_serverTags)
+ val openConnections = OpenConnections.refine(_serverTags)
+
+
+ def countCompletedRequest(statusCode: Int): Unit = {
+ if(statusCode >= 200 && statusCode <= 299)
+ requestsSuccessful.increment()
+ else if(statusCode >= 500 && statusCode <=599)
+ requestsServerError.increment()
+ else if(statusCode >= 400 && statusCode <=499)
+ requestsClientError.increment()
+ else if(statusCode >= 300 && statusCode <=399)
+ requestsRedirection.increment()
+ else if(statusCode >= 100 && statusCode <=199)
+ requestsInformational.increment()
+ else {
+ _log.warn("Unknown HTTP status code {} found when recording HTTP server metrics", statusCode.toString)
+ }
+ }
+
+ /**
+ * Removes all registered metrics from Kamon.
+ */
+ def cleanup(): Unit = {
+ CompletedRequests.remove(statusCodeTag("1xx"))
+ CompletedRequests.remove(statusCodeTag("2xx"))
+ CompletedRequests.remove(statusCodeTag("3xx"))
+ CompletedRequests.remove(statusCodeTag("4xx"))
+ CompletedRequests.remove(statusCodeTag("5xx"))
+
+ ActiveRequests.remove(_serverTags)
+ RequestSize.remove(_serverTags)
+ ResponseSize.remove(_serverTags)
+ ConnectionLifetime.remove(_serverTags)
+ ConnectionUsage.remove(_serverTags)
+ OpenConnections.remove(_serverTags)
+ }
+
+ private def statusCodeTag(group: String): Map[String, String] =
+ _serverTags + (_statusCodeTag -> group)
+
+ }
+
+
+ object Metrics {
+
+ def of(component: String, interface: String, port: Int): Metrics =
+ new HttpServer.Metrics(component, interface, port)
+
+ /**
+ * Number of completed requests per status code.
+ */
+ val CompletedRequests = Kamon.counter("http.server.requests")
+
+ /**
+ * Number of requests being processed simultaneously at any point in time.
+ */
+ val ActiveRequests = Kamon.rangeSampler("http.server.request.active")
+
+ /**
+ * Request size distribution (including headers and body) for all requests received by the server.
+ */
+ val RequestSize = Kamon.histogram("http.server.request.size", information.bytes)
+
+ /**
+ * Response size distribution (including headers and body) for all responses served by the server.
+ */
+ val ResponseSize = Kamon.histogram("http.server.response.size", information.bytes)
+
+ /**
+ * Tracks the time elapsed between connection creation and connection close.
+ */
+ val ConnectionLifetime = Kamon.histogram("http.server.connection.lifetime", time.nanoseconds)
+
+ /**
+ * Distribution of number of requests handled per connection during their entire lifetime.
+ */
+ val ConnectionUsage = Kamon.histogram("http.server.connection.usage")
+
+ /**
+ * Number of open connections.
+ */
+ val OpenConnections = Kamon.rangeSampler("http.server.connection.open")
+ }
+
+
+ def from(name: String, component: String, interface: String, port: Int): HttpServer = {
+ from(name, component, interface, port, Kamon, Kamon)
+ }
+
+ def from(name: String, component: String, interface: String, port: Int, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = {
+ val defaultConfiguration = configuration.config().getConfig(DefaultHttpServerConfiguration)
+ val configWithFallback = if(name == DefaultHttpServer) defaultConfiguration else {
+ configuration.config().getConfig(HttpServerConfigurationPrefix + "." + name).withFallback(defaultConfiguration)
+ }
+
+ new HttpServer.Default(Settings.from(configWithFallback), contextPropagation, component, interface, port)
+ }
+
+ val HttpServerConfigurationPrefix = "kamon.instrumentation.http-server"
+ val DefaultHttpServer = "default"
+ val DefaultHttpServerConfiguration = s"$HttpServerConfigurationPrefix.default"
+
+
+ private class Default(settings: Settings, contextPropagation: ContextPropagation, component: String, interface: String, port: Int) extends HttpServer {
+ private val _metrics = if(settings.enableServerMetrics) Some(HttpServer.Metrics.of(component, interface, port)) else None
+ private val _log = LoggerFactory.getLogger(classOf[Default])
+ private val _propagation = contextPropagation.httpPropagation(settings.propagationChannel)
+ .getOrElse {
+ _log.warn(s"Could not find HTTP propagation [${settings.propagationChannel}], falling back to the default HTTP propagation")
+ contextPropagation.defaultHttpPropagation()
+ }
+
+ override def receive(request: HttpMessage.Request): RequestHandler = {
+
+ val incomingContext = if(settings.enableContextPropagation)
+ _propagation.read(request)
+ else Context.Empty
+
+ val requestSpan = if(settings.enableTracing)
+ buildServerSpan(incomingContext, request)
+ else Span.Empty
+
+ val handlerContext = if(requestSpan.nonEmpty())
+ incomingContext.withKey(Span.ContextKey, requestSpan)
+ else incomingContext
+
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.activeRequests.increment()
+ }
+
+
+ new HttpServer.RequestHandler {
+ override def context: Context =
+ handlerContext
+
+ override def span: Span =
+ requestSpan
+
+ override def doneReceiving(receivedBytes: Long): Unit = {
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.requestSize.record(receivedBytes)
+ }
+ }
+
+ override def send[HttpResponse](response: HttpMessage.ResponseBuilder[HttpResponse], context: Context): HttpResponse = {
+ def addResponseTag(tag: String, value: String, mode: TagMode): Unit = mode match {
+ case TagMode.Metric => span.tagMetric(tag, value)
+ case TagMode.Span => span.tag(tag, value)
+ case TagMode.Off =>
+ }
+
+ if(settings.enableContextPropagation) {
+ _propagation.write(context, response)
+ }
+
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.countCompletedRequest(response.statusCode)
+ }
+
+ addResponseTag("http.status_code", response.statusCode.toString, settings.statusCodeTagMode)
+ response.build()
+ }
+
+ override def doneSending(sentBytes: Long): Unit = {
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.activeRequests.decrement()
+ httpServerMetrics.responseSize.record(sentBytes)
+ }
+
+ span.finish()
+ }
+ }
+ }
+
+ override def openConnection(): Unit = {
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.openConnections.increment()
+ }
+ }
+
+ override def closeConnection(lifetime: Duration, handledRequests: Long): Unit = {
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.openConnections.decrement()
+ httpServerMetrics.connectionLifetime.record(lifetime.toNanos)
+ httpServerMetrics.connectionUsage.record(handledRequests)
+ }
+ }
+
+ override def shutdown(): Unit = {
+ _metrics.foreach { httpServerMetrics =>
+ httpServerMetrics.cleanup()
+ }
+ }
+
+
+ private def buildServerSpan(context: Context, request: HttpMessage.Request): Span = {
+ val span = Kamon.buildSpan(operationName(request))
+ .withMetricTag("span.kind", "server")
+ .withMetricTag("component", component)
+
+ if(!settings.enableSpanMetrics)
+ span.disableMetrics()
+
+
+ for { traceIdTag <- settings.traceIDTag; customTraceID <- context.getTag(traceIdTag) } {
+ val identifier = Kamon.identityProvider.traceIdGenerator().from(customTraceID)
+ if(identifier != IdentityProvider.NoIdentifier)
+ span.withTraceID(identifier)
+ }
+
+ def addRequestTag(tag: String, value: String, mode: TagMode): Unit = mode match {
+ case TagMode.Metric => span.withMetricTag(tag, value)
+ case TagMode.Span => span.withTag(tag, value)
+ case TagMode.Off =>
+ }
+
+ addRequestTag("http.url", request.url, settings.urlTagMode)
+ addRequestTag("http.method", request.method, settings.urlTagMode)
+ settings.contextTags.foreach {
+ case (tagName, mode) => context.getTag(tagName).foreach(tagValue => addRequestTag(tagName, tagValue, mode))
+ }
+
+ span.start()
+ }
+
+ private def operationName(request: HttpMessage.Request): String = {
+ val requestPath = request.path
+ val customMapping = settings.operationMappings.collectFirst {
+ case (pattern, operationName) if pattern.accept(requestPath) => operationName
+ }
+
+ customMapping.getOrElse("http.request")
+ }
+ }
+
+
+ case class Settings(
+ enableContextPropagation: Boolean,
+ propagationChannel: String,
+ enableServerMetrics: Boolean,
+ enableTracing: Boolean,
+ traceIDTag: Option[String],
+ enableSpanMetrics: Boolean,
+ urlTagMode: TagMode,
+ methodTagMode: TagMode,
+ statusCodeTagMode: TagMode,
+ contextTags: Map[String, TagMode],
+ unhandledOperationName: String,
+ operationMappings: Map[GlobPathFilter, String]
+ )
+
+ object Settings {
+
+ sealed trait TagMode
+ object TagMode {
+ case object Metric extends TagMode
+ case object Span extends TagMode
+ case object Off extends TagMode
+
+ def from(value: String): TagMode = value.toLowerCase match {
+ case "metric" => TagMode.Metric
+ case "span" => TagMode.Span
+ case _ => TagMode.Off
+ }
+ }
+
+ def from(config: Config): Settings = {
+
+ // Context propagation settings
+ val enablePropagation = config.getBoolean("propagation.enabled")
+ val propagationChannel = config.getString("propagation.channel")
+
+ // HTTP Server metrics settings
+ val enableServerMetrics = config.getBoolean("metrics.enabled")
+
+ // Tracing settings
+ val enableTracing = config.getBoolean("tracing.enabled")
+ val traceIdTag = Option(config.getString("tracing.preferred-trace-id-tag")).filterNot(_ == "none")
+ val enableSpanMetrics = config.getBoolean("tracing.span-metrics")
+ val urlTagMode = TagMode.from(config.getString("tracing.tags.url"))
+ val methodTagMode = TagMode.from(config.getString("tracing.tags.method"))
+ val statusCodeTagMode = TagMode.from(config.getString("tracing.tags.status-code"))
+ val contextTags = config.getConfig("tracing.tags.from-context").pairs.map {
+ case (tagName, mode) => (tagName, TagMode.from(mode))
+ }
+
+ val unhandledOperationName = config.getString("tracing.operations.unhandled")
+ val operationMappings = config.getConfig("tracing.operations.mappings").pairs.map {
+ case (pattern, operationName) => (new GlobPathFilter(pattern), operationName)
+ }
+
+ Settings(
+ enablePropagation,
+ propagationChannel,
+ enableServerMetrics,
+ enableTracing,
+ traceIdTag,
+ enableSpanMetrics,
+ urlTagMode,
+ methodTagMode,
+ statusCodeTagMode,
+ contextTags,
+ unhandledOperationName,
+ operationMappings
+ )
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/Mixin.scala b/kamon-core/src/main/scala/kamon/instrumentation/Mixin.scala
new file mode 100644
index 00000000..f5a5e63b
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/Mixin.scala
@@ -0,0 +1,36 @@
+package kamon.instrumentation
+
+import kamon.Kamon
+import kamon.context.Context
+
+/**
+ * Common mixins used across multiple instrumentation modules.
+ */
+object Mixin {
+
+ /**
+ * Utility trait that marks objects carrying a reference to a Context instance.
+ *
+ */
+ trait HasContext {
+ def context: Context
+ }
+
+ object HasContext {
+ private case class Default(context: Context) extends HasContext
+
+ /**
+ * Construct a HasSpan instance that references the provided Context.
+ *
+ */
+ def from(context: Context): HasContext =
+ Default(context)
+
+ /**
+ * Construct a HasContext instance with the current Kamon from Kamon's default context storage.
+ *
+ */
+ def fromCurrentContext(): HasContext =
+ Default(Kamon.currentContext())
+ }
+}