aboutsummaryrefslogtreecommitdiff
path: root/kamon-core
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2018-09-04 18:32:36 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2018-09-04 18:32:36 +0200
commitac43c0476c239a9cf1c20e838b0fd212b20161e1 (patch)
tree3ecded509503a0a270bdfddf61270a2f69944b37 /kamon-core
parentc487c51a54e67944c80cf2aecc63ac8158bf99a6 (diff)
downloadKamon-ac43c0476c239a9cf1c20e838b0fd212b20161e1.tar.gz
Kamon-ac43c0476c239a9cf1c20e838b0fd212b20161e1.tar.bz2
Kamon-ac43c0476c239a9cf1c20e838b0fd212b20161e1.zip
basic testing for the HTTP server instrumentation
Diffstat (limited to 'kamon-core')
-rw-r--r--kamon-core/src/main/resources/reference.conf2
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala154
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Tracer.scala18
3 files changed, 150 insertions, 24 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 8fb2abcc..5984f3c4 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -301,7 +301,7 @@ kamon {
# only if all these conditions are met:
# - the context tag is present.
# - there is no parent Span on the incoming context (i.e. this is the first service on the trace).
- # - the identifier is valued in accordance to the identity provider.
+ # - the identifier is valid in accordance to the identity provider.
trace-id-tag = "none"
# Enables collection of span metrics using the `span.processing-time` metric.
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
index 10dccdbc..88803bad 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
@@ -1,51 +1,142 @@
package kamon
package instrumentation
+import java.time.Duration
+
import com.typesafe.config.Config
import kamon.context.Context
import kamon.context.HttpPropagation.Direction
import kamon.instrumentation.HttpServer.Settings.TagMode
-import kamon.trace.Span
+import kamon.trace.{IdentityProvider, Span}
import kamon.util.GlobPathFilter
+import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
+/**
+ * HTTP Server instrumentation handler that takes care of context propagation, distributed tracing and HTTP server
+ * metrics. Instances can be created by using the [[HttpServer.from]] method with the desired configuration name. All
+ * configuration for the default HTTP instrumentation is at "kamon.instrumentation.http-server.default".
+ *
+ * The default implementation shipping with Kamon provides:
+ *
+ * - Context Propagation: Incoming and Returning context propagation as well as incoming context tags. Context
+ * propagation is further used to enable distributed tracing on top of any instrumented HTTP Server.
+ * - Distributed Tracing: Automatically join traces initiated by the callers of this service and apply span and metric
+ * tags from the incoming requests as well as form the incoming context tags.
+ * - Server Metrics: Basic request processing metrics to understand connection usage, throughput and response code
+ * counts in the HTTP server.
+ *
+ */
trait HttpServer {
- def handle(request: HttpRequest): HttpServer.Handler
+ /**
+ * Initiate handling of a HTTP request received by this server. The returned RequestHandler contains the Span that
+ * represents the processing of the incoming HTTP request (if tracing is enabled) and the Context extracted from
+ * HTTP headers (if context propagation is enabled).
+ *
+ * Callers of this method **must** always ensure that the doneReceiving, send and doneSending callbacks are invoked
+ * for all incoming requests.
+ *
+ * @param request A HttpRequest wrapper on the original incoming HTTP request.
+ * @return The RequestHandler that will follow the lifecycle of the incoming request.
+ */
+ def receive(request: HttpRequest): HttpServer.RequestHandler
+
+
+ /**
+ * Signals that a new HTTP connection has been opened.
+ */
+ def openConnection(): Unit
+
+
+ /**
+ * Signals that a HTTP connection has been closed. If the connection lifetime or the number of handled requests
+ * cannot be determined the the values [[Duration.ZERO]] and zero can be provided, respectively. No metrics will
+ * be updated when the values are zero.
+ *
+ * @param lifetime For how long did the connection remain open.
+ * @param handledRequests How many requests where handled by the closed connection.
+ */
+ def closeConnection(lifetime: Duration, handledRequests: Long): Unit
}
object HttpServer {
- trait Handler {
-
+ /**
+ * Handler associated to the processing of a single request. The instrumentation code using this class is responsible
+ * of creating a dedicated [[HttpServer.RequestHandler]] instance for each received request should invoking the
+ * doneReceiving, send and doneSending callbacks when appropriate.
+ */
+ trait RequestHandler {
+
+ /**
+ * If context propagation is enabled this function returns the incoming context associated wih this request,
+ * otherwise [[Context.Empty]] is returned.
+ */
def context: Context
+ /**
+ * Span representing the current HTTP server operation. If tracing is disabled this will return an empty span.
+ */
def span: Span
- def finishRequest(): Unit
+ /**
+ * Signals that the entire request (headers and body) has been received.
+ *
+ * @param receivedBytes Size of the entire HTTP request.
+ */
+ def doneReceiving(receivedBytes: Long): Unit
+
+ /**
+ * Process a response to be sent back to the client. Since returning keys might need to included in the response
+ * headers, users of this class must ensure that the returned HttpResponse is used instead of the original one
+ * passed into this function.
+ *
+ * @param response Wraps the HTTP response to be sent back to the client.
+ * @param context Context that should be used for writing returning keys into the response.
+ * @return The modified HTTP response that should be sent to clients.
+ */
+ def send[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse
+
+ /**
+ * Signals that the entire response (headers and body) has been sent to the client.
+ *
+ * @param sentBytes Size of the entire HTTP response.
+ */
+ def doneSending(sentBytes: Long): Unit
- def startResponse[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse
-
- def endResponse(): Unit
}
+
def from(name: String, port: Int, component: String): HttpServer = {
from(name, port, component, Kamon, Kamon)
}
def from(name: String, port: Int, component: String, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = {
- val configKey = "kamon.instrumentation.http-server." + name
- new HttpServer.Default(Settings.from(configuration.config().getConfig(configKey)), contextPropagation, port, component)
+ val defaultConfiguration = configuration.config().getConfig(DefaultHttpServerConfiguration)
+ val configWithFallback = if(name == DefaultHttpServer) defaultConfiguration else {
+ configuration.config().getConfig(HttpServerConfigurationPrefix + "." + name).withFallback(defaultConfiguration)
+ }
+
+ new HttpServer.Default(Settings.from(configWithFallback), contextPropagation, port, component)
}
+ val HttpServerConfigurationPrefix = "kamon.instrumentation.http-server"
+ val DefaultHttpServer = "default"
+ val DefaultHttpServerConfiguration = s"$HttpServerConfigurationPrefix.default"
- class Default(settings: Settings, contextPropagation: ContextPropagation, port: Int, component: String) extends HttpServer {
+
+ private class Default(settings: Settings, contextPropagation: ContextPropagation, port: Int, component: String) extends HttpServer {
+ private val _log = LoggerFactory.getLogger(classOf[Default])
private val _propagation = contextPropagation.httpPropagation(settings.propagationChannel)
- .getOrElse(sys.error(s"Could not find HTTP propagation [${settings.propagationChannel}"))
+ .getOrElse {
+ _log.warn(s"Could not find HTTP propagation [${settings.propagationChannel}], falling back to the default HTTP propagation")
+ contextPropagation.defaultHttpPropagation()
+ }
- override def handle(request: HttpRequest): Handler = {
+ override def receive(request: HttpRequest): RequestHandler = {
val incomingContext = if(settings.enableContextPropagation)
_propagation.readContext(request)
else Context.Empty
@@ -61,44 +152,65 @@ object HttpServer {
// TODO: Handle HTTP Server Metrics
- new HttpServer.Handler {
+ new HttpServer.RequestHandler {
override def context: Context =
handlerContext
override def span: Span =
requestSpan
- override def finishRequest(): Unit = {}
+ override def doneReceiving(receivedBytes: Long): Unit = {}
+
+ override def send[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse = {
+ def addResponseTag(tag: String, value: String, mode: TagMode): Unit = mode match {
+ case TagMode.Metric => span.tagMetric(tag, value)
+ case TagMode.Span => span.tag(tag, value)
+ case TagMode.Off =>
+ }
- override def startResponse[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse = {
if(settings.enableContextPropagation) {
_propagation.writeContext(context, response, Direction.Returning)
}
+ addResponseTag("http.status_code", response.statusCode.toString, settings.statusCodeTagMode)
response.build()
}
- override def endResponse(): Unit = {
+ override def doneSending(sentBytes: Long): Unit = {
span.finish()
}
}
}
+ override def openConnection(): Unit = ???
+
+ override def closeConnection(lifetime: Duration, handledRequests: Long): Unit = ???
+
private def buildServerSpan(context: Context, request: HttpRequest): Span = {
val span = Kamon.buildSpan(operationName(request))
.withMetricTag("span.kind", "server")
.withMetricTag("component", component)
- def addTag(tag: String, value: String, mode: TagMode): Unit = mode match {
+ if(!settings.enableSpanMetrics)
+ span.disableMetrics()
+
+
+ for { traceIdTag <- settings.traceIDTag; customTraceID <- context.getTag(traceIdTag) } {
+ val identifier = Kamon.identityProvider.traceIdGenerator().from(customTraceID)
+ if(identifier != IdentityProvider.NoIdentifier)
+ span.withTraceID(identifier)
+ }
+
+ def addRequestTag(tag: String, value: String, mode: TagMode): Unit = mode match {
case TagMode.Metric => span.withMetricTag(tag, value)
case TagMode.Span => span.withTag(tag, value)
case TagMode.Off =>
}
- addTag("http.url", request.url, settings.urlTagMode)
- addTag("http.method", request.method, settings.urlTagMode)
+ addRequestTag("http.url", request.url, settings.urlTagMode)
+ addRequestTag("http.method", request.method, settings.urlTagMode)
settings.contextTags.foreach {
- case (tagName, mode) => context.getTag(tagName).foreach(tagValue => addTag(tagName, tagValue, mode))
+ case (tagName, mode) => context.getTag(tagName).foreach(tagValue => addRequestTag(tagName, tagValue, mode))
}
span.start()
diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
index 3e857f00..ad7ffbed 100644
--- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
@@ -99,6 +99,7 @@ object Tracer {
private var initialMetricTags = Map.empty[String, String]
private var useParentFromContext = true
private var trackMetrics = true
+ private var providedTraceID = IdentityProvider.NoIdentifier
def asChildOf(parent: Span): SpanBuilder = {
if(parent != Span.Empty) this.parentSpan = parent
@@ -158,6 +159,11 @@ object Tracer {
this
}
+ def withTraceID(identifier: IdentityProvider.Identifier): SpanBuilder = {
+ this.providedTraceID = identifier
+ this
+ }
+
def start(): Span = {
val spanFrom = if(from == Instant.EPOCH) clock.instant() else from
@@ -199,13 +205,21 @@ object Tracer {
else
parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision)
- private def newSpanContext(samplingDecision: SamplingDecision): SpanContext =
+ private def newSpanContext(samplingDecision: SamplingDecision): SpanContext = {
+ val traceID =
+ if(providedTraceID != IdentityProvider.NoIdentifier)
+ providedTraceID
+ else
+ tracer._identityProvider.traceIdGenerator().generate()
+
+
SpanContext(
- traceID = tracer._identityProvider.traceIdGenerator().generate(),
+ traceID,
spanID = tracer._identityProvider.spanIdGenerator().generate(),
parentID = IdentityProvider.NoIdentifier,
samplingDecision = samplingDecision
)
+ }
}
private final class TracerMetrics(metricLookup: MetricLookup) {