aboutsummaryrefslogblamecommitdiff
path: root/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
blob: 68975711cf172f1a83de1d8b7b506b21bc746126 (plain) (tree)
1
2
3
4
5
6
7
8


                       

                         

                                 
                                                        


                                                       
                                           
                                
                              


                                        














                                                                                                                       

                  










                                                                                                                      
                                                                      
















                                                                                                                   
 





                                                                                                                    



                   










                                                                                                                        

                        


                                                                                                                   

                  















                                                                                                                     
                                                                                                               






                                                                                        
 

   
 


































































                                                                                                              

   















































                                                                                                                                                               




                                                                                                                     
                                                                                                             

   


                                                                                
 
 

                                                                                                                                                         
                                                                
                                                                                              



                                                                                                                                    
 
                                                                          
 
                                                                 
                                  









                                                             


                                                    

 
                                     





                                       




                                                                 
 
                                                                                                                                




                                                                                            
 



                                                                        




                                                                                                                                 
                                                                                                      


                          
                                                           
                                                 
                                                        


                                                            




                       


















                                                                                     
 
 
                                                                                         



                                                        



                                     
                                                                                                   





                                                                                       




                                                             
                                                                 
                                                                          
                                    



                                                           




                  
                                                                       
                                    


                                                                                     









                                             
                           
                                                            




                                      

                                          



















                                                                  
                                                                                                                  






                                                                      


                                                              



                                                                                       







                                                                                        


                                                                                                       




                                                                                         
                


                            
                      
                         




                          

                              





                               
package kamon
package instrumentation

import java.time.Duration

import com.typesafe.config.Config
import kamon.context.Context
import kamon.instrumentation.HttpServer.Settings.TagMode
import kamon.metric.MeasurementUnit.{information, time}
import kamon.tag.TagSet
import kamon.tag.Lookups.{any, option}
import kamon.trace.{IdentityProvider, Span}
import kamon.util.GlobPathFilter
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._

/**
  * HTTP Server instrumentation handler that takes care of context propagation, distributed tracing and HTTP server
  * metrics. Instances can be created by using the [[HttpServer.from]] method with the desired configuration name. All
  * configuration for the default HTTP instrumentation is at "kamon.instrumentation.http-server.default".
  *
  * The default implementation shipping with Kamon provides:
  *
  * - Context Propagation: Incoming and Returning context propagation as well as incoming context tags. Context
  *   propagation is further used to enable distributed tracing on top of any instrumented HTTP Server.
  * - Distributed Tracing: Automatically join traces initiated by the callers of this service and apply span and metric
  *   tags from the incoming requests as well as form the incoming context tags.
  * - Server Metrics: Basic request processing metrics to understand connection usage, throughput and response code
  *   counts in the HTTP server.
  *
  */
trait HttpServer {

  /**
    * Initiate handling of a HTTP request received by this server. The returned RequestHandler contains the Span that
    * represents the processing of the incoming HTTP request (if tracing is enabled) and the Context extracted from
    * HTTP headers (if context propagation is enabled).
    *
    * Callers of this method **must** always ensure that the doneReceiving, send and doneSending callbacks are invoked
    * for all incoming requests.
    *
    * @param request A HttpRequest wrapper on the original incoming HTTP request.
    * @return The RequestHandler that will follow the lifecycle of the incoming request.
    */
  def receive(request: HttpMessage.Request): HttpServer.RequestHandler


  /**
    * Signals that a new HTTP connection has been opened.
    */
  def openConnection(): Unit


  /**
    * Signals that a HTTP connection has been closed. If the connection lifetime or the number of handled requests
    * cannot be determined the the values [[Duration.ZERO]] and zero can be provided, respectively. No metrics will
    * be updated when the values are zero.
    *
    * @param lifetime For how long did the connection remain open.
    * @param handledRequests How many requests where handled by the closed connection.
    */
  def closeConnection(lifetime: Duration, handledRequests: Long): Unit

  /**
    * Frees resources that might have been acquired to provide the instrumentation. Behavior on HttpServer instances
    * after calling this function is undefined.
    */
  def shutdown(): Unit

}

object HttpServer {

  /**
    * Handler associated to the processing of a single request. The instrumentation code using this class is responsible
    * of creating a dedicated [[HttpServer.RequestHandler]] instance for each received request should invoking the
    * doneReceiving, send and doneSending callbacks when appropriate.
    */
  trait RequestHandler {

    /**
      * If context propagation is enabled this function returns the incoming context associated wih this request,
      * otherwise [[Context.Empty]] is returned.
      */
    def context: Context

    /**
      * Span representing the current HTTP server operation. If tracing is disabled this will return an empty span.
      */
    def span: Span

    /**
      * Signals that the entire request (headers and body) has been received.
      *
      * @param receivedBytes Size of the entire HTTP request.
      */
    def doneReceiving(receivedBytes: Long): Unit

    /**
      * Process a response to be sent back to the client. Since returning keys might need to included in the response
      * headers, users of this class must ensure that the returned HttpResponse is used instead of the original one
      * passed into this function.
      *
      * @param response Wraps the HTTP response to be sent back to the client.
      * @param context Context that should be used for writing returning keys into the response.
      * @return The modified HTTP response that should be sent to clients.
      */
    def send[HttpResponse](response: HttpMessage.ResponseBuilder[HttpResponse], context: Context): HttpResponse

    /**
      * Signals that the entire response (headers and body) has been sent to the client.
      *
      * @param sentBytes Size of the entire HTTP response.
      */
    def doneSending(sentBytes: Long): Unit

  }


  /**
    * Holds all metric instruments required to record metrics from an HTTP server.
    *
    * @param interface Interface name or address where the HTTP server is listening.
    * @param port Port number where the HTTP server is listening.
    */
  class Metrics(component: String, interface: String, port: Int) {
    import Metrics._
    private val _log = LoggerFactory.getLogger(classOf[HttpServer.Metrics])
    private val _statusCodeTag = "status_code"
    private val _serverTags = Map(
      "component" -> component,
      "interface" -> interface,
      "port" -> port.toString
    )

    val requestsInformational = CompletedRequests.refine(statusCodeTag("1xx"))
    val requestsSuccessful = CompletedRequests.refine(statusCodeTag("2xx"))
    val requestsRedirection = CompletedRequests.refine(statusCodeTag("3xx"))
    val requestsClientError = CompletedRequests.refine(statusCodeTag("4xx"))
    val requestsServerError = CompletedRequests.refine(statusCodeTag("5xx"))

    val activeRequests = ActiveRequests.refine(_serverTags)
    val requestSize = RequestSize.refine(_serverTags)
    val responseSize = ResponseSize.refine(_serverTags)
    val connectionLifetime = ConnectionLifetime.refine(_serverTags)
    val connectionUsage = ConnectionUsage.refine(_serverTags)
    val openConnections = OpenConnections.refine(_serverTags)


    def countCompletedRequest(statusCode: Int): Unit = {
      if(statusCode >= 200 && statusCode <= 299)
        requestsSuccessful.increment()
      else if(statusCode >= 500 && statusCode <=599)
        requestsServerError.increment()
      else if(statusCode >= 400 && statusCode <=499)
        requestsClientError.increment()
      else if(statusCode >= 300 && statusCode <=399)
        requestsRedirection.increment()
      else if(statusCode >= 100 && statusCode <=199)
        requestsInformational.increment()
      else {
        _log.warn("Unknown HTTP status code {} found when recording HTTP server metrics", statusCode.toString)
      }
    }

    /**
      * Removes all registered metrics from Kamon.
      */
    def cleanup(): Unit = {
      CompletedRequests.remove(statusCodeTag("1xx"))
      CompletedRequests.remove(statusCodeTag("2xx"))
      CompletedRequests.remove(statusCodeTag("3xx"))
      CompletedRequests.remove(statusCodeTag("4xx"))
      CompletedRequests.remove(statusCodeTag("5xx"))

      ActiveRequests.remove(_serverTags)
      RequestSize.remove(_serverTags)
      ResponseSize.remove(_serverTags)
      ConnectionLifetime.remove(_serverTags)
      ConnectionUsage.remove(_serverTags)
      OpenConnections.remove(_serverTags)
    }

    private def statusCodeTag(group: String): Map[String, String] =
      _serverTags + (_statusCodeTag -> group)

  }


  object Metrics {

    def of(component: String, interface: String, port: Int): Metrics =
      new HttpServer.Metrics(component, interface, port)

    /**
      * Number of completed requests per status code.
      */
    val CompletedRequests = Kamon.counter("http.server.requests")

    /**
      * Number of requests being processed simultaneously at any point in time.
      */
    val ActiveRequests = Kamon.rangeSampler("http.server.request.active")

    /**
      * Request size distribution (including headers and body) for all requests received by the server.
      */
    val RequestSize = Kamon.histogram("http.server.request.size", information.bytes)

    /**
      * Response size distribution (including headers and body) for all responses served by the server.
      */
    val ResponseSize = Kamon.histogram("http.server.response.size", information.bytes)

    /**
      * Tracks the time elapsed between connection creation and connection close.
      */
    val ConnectionLifetime = Kamon.histogram("http.server.connection.lifetime", time.nanoseconds)

    /**
      * Distribution of number of requests handled per connection during their entire lifetime.
      */
    val ConnectionUsage = Kamon.histogram("http.server.connection.usage")

    /**
      * Number of open connections.
      */
    val OpenConnections = Kamon.rangeSampler("http.server.connection.open")
  }


  def from(name: String, component: String, interface: String, port: Int): HttpServer = {
    from(name, component, interface, port, Kamon, Kamon)
  }

  def from(name: String, component: String, interface: String, port: Int, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = {
    val defaultConfiguration = configuration.config().getConfig(DefaultHttpServerConfiguration)
    val configWithFallback = if(name == DefaultHttpServer) defaultConfiguration else {
      configuration.config().getConfig(HttpServerConfigurationPrefix + "." + name).withFallback(defaultConfiguration)
    }

    new HttpServer.Default(Settings.from(configWithFallback), contextPropagation, component, interface, port)
  }

  val HttpServerConfigurationPrefix = "kamon.instrumentation.http-server"
  val DefaultHttpServer = "default"
  val DefaultHttpServerConfiguration = s"$HttpServerConfigurationPrefix.default"


  private class Default(settings: Settings, contextPropagation: ContextPropagation, component: String, interface: String, port: Int) extends HttpServer {
    private val _metrics = if(settings.enableServerMetrics) Some(HttpServer.Metrics.of(component, interface, port)) else None
    private val _log = LoggerFactory.getLogger(classOf[Default])
    private val _propagation = contextPropagation.httpPropagation(settings.propagationChannel)
      .getOrElse {
        _log.warn(s"Could not find HTTP propagation [${settings.propagationChannel}], falling back to the default HTTP propagation")
        contextPropagation.defaultHttpPropagation()
      }

    override def receive(request: HttpMessage.Request): RequestHandler = {

      val incomingContext = if(settings.enableContextPropagation)
        _propagation.read(request)
        else Context.Empty

      val requestSpan = if(settings.enableTracing)
        buildServerSpan(incomingContext, request)
        else Span.Empty

      val handlerContext = if(requestSpan.nonEmpty())
        incomingContext.withKey(Span.ContextKey, requestSpan)
      else incomingContext

      _metrics.foreach { httpServerMetrics =>
        httpServerMetrics.activeRequests.increment()
      }


      new HttpServer.RequestHandler {
        override def context: Context =
          handlerContext

        override def span: Span =
          requestSpan

        override def doneReceiving(receivedBytes: Long): Unit = {
          _metrics.foreach { httpServerMetrics =>
            httpServerMetrics.requestSize.record(receivedBytes)
          }
        }

        override def send[HttpResponse](response: HttpMessage.ResponseBuilder[HttpResponse], context: Context): HttpResponse = {
          def addResponseTag(tag: String, value: String, mode: TagMode): Unit = mode match {
            case TagMode.Metric => span.tagMetric(tag, value)
            case TagMode.Span => span.tag(tag, value)
            case TagMode.Off =>
          }

          _metrics.foreach { httpServerMetrics =>
            httpServerMetrics.countCompletedRequest(response.statusCode)
          }

          if(span.nonEmpty()) {
            settings.traceIDResponseHeader.foreach(traceIDHeader => response.write(traceIDHeader, span.context().traceID.string))
            settings.spanIDResponseHeader.foreach(spanIDHeader => response.write(spanIDHeader, span.context().spanID.string))
          }

          addResponseTag("http.status_code", response.statusCode.toString, settings.statusCodeTagMode)
          response.build()
        }

        override def doneSending(sentBytes: Long): Unit = {
          _metrics.foreach { httpServerMetrics =>
            httpServerMetrics.activeRequests.decrement()
            httpServerMetrics.responseSize.record(sentBytes)
          }

          span.finish()
        }
      }
    }

    override def openConnection(): Unit = {
      _metrics.foreach { httpServerMetrics =>
        httpServerMetrics.openConnections.increment()
      }
    }

    override def closeConnection(lifetime: Duration, handledRequests: Long): Unit = {
      _metrics.foreach { httpServerMetrics =>
        httpServerMetrics.openConnections.decrement()
        httpServerMetrics.connectionLifetime.record(lifetime.toNanos)
        httpServerMetrics.connectionUsage.record(handledRequests)
      }
    }

    override def shutdown(): Unit = {
      _metrics.foreach { httpServerMetrics =>
        httpServerMetrics.cleanup()
      }
    }


    private def buildServerSpan(context: Context, request: HttpMessage.Request): Span = {
      val span = Kamon.buildSpan(operationName(request))
        .withMetricTag("span.kind", "server")
        .withMetricTag("component", component)

      if(!settings.enableSpanMetrics)
        span.disableMetrics()


      for {traceIdTag <- settings.traceIDTagLookup; customTraceID <- context.getTag(traceIdTag) } {
        val identifier = Kamon.identityProvider.traceIdGenerator().from(customTraceID)
        if(identifier != IdentityProvider.NoIdentifier)
          span.withTraceID(identifier)
      }

      def addRequestTag(tag: String, value: String, mode: TagMode): Unit = mode match {
        case TagMode.Metric => span.withMetricTag(tag, value)
        case TagMode.Span => span.withTag(tag, value)
        case TagMode.Off =>
      }

      addRequestTag("http.url", request.url, settings.urlTagMode)
      addRequestTag("http.method", request.method, settings.methodTagMode)
      settings.contextTags.foreach {
        case (tagName, mode) =>
          val tagValue = context.getTag(any(tagName))
          if(tagValue != null)
            addRequestTag(tagName, tagValue.toString, mode)
      }

      span.start()
    }

    private def operationName(request: HttpMessage.Request): String = {
      val requestPath = request.path
      val customMapping = settings.operationMappings.collectFirst {
        case (pattern, operationName) if pattern.accept(requestPath) => operationName
      }

      customMapping.getOrElse("http.request")
    }
  }


  case class Settings(
    enableContextPropagation: Boolean,
    propagationChannel: String,
    enableServerMetrics: Boolean,
    enableTracing: Boolean,
    traceIDTagLookup: Option[TagSet.Lookup[Option[String]]],
    enableSpanMetrics: Boolean,
    urlTagMode: TagMode,
    methodTagMode: TagMode,
    statusCodeTagMode: TagMode,
    contextTags: Map[String, TagMode],
    traceIDResponseHeader: Option[String],
    spanIDResponseHeader: Option[String],
    unhandledOperationName: String,
    operationMappings: Map[GlobPathFilter, String]
  )

  object Settings {

    sealed trait TagMode
    object TagMode {
      case object Metric extends TagMode
      case object Span extends TagMode
      case object Off extends TagMode

      def from(value: String): TagMode = value.toLowerCase match {
        case "metric" => TagMode.Metric
        case "span" => TagMode.Span
        case _ => TagMode.Off
      }
    }

    def from(config: Config): Settings = {
      def optionalString(value: String): Option[String] = if(value.equalsIgnoreCase("none")) None else Some(value)

      // Context propagation settings
      val enablePropagation = config.getBoolean("propagation.enabled")
      val propagationChannel = config.getString("propagation.channel")

      // HTTP Server metrics settings
      val enableServerMetrics = config.getBoolean("metrics.enabled")

      // Tracing settings
      val enableTracing = config.getBoolean("tracing.enabled")
      val traceIdTagLookup = Option(config.getString("tracing.preferred-trace-id-tag"))
        .filterNot(_ == "none")
        .map(option)

      val enableSpanMetrics = config.getBoolean("tracing.span-metrics")
      val urlTagMode = TagMode.from(config.getString("tracing.tags.url"))
      val methodTagMode = TagMode.from(config.getString("tracing.tags.method"))
      val statusCodeTagMode = TagMode.from(config.getString("tracing.tags.status-code"))
      val contextTags = config.getConfig("tracing.tags.from-context").pairs.map {
        case (tagName, mode) => (tagName, TagMode.from(mode))
      }

      val traceIDResponseHeader = optionalString(config.getString("tracing.response-headers.trace-id"))
      val spanIDResponseHeader = optionalString(config.getString("tracing.response-headers.span-id"))

      val unhandledOperationName = config.getString("tracing.operations.unhandled")
      val operationMappings = config.getConfig("tracing.operations.mappings").pairs.map {
        case (pattern, operationName) => (new GlobPathFilter(pattern), operationName)
      }

      Settings (
        enablePropagation,
        propagationChannel,
        enableServerMetrics,
        enableTracing,
        traceIdTagLookup,
        enableSpanMetrics,
        urlTagMode,
        methodTagMode,
        statusCodeTagMode,
        contextTags,
        traceIDResponseHeader,
        spanIDResponseHeader,
        unhandledOperationName,
        operationMappings
      )
    }
  }
}