aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kamon-core-tests/src/test/resources/reference.conf126
-rw-r--r--kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala65
-rw-r--r--kamon-core/src/main/resources/reference.conf2
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala154
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Tracer.scala18
-rw-r--r--kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala6
6 files changed, 232 insertions, 139 deletions
diff --git a/kamon-core-tests/src/test/resources/reference.conf b/kamon-core-tests/src/test/resources/reference.conf
index d165249e..6615bb7b 100644
--- a/kamon-core-tests/src/test/resources/reference.conf
+++ b/kamon-core-tests/src/test/resources/reference.conf
@@ -7,117 +7,31 @@ kamon {
kamon {
- instrumentation {
- http-server {
- noop {
-
- #
- # Configuration for HTTP context propagation
- #
- propagation {
-
- # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if
- # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can
- # be created and reported but will not be linked across boundaries nor take trace identifiers from tags).
- enabled = no
-
- # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default
- # configuration for more details on how to configure the detault HTTP context propagation.
- channel = "default"
- }
-
-
- #
- # Configuration for HTTP server metrics collection
- #
- metrics {
-
- # Enables collection of HTTP server metrics
- enabled = no
-
- # Tags to include on the HTTP server metrics. The available options are:
- # - method: HTTP method from the request.
- # - status-code: HTTP status code from the responses.
- #
- tags = [
- "method",
- "status-code"
- ]
- }
-
-
- #
- # Configuration for HTTP request tracing
- #
- tracing {
-
- # Enables HTTP request tracing. When enabled the instrumentation will create Spans for incoming requests
- # and finish them when the response is sent back to the clients.
- enabled = no
-
- # Select a context tag that provides a custom trace identifier. The custom trace identifier will be used
- # only if all these conditions are met:
- # - the context tag is present.
- # - there is no parent Span on the incoming context (i.e. this is the first service on the trace).
- # - the identifier is valued in accordance to the identity provider.
- trace-id-tag = "none"
- # Enables collection of span metrics using the `span.processing-time` metric.
- span-metrics = on
-
- # Select which tags should be included as span and span metric tags. The possible options are:
- # - span: the tag is added as a Span tag (i.e. using span.tag(...))
- # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...))
- # - off: the tag is not used.
- #
- tags {
-
- # Use the http.url tag.
- url = span
-
- # Use the http.method tag.
- method = metric
-
- # Use the http.status_code tag.
- status-code = metric
-
- # Copy tags from the context into the Spans with the specified purpouse.
- from-context {
-
- # The peer tag identifiest the service that is calling the current service. It is added by default with
- # the HttpClient instrumentation.
- peer = metric
- }
- }
+ trace {
+ sampler = always
+ }
- # Custom mappings between routes and operation names.
- operations {
+ propagation.http.default {
+ tags.mappings {
+ "correlation-id" = "x-correlation-id"
+ }
+ }
- # Operation name for Spans created on requests that could not be handled by any route in the current
- # application.
- unhandled = "unhandled"
+ instrumentation {
+ http-server {
+ default {
+ tracing.trace-id-tag = "correlation-id"
+ }
- # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode
- # instrumentation is not able to provide a sensible operation name that is free of high cardinality values.
- # For example, with the following configuration:
- # mappings {
- # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile"
- # "/events/*/rsvps" = "EventRSVPs"
- # }
- #
- # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have
- # the same operation name "/organization/:orgID/user/:userID/profile".
- #
- # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation
- # name "EventRSVPs".
- #
- # The patterns are expressed as globs and the operation names are free form.
- #
- mappings {
+ no-span-metrics {
+ tracing.span-metrics = off
+ }
- }
- }
- }
+ noop {
+ propagation.enabled = no
+ metrics.enabled = no
+ tracing.enabled = no
}
}
}
diff --git a/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala
index a3662127..e31382a5 100644
--- a/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala
+++ b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala
@@ -11,8 +11,7 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp
"the HTTP server instrumentation" when {
"configured for context propagation" should {
"read context entries and tags from the incoming request" in {
- val httpServer = HttpServer.from("default", port = 8080, "http.server")
- val handler = httpServer.handle(fakeRequest("http://localhost:8080/", "/", "GET", Map(
+ val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map(
"context-tags" -> "tag=value;none=0011223344556677;",
"custom-trace-id" -> "0011223344556677"
)))
@@ -24,8 +23,7 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp
}
"use the configured HTTP propagation channel" in {
- val httpServer = HttpServer.from("default", port = 8080, "http.server")
- val handler = httpServer.handle(fakeRequest("http://localhost:8080/", "/", "GET", Map(
+ val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map(
"context-tags" -> "tag=value;none=0011223344556677;",
"custom-trace-id" -> "0011223344556677"
)))
@@ -41,15 +39,63 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp
span.tag("http.url").value shouldBe "http://localhost:8080/"
val responseHeaders = mutable.Map.empty[String, String]
- handler.startResponse(fakeResponse(200, responseHeaders), handler.context.withTag("hello", "world"))
+ handler.send(fakeResponse(200, responseHeaders), handler.context.withTag("hello", "world"))
}
}
- "when all capabilities are disabled" should {
+ "configured for distributed tracing" should {
+ "create a span representing the current HTTP operation" in {
+ val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty))
+ handler.send(fakeResponse(200, mutable.Map.empty), handler.context)
+
+ val span = inspect(handler.span)
+ span.tag("http.method").value shouldBe "GET"
+ span.tag("http.url").value shouldBe "http://localhost:8080/"
+ span.tag("http.status_code").value shouldBe "200"
+ }
+
+ "adopt a traceID when explicitly provided" in {
+ val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map(
+ "context-tags" -> "tag=value;none=0011223344556677;",
+ "x-correlation-id" -> "0011223344556677"
+ )))
+
+ handler.span.context().traceID.string shouldBe "0011223344556677"
+ }
+
+ "record span metrics when enabled" in {
+ val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty))
+ handler.send(fakeResponse(200, mutable.Map.empty), handler.context)
+
+ val span = inspect(handler.span)
+ span.hasMetricsEnabled() shouldBe true
+ }
+
+ "not record span metrics when disabled" in {
+ val handler = HttpServer.from("no-span-metrics", port = 8081, "http.server")
+ .receive(fakeRequest("http://localhost:8080/", "/", "GET", Map.empty))
+ handler.send(fakeResponse(200, mutable.Map.empty), handler.context)
+
+ val span = inspect(handler.span)
+ span.hasMetricsEnabled() shouldBe false
+ }
+
+ "receive tags from context when available" in {
+ val handler = httpServer().receive(fakeRequest("http://localhost:8080/", "/", "GET", Map(
+ "context-tags" -> "tag=value;none=0011223344556677;peer=superservice;",
+ "custom-trace-id" -> "0011223344556677"
+ )))
+
+ val span = inspect(handler.span)
+ span.tag("peer").value shouldBe "superservice"
+ }
+ }
+
+ "all capabilities are disabled" should {
"not read any context from the incoming requests" in {
val httpServer = HttpServer.from("noop", port = 8081, "http.server")
- val handler = httpServer.handle(fakeRequest("http://localhost:8080/", "/", "GET", Map(
+ val handler = httpServer.receive(fakeRequest("http://localhost:8080/", "/", "GET", Map(
"context-tags" -> "tag=value;none=0011223344556677;",
"custom-trace-id" -> "0011223344556677"
)))
@@ -59,7 +105,7 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp
"not create any span to represent the server request" in {
val httpServer = HttpServer.from("noop", port = 8081, "http.server")
- val handler = httpServer.handle(fakeRequest("http://localhost:8080/", "/", "GET", Map(
+ val handler = httpServer.receive(fakeRequest("http://localhost:8080/", "/", "GET", Map(
"context-tags" -> "tag=value;none=0011223344556677;",
"custom-trace-id" -> "0011223344556677"
)))
@@ -71,6 +117,9 @@ class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInsp
}
}
+ def httpServer(): HttpServer = HttpServer.from("default", port = 8081, "http.server")
+ def noopHttpServer(): HttpServer = HttpServer.from("noop", port = 8081, "http.server")
+
def fakeRequest(requestUrl: String, requestPath: String, requestMethod: String, headers: Map[String, String]): HttpRequest =
new HttpRequest {
override def url: String = requestUrl
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 8fb2abcc..5984f3c4 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -301,7 +301,7 @@ kamon {
# only if all these conditions are met:
# - the context tag is present.
# - there is no parent Span on the incoming context (i.e. this is the first service on the trace).
- # - the identifier is valued in accordance to the identity provider.
+ # - the identifier is valid in accordance to the identity provider.
trace-id-tag = "none"
# Enables collection of span metrics using the `span.processing-time` metric.
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
index 10dccdbc..88803bad 100644
--- a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
+++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
@@ -1,51 +1,142 @@
package kamon
package instrumentation
+import java.time.Duration
+
import com.typesafe.config.Config
import kamon.context.Context
import kamon.context.HttpPropagation.Direction
import kamon.instrumentation.HttpServer.Settings.TagMode
-import kamon.trace.Span
+import kamon.trace.{IdentityProvider, Span}
import kamon.util.GlobPathFilter
+import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
+/**
+ * HTTP Server instrumentation handler that takes care of context propagation, distributed tracing and HTTP server
+ * metrics. Instances can be created by using the [[HttpServer.from]] method with the desired configuration name. All
+ * configuration for the default HTTP instrumentation is at "kamon.instrumentation.http-server.default".
+ *
+ * The default implementation shipping with Kamon provides:
+ *
+ * - Context Propagation: Incoming and Returning context propagation as well as incoming context tags. Context
+ * propagation is further used to enable distributed tracing on top of any instrumented HTTP Server.
+ * - Distributed Tracing: Automatically join traces initiated by the callers of this service and apply span and metric
+ * tags from the incoming requests as well as form the incoming context tags.
+ * - Server Metrics: Basic request processing metrics to understand connection usage, throughput and response code
+ * counts in the HTTP server.
+ *
+ */
trait HttpServer {
- def handle(request: HttpRequest): HttpServer.Handler
+ /**
+ * Initiate handling of a HTTP request received by this server. The returned RequestHandler contains the Span that
+ * represents the processing of the incoming HTTP request (if tracing is enabled) and the Context extracted from
+ * HTTP headers (if context propagation is enabled).
+ *
+ * Callers of this method **must** always ensure that the doneReceiving, send and doneSending callbacks are invoked
+ * for all incoming requests.
+ *
+ * @param request A HttpRequest wrapper on the original incoming HTTP request.
+ * @return The RequestHandler that will follow the lifecycle of the incoming request.
+ */
+ def receive(request: HttpRequest): HttpServer.RequestHandler
+
+
+ /**
+ * Signals that a new HTTP connection has been opened.
+ */
+ def openConnection(): Unit
+
+
+ /**
+ * Signals that a HTTP connection has been closed. If the connection lifetime or the number of handled requests
+ * cannot be determined the the values [[Duration.ZERO]] and zero can be provided, respectively. No metrics will
+ * be updated when the values are zero.
+ *
+ * @param lifetime For how long did the connection remain open.
+ * @param handledRequests How many requests where handled by the closed connection.
+ */
+ def closeConnection(lifetime: Duration, handledRequests: Long): Unit
}
object HttpServer {
- trait Handler {
-
+ /**
+ * Handler associated to the processing of a single request. The instrumentation code using this class is responsible
+ * of creating a dedicated [[HttpServer.RequestHandler]] instance for each received request should invoking the
+ * doneReceiving, send and doneSending callbacks when appropriate.
+ */
+ trait RequestHandler {
+
+ /**
+ * If context propagation is enabled this function returns the incoming context associated wih this request,
+ * otherwise [[Context.Empty]] is returned.
+ */
def context: Context
+ /**
+ * Span representing the current HTTP server operation. If tracing is disabled this will return an empty span.
+ */
def span: Span
- def finishRequest(): Unit
+ /**
+ * Signals that the entire request (headers and body) has been received.
+ *
+ * @param receivedBytes Size of the entire HTTP request.
+ */
+ def doneReceiving(receivedBytes: Long): Unit
+
+ /**
+ * Process a response to be sent back to the client. Since returning keys might need to included in the response
+ * headers, users of this class must ensure that the returned HttpResponse is used instead of the original one
+ * passed into this function.
+ *
+ * @param response Wraps the HTTP response to be sent back to the client.
+ * @param context Context that should be used for writing returning keys into the response.
+ * @return The modified HTTP response that should be sent to clients.
+ */
+ def send[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse
+
+ /**
+ * Signals that the entire response (headers and body) has been sent to the client.
+ *
+ * @param sentBytes Size of the entire HTTP response.
+ */
+ def doneSending(sentBytes: Long): Unit
- def startResponse[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse
-
- def endResponse(): Unit
}
+
def from(name: String, port: Int, component: String): HttpServer = {
from(name, port, component, Kamon, Kamon)
}
def from(name: String, port: Int, component: String, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = {
- val configKey = "kamon.instrumentation.http-server." + name
- new HttpServer.Default(Settings.from(configuration.config().getConfig(configKey)), contextPropagation, port, component)
+ val defaultConfiguration = configuration.config().getConfig(DefaultHttpServerConfiguration)
+ val configWithFallback = if(name == DefaultHttpServer) defaultConfiguration else {
+ configuration.config().getConfig(HttpServerConfigurationPrefix + "." + name).withFallback(defaultConfiguration)
+ }
+
+ new HttpServer.Default(Settings.from(configWithFallback), contextPropagation, port, component)
}
+ val HttpServerConfigurationPrefix = "kamon.instrumentation.http-server"
+ val DefaultHttpServer = "default"
+ val DefaultHttpServerConfiguration = s"$HttpServerConfigurationPrefix.default"
- class Default(settings: Settings, contextPropagation: ContextPropagation, port: Int, component: String) extends HttpServer {
+
+ private class Default(settings: Settings, contextPropagation: ContextPropagation, port: Int, component: String) extends HttpServer {
+ private val _log = LoggerFactory.getLogger(classOf[Default])
private val _propagation = contextPropagation.httpPropagation(settings.propagationChannel)
- .getOrElse(sys.error(s"Could not find HTTP propagation [${settings.propagationChannel}"))
+ .getOrElse {
+ _log.warn(s"Could not find HTTP propagation [${settings.propagationChannel}], falling back to the default HTTP propagation")
+ contextPropagation.defaultHttpPropagation()
+ }
- override def handle(request: HttpRequest): Handler = {
+ override def receive(request: HttpRequest): RequestHandler = {
val incomingContext = if(settings.enableContextPropagation)
_propagation.readContext(request)
else Context.Empty
@@ -61,44 +152,65 @@ object HttpServer {
// TODO: Handle HTTP Server Metrics
- new HttpServer.Handler {
+ new HttpServer.RequestHandler {
override def context: Context =
handlerContext
override def span: Span =
requestSpan
- override def finishRequest(): Unit = {}
+ override def doneReceiving(receivedBytes: Long): Unit = {}
+
+ override def send[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse = {
+ def addResponseTag(tag: String, value: String, mode: TagMode): Unit = mode match {
+ case TagMode.Metric => span.tagMetric(tag, value)
+ case TagMode.Span => span.tag(tag, value)
+ case TagMode.Off =>
+ }
- override def startResponse[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse = {
if(settings.enableContextPropagation) {
_propagation.writeContext(context, response, Direction.Returning)
}
+ addResponseTag("http.status_code", response.statusCode.toString, settings.statusCodeTagMode)
response.build()
}
- override def endResponse(): Unit = {
+ override def doneSending(sentBytes: Long): Unit = {
span.finish()
}
}
}
+ override def openConnection(): Unit = ???
+
+ override def closeConnection(lifetime: Duration, handledRequests: Long): Unit = ???
+
private def buildServerSpan(context: Context, request: HttpRequest): Span = {
val span = Kamon.buildSpan(operationName(request))
.withMetricTag("span.kind", "server")
.withMetricTag("component", component)
- def addTag(tag: String, value: String, mode: TagMode): Unit = mode match {
+ if(!settings.enableSpanMetrics)
+ span.disableMetrics()
+
+
+ for { traceIdTag <- settings.traceIDTag; customTraceID <- context.getTag(traceIdTag) } {
+ val identifier = Kamon.identityProvider.traceIdGenerator().from(customTraceID)
+ if(identifier != IdentityProvider.NoIdentifier)
+ span.withTraceID(identifier)
+ }
+
+ def addRequestTag(tag: String, value: String, mode: TagMode): Unit = mode match {
case TagMode.Metric => span.withMetricTag(tag, value)
case TagMode.Span => span.withTag(tag, value)
case TagMode.Off =>
}
- addTag("http.url", request.url, settings.urlTagMode)
- addTag("http.method", request.method, settings.urlTagMode)
+ addRequestTag("http.url", request.url, settings.urlTagMode)
+ addRequestTag("http.method", request.method, settings.urlTagMode)
settings.contextTags.foreach {
- case (tagName, mode) => context.getTag(tagName).foreach(tagValue => addTag(tagName, tagValue, mode))
+ case (tagName, mode) => context.getTag(tagName).foreach(tagValue => addRequestTag(tagName, tagValue, mode))
}
span.start()
diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
index 3e857f00..ad7ffbed 100644
--- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
@@ -99,6 +99,7 @@ object Tracer {
private var initialMetricTags = Map.empty[String, String]
private var useParentFromContext = true
private var trackMetrics = true
+ private var providedTraceID = IdentityProvider.NoIdentifier
def asChildOf(parent: Span): SpanBuilder = {
if(parent != Span.Empty) this.parentSpan = parent
@@ -158,6 +159,11 @@ object Tracer {
this
}
+ def withTraceID(identifier: IdentityProvider.Identifier): SpanBuilder = {
+ this.providedTraceID = identifier
+ this
+ }
+
def start(): Span = {
val spanFrom = if(from == Instant.EPOCH) clock.instant() else from
@@ -199,13 +205,21 @@ object Tracer {
else
parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision)
- private def newSpanContext(samplingDecision: SamplingDecision): SpanContext =
+ private def newSpanContext(samplingDecision: SamplingDecision): SpanContext = {
+ val traceID =
+ if(providedTraceID != IdentityProvider.NoIdentifier)
+ providedTraceID
+ else
+ tracer._identityProvider.traceIdGenerator().generate()
+
+
SpanContext(
- traceID = tracer._identityProvider.traceIdGenerator().generate(),
+ traceID,
spanID = tracer._identityProvider.spanIdGenerator().generate(),
parentID = IdentityProvider.NoIdentifier,
samplingDecision = samplingDecision
)
+ }
}
private final class TracerMetrics(metricLookup: MetricLookup) {
diff --git a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala
index 36b57f2d..e45124b0 100644
--- a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala
+++ b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala
@@ -52,13 +52,14 @@ object SpanInspection {
def spanTags(): Map[String, Span.TagValue] =
spanData.tags
- def tag(key: String): Option[String] =
+ def tag(key: String): Option[String] = {
spanTag(key).map {
case TagValue.String(string) => string
case TagValue.Number(number) => number.toString
case TagValue.True => "true"
case TagValue.False => "false"
}
+ }
def metricTags(): Map[String, String] =
getField[Span.Local, Map[String, String]](realSpan, "customMetricTags")
@@ -75,6 +76,9 @@ object SpanInspection {
def operationName(): String =
spanData.operationName
+ def hasMetricsEnabled(): Boolean =
+ getField[Span.Local, Boolean](realSpan, "collectMetrics")
+
private def getField[T, R](target: Any, fieldName: String)(implicit classTag: ClassTag[T]): R = {
val toFinishedSpanMethod = classTag.runtimeClass.getDeclaredFields.find(_.getName.contains(fieldName)).get