aboutsummaryrefslogtreecommitdiff
path: root/kamon-core
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2018-09-03 15:37:14 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2018-09-03 15:37:14 +0200
commitc487c51a54e67944c80cf2aecc63ac8158bf99a6 (patch)
treed88669c3445fcdee659b62b0a8352f5ad50c1e9a /kamon-core
parentd30ff29cdb5f94be34163d851d71716a316bdf10 (diff)
downloadKamon-c487c51a54e67944c80cf2aecc63ac8158bf99a6.tar.gz
Kamon-c487c51a54e67944c80cf2aecc63ac8158bf99a6.tar.bz2
Kamon-c487c51a54e67944c80cf2aecc63ac8158bf99a6.zip
wip on the HttpServer instrumentation
Diffstat (limited to 'kamon-core')
-rw-r--r--kamon-core/src/main/resources/reference.conf119
-rw-r--r--kamon-core/src/main/scala/kamon/context/Context.scala2
-rw-r--r--kamon-core/src/main/scala/kamon/context/HttpPropagation.scala28
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala27
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala192
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanCodec.scala22
6 files changed, 315 insertions, 75 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 279a00a9..8fb2abcc 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -183,12 +183,12 @@ kamon {
# Codecs to be used when propagating a Context through a HTTP Headers transport.
http-headers-keys {
- span = "kamon.trace.SpanCodec$B3"
+ //span = "kamon.trace.SpanCodec$B3"
}
# Codecs to be used when propagating a Context through a Binary transport.
binary-keys {
- span = "kamon.trace.SpanCodec$Colfer"
+ //span = "kamon.trace.SpanCodec$Colfer"
}
}
}
@@ -253,12 +253,29 @@ kamon {
http-server {
default {
+ #
+ # Configuration for HTTP context propagation
+ #
+ propagation {
+
+ # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if
+ # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can
+ # be created and reported but will not be linked across boundaries nor take trace identifiers from tags).
+ enabled = yes
+
+ # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default
+ # configuration for more details on how to configure the detault HTTP context propagation.
+ channel = "default"
+ }
+
+
+ #
# Configuration for HTTP server metrics collection
#
metrics {
# Enables collection of HTTP server metrics
- enable = yes
+ enabled = yes
# Tags to include on the HTTP server metrics. The available options are:
# - method: HTTP method from the request.
@@ -270,13 +287,15 @@ kamon {
]
}
+
+ #
# Configuration for HTTP request tracing
#
tracing {
# Enables HTTP request tracing. When enabled the instrumentation will create Spans for incoming requests
# and finish them when the response is sent back to the clients.
- enable = yes
+ enabled = yes
# Select a context tag that provides a custom trace identifier. The custom trace identifier will be used
# only if all these conditions are met:
@@ -285,58 +304,60 @@ kamon {
# - the identifier is valued in accordance to the identity provider.
trace-id-tag = "none"
- # Metric tags to be automatically included in the HTTP server Spans. The available options are:
- # - method: HTTP method from the request.
- # - status-code: HTTP status code from the responses.
- # - peer: Name of the service issuing the HTTP call. Will use "unknown" if not present.
- metric-tags = [
- "method",
- "status-code",
- "peer"
- ]
- }
-
- # Configuration for HTTP context propagation
- #
- propagation {
+ # Enables collection of span metrics using the `span.processing-time` metric.
+ span-metrics = on
- # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if
- # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can
- # be created and reported but will not be linked across boundaries nor take trace identifiers from tags).
- enabled = yes
+ # Select which tags should be included as span and span metric tags. The possible options are:
+ # - span: the tag is added as a Span tag (i.e. using span.tag(...))
+ # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...))
+ # - off: the tag is not used.
+ #
+ tags {
- # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default
- # configuration for more details on how to configure the detault HTTP context propagation.
- channel = "default"
- }
+ # Use the http.url tag.
+ url = span
+ # Use the http.method tag.
+ method = metric
- # Custom mappings between routes and operation names.
- #
- operations {
+ # Use the http.status_code tag.
+ status-code = metric
- # Operation name for Spans created on requests that could not be handled by any route in the current
- # application.
- unhanlded = "unhandled"
+ # Copy tags from the context into the Spans with the specified purpouse.
+ from-context {
- # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode
- # instrumentation is not able to provide a sensible operation name that is free of high cardinality values.
- # For example, with the following configuration:
- # mappings {
- # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile"
- # "/events/*/rsvps" = "EventRSVPs"
- # }
- #
- # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have
- # the same operation name "/organization/:orgID/user/:userID/profile".
- #
- # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation
- # name "EventRSVPs".
- #
- # The patterns are expressed as globs and the operation names are free form.
- #
- mappings {
+ # The peer tag identifiest the service that is calling the current service. It is added by default with
+ # the HttpClient instrumentation.
+ peer = metric
+ }
+ }
+ # Custom mappings between routes and operation names.
+ operations {
+
+ # Operation name for Spans created on requests that could not be handled by any route in the current
+ # application.
+ unhandled = "unhandled"
+
+ # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode
+ # instrumentation is not able to provide a sensible operation name that is free of high cardinality values.
+ # For example, with the following configuration:
+ # mappings {
+ # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile"
+ # "/events/*/rsvps" = "EventRSVPs"
+ # }
+ #
+ # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have
+ # the same operation name "/organization/:orgID/user/:userID/profile".
+ #
+ # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation
+ # name "EventRSVPs".
+ #
+ # The patterns are expressed as globs and the operation names are free form.
+ #
+ mappings {
+
+ }
}
}
}
diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala
index 1eed7e14..4d3501db 100644
--- a/kamon-core/src/main/scala/kamon/context/Context.scala
+++ b/kamon-core/src/main/scala/kamon/context/Context.scala
@@ -19,7 +19,7 @@ package context
import java.util.{Map => JavaMap}
import scala.collection.JavaConverters._
-class Context private (private[context] val entries: Map[Context.Key[_], Any], private[context] val tags: Map[String, String]) {
+class Context private (private[context] val entries: Map[Context.Key[_], Any], val tags: Map[String, String]) {
def get[T](key: Context.Key[T]): T =
entries.getOrElse(key, key.emptyValue).asInstanceOf[T]
diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
index 5b0bdb38..d225aa28 100644
--- a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
+++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
@@ -25,7 +25,7 @@ trait HttpPropagation {
* @return The decoded Context instance. If no entries or tags could be read from the HTTP message then an
* empty context is returned instead.
*/
- def read(reader: HttpPropagation.HeaderReader): Context
+ def readContext(reader: HttpPropagation.HeaderReader): Context
/**
* Writes the tags and entries from the supplied context using the supplied [[HttpPropagation.HeaderWriter]]
@@ -38,7 +38,7 @@ trait HttpPropagation {
* @param writer Wrapper on the HTTP message that will carry the context headers.
* @param direction Write direction. It can be either Outgoing or Returning.
*/
- def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit
+ def writeContext(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit
}
@@ -59,7 +59,7 @@ object HttpPropagation {
* @param context Current context.
* @return Either the original context passed in or a modified version of it, including the read entry.
*/
- def read(reader: HttpPropagation.HeaderReader, context: Context): Context
+ def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context
}
/**
@@ -75,7 +75,7 @@ object HttpPropagation {
* @param writer Wrapper on the HTTP message that will carry the context headers.
* @param direction Write direction. It can be either Outgoing or Returning.
*/
- def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit
+ def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit
}
@@ -90,7 +90,7 @@ object HttpPropagation {
* @param header HTTP header name
* @return The HTTP header value, if present.
*/
- def read(header: String): Option[String]
+ def readHeader(header: String): Option[String]
}
/**
@@ -104,7 +104,7 @@ object HttpPropagation {
* @param header HTTP header name.
* @param value HTTP header value.
*/
- def write(header: String, value: String): Unit
+ def writeHeader(header: String, value: String): Unit
}
@@ -131,12 +131,12 @@ object HttpPropagation {
* of a tag key clash.
* - Read all context entries using the incoming entries configuration.
*/
- override def read(reader: HeaderReader): Context = {
+ override def readContext(reader: HeaderReader): Context = {
val tags = Map.newBuilder[String, String]
// Tags encoded together in the context tags header.
try {
- reader.read(components.tagsHeaderName).foreach { contextTagsHeader =>
+ reader.readHeader(components.tagsHeaderName).foreach { contextTagsHeader =>
contextTagsHeader.split(";").foreach(tagData => {
val tagPair = tagData.split("=")
if (tagPair.length == 2) {
@@ -152,7 +152,7 @@ object HttpPropagation {
components.tagsMappings.foreach {
case (tagName, httpHeader) =>
try {
- reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue))
+ reader.readHeader(httpHeader).foreach(tagValue => tags += (tagName -> tagValue))
} catch {
case t: Throwable => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any])
}
@@ -163,7 +163,7 @@ object HttpPropagation {
case (context, (entryName, entryDecoder)) =>
var result = context
try {
- result = entryDecoder.read(reader, context)
+ result = entryDecoder.readEntry(reader, context)
} catch {
case t: Throwable => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
}
@@ -175,7 +175,7 @@ object HttpPropagation {
/**
* Writes context tags and entries
*/
- override def write(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = {
+ override def writeContext(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = {
val keys = direction match {
case Direction.Outgoing => components.outgoingEntries
case Direction.Returning => components.returningEntries
@@ -193,21 +193,21 @@ object HttpPropagation {
// Write tags with specific mappings or append them to the context tags header.
context.tags.foreach {
case (tagKey, tagValue) => components.tagsMappings.get(tagKey) match {
- case Some(mappedHeader) => writer.write(mappedHeader, tagValue)
+ case Some(mappedHeader) => writer.writeHeader(mappedHeader, tagValue)
case None => appendTag(tagKey, tagValue)
}
}
// Write the context tags header.
if(contextTagsHeader.nonEmpty) {
- writer.write(components.tagsHeaderName, contextTagsHeader.result())
+ writer.writeHeader(components.tagsHeaderName, contextTagsHeader.result())
}
// Write entries for the specified direction.
keys.foreach {
case (entryName, entryWriter) =>
try {
- entryWriter.write(context, writer, direction)
+ entryWriter.writeEntry(context, writer, direction)
} catch {
case t: Throwable => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala
new file mode 100644
index 00000000..b0300546
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala
@@ -0,0 +1,27 @@
+package kamon.instrumentation
+
+import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter}
+
+
+trait HttpRequest extends HeaderReader {
+ def url: String
+ def path: String
+ def method: String
+}
+
+object HttpRequest {
+ trait Writable[T] extends HttpRequest with HeaderWriter {
+ def build(): T
+ }
+}
+
+trait HttpResponse {
+ def statusCode: Int
+}
+
+object HttpResponse {
+ trait Writable[T] extends HttpResponse with HeaderWriter {
+ def build(): T
+ }
+}
+
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
new file mode 100644
index 00000000..10dccdbc
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
@@ -0,0 +1,192 @@
+package kamon
+package instrumentation
+
+import com.typesafe.config.Config
+import kamon.context.Context
+import kamon.context.HttpPropagation.Direction
+import kamon.instrumentation.HttpServer.Settings.TagMode
+import kamon.trace.Span
+import kamon.util.GlobPathFilter
+
+import scala.collection.JavaConverters._
+
+trait HttpServer {
+
+ def handle(request: HttpRequest): HttpServer.Handler
+
+}
+
+object HttpServer {
+
+ trait Handler {
+
+ def context: Context
+
+ def span: Span
+
+ def finishRequest(): Unit
+
+ def startResponse[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse
+
+ def endResponse(): Unit
+ }
+
+ def from(name: String, port: Int, component: String): HttpServer = {
+ from(name, port, component, Kamon, Kamon)
+ }
+
+ def from(name: String, port: Int, component: String, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = {
+ val configKey = "kamon.instrumentation.http-server." + name
+ new HttpServer.Default(Settings.from(configuration.config().getConfig(configKey)), contextPropagation, port, component)
+ }
+
+
+ class Default(settings: Settings, contextPropagation: ContextPropagation, port: Int, component: String) extends HttpServer {
+ private val _propagation = contextPropagation.httpPropagation(settings.propagationChannel)
+ .getOrElse(sys.error(s"Could not find HTTP propagation [${settings.propagationChannel}"))
+
+ override def handle(request: HttpRequest): Handler = {
+ val incomingContext = if(settings.enableContextPropagation)
+ _propagation.readContext(request)
+ else Context.Empty
+
+ val requestSpan = if(settings.enableTracing)
+ buildServerSpan(incomingContext, request)
+ else Span.Empty
+
+ val handlerContext = if(requestSpan.nonEmpty())
+ incomingContext.withKey(Span.ContextKey, requestSpan)
+ else incomingContext
+
+ // TODO: Handle HTTP Server Metrics
+
+
+ new HttpServer.Handler {
+ override def context: Context =
+ handlerContext
+
+ override def span: Span =
+ requestSpan
+
+ override def finishRequest(): Unit = {}
+
+ override def startResponse[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse = {
+ if(settings.enableContextPropagation) {
+ _propagation.writeContext(context, response, Direction.Returning)
+ }
+
+ response.build()
+ }
+
+ override def endResponse(): Unit = {
+ span.finish()
+ }
+ }
+ }
+
+ private def buildServerSpan(context: Context, request: HttpRequest): Span = {
+ val span = Kamon.buildSpan(operationName(request))
+ .withMetricTag("span.kind", "server")
+ .withMetricTag("component", component)
+
+ def addTag(tag: String, value: String, mode: TagMode): Unit = mode match {
+ case TagMode.Metric => span.withMetricTag(tag, value)
+ case TagMode.Span => span.withTag(tag, value)
+ case TagMode.Off =>
+ }
+
+ addTag("http.url", request.url, settings.urlTagMode)
+ addTag("http.method", request.method, settings.urlTagMode)
+ settings.contextTags.foreach {
+ case (tagName, mode) => context.getTag(tagName).foreach(tagValue => addTag(tagName, tagValue, mode))
+ }
+
+ span.start()
+ }
+
+ private def operationName(request: HttpRequest): String = {
+ val requestPath = request.path
+ val customMapping = settings.operationMappings.find {
+ case (pattern, _) => pattern.accept(requestPath)
+ }.map(_._2)
+
+ customMapping.getOrElse("http.request")
+ }
+ }
+
+
+ case class Settings(
+ enableContextPropagation: Boolean,
+ propagationChannel: String,
+ enableServerMetrics: Boolean,
+ serverMetricsTags: Seq[String],
+ enableTracing: Boolean,
+ traceIDTag: Option[String],
+ enableSpanMetrics: Boolean,
+ urlTagMode: TagMode,
+ methodTagMode: TagMode,
+ statusCodeTagMode: TagMode,
+ contextTags: Map[String, TagMode],
+ unhandledOperationName: String,
+ operationMappings: Map[GlobPathFilter, String]
+ )
+
+ object Settings {
+
+ sealed trait TagMode
+ object TagMode {
+ case object Metric extends TagMode
+ case object Span extends TagMode
+ case object Off extends TagMode
+
+ def from(value: String): TagMode = value.toLowerCase match {
+ case "metric" => TagMode.Metric
+ case "span" => TagMode.Span
+ case _ => TagMode.Off
+ }
+ }
+
+ def from(config: Config): Settings = {
+
+ // Context propagation settings
+ val enablePropagation = config.getBoolean("propagation.enabled")
+ val propagationChannel = config.getString("propagation.channel")
+
+ // HTTP Server metrics settings
+ val enableServerMetrics = config.getBoolean("metrics.enabled")
+ val serverMetricsTags = config.getStringList("metrics.tags").asScala
+
+ // Tracing settings
+ val enableTracing = config.getBoolean("tracing.enabled")
+ val traceIdTag = Option(config.getString("tracing.trace-id-tag")).filterNot(_ == "none")
+ val enableSpanMetrics = config.getBoolean("tracing.span-metrics")
+ val urlTagMode = TagMode.from(config.getString("tracing.tags.url"))
+ val methodTagMode = TagMode.from(config.getString("tracing.tags.method"))
+ val statusCodeTagMode = TagMode.from(config.getString("tracing.tags.status-code"))
+ val contextTags = config.getConfig("tracing.tags.from-context").pairs.map {
+ case (tagName, mode) => (tagName, TagMode.from(mode))
+ }
+
+ val unhandledOperationName = config.getString("tracing.operations.unhandled")
+ val operationMappings = config.getConfig("tracing.operations.mappings").pairs.map {
+ case (pattern, operationName) => (new GlobPathFilter(pattern), operationName)
+ }
+
+ Settings(
+ enablePropagation,
+ propagationChannel,
+ enableServerMetrics,
+ serverMetricsTags,
+ enableTracing,
+ traceIdTag,
+ enableSpanMetrics,
+ urlTagMode,
+ methodTagMode,
+ statusCodeTagMode,
+ contextTags,
+ unhandledOperationName,
+ operationMappings
+ )
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
index 7d707c9f..7439520c 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
@@ -30,24 +30,24 @@ object SpanCodec {
class B3 extends HttpPropagation.EntryReader with HttpPropagation.EntryWriter {
import B3.Headers
- override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = {
+ override def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context = {
val identityProvider = Kamon.tracer.identityProvider
- val traceID = reader.read(Headers.TraceIdentifier)
+ val traceID = reader.readHeader(Headers.TraceIdentifier)
.map(id => identityProvider.traceIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
- val spanID = reader.read(Headers.SpanIdentifier)
+ val spanID = reader.readHeader(Headers.SpanIdentifier)
.map(id => identityProvider.spanIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) {
- val parentID = reader.read(Headers.ParentSpanIdentifier)
+ val parentID = reader.readHeader(Headers.ParentSpanIdentifier)
.map(id => identityProvider.spanIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
- val flags = reader.read(Headers.Flags)
+ val flags = reader.readHeader(Headers.Flags)
- val samplingDecision = flags.orElse(reader.read(Headers.Sampled)) match {
+ val samplingDecision = flags.orElse(reader.readHeader(Headers.Sampled)) match {
case Some(sampled) if sampled == "1" => SamplingDecision.Sample
case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample
case _ => SamplingDecision.Unknown
@@ -59,19 +59,19 @@ object SpanCodec {
}
- override def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = {
+ override def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = {
val span = context.get(Span.ContextKey)
if(span.nonEmpty()) {
val spanContext = span.context()
- writer.write(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string))
- writer.write(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string))
+ writer.writeHeader(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string))
+ writer.writeHeader(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string))
if(spanContext.parentID != IdentityProvider.NoIdentifier)
- writer.write(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string))
+ writer.writeHeader(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string))
encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision =>
- writer.write(Headers.Sampled, samplingDecision)
+ writer.writeHeader(Headers.Sampled, samplingDecision)
}
}
}