aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kamon-core-tests/src/test/resources/reference.conf119
-rw-r--r--kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala30
-rw-r--r--kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala88
-rw-r--r--kamon-core/src/main/resources/reference.conf119
-rw-r--r--kamon-core/src/main/scala/kamon/context/Context.scala2
-rw-r--r--kamon-core/src/main/scala/kamon/context/HttpPropagation.scala28
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala27
-rw-r--r--kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala192
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanCodec.scala22
-rw-r--r--kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala3
-rw-r--r--kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala13
11 files changed, 552 insertions, 91 deletions
diff --git a/kamon-core-tests/src/test/resources/reference.conf b/kamon-core-tests/src/test/resources/reference.conf
index 0d7ae9e2..d165249e 100644
--- a/kamon-core-tests/src/test/resources/reference.conf
+++ b/kamon-core-tests/src/test/resources/reference.conf
@@ -2,4 +2,123 @@ kamon {
context.codecs.string-keys {
request-id = "X-Request-ID"
}
+}
+
+
+
+kamon {
+ instrumentation {
+ http-server {
+ noop {
+
+ #
+ # Configuration for HTTP context propagation
+ #
+ propagation {
+
+ # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if
+ # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can
+ # be created and reported but will not be linked across boundaries nor take trace identifiers from tags).
+ enabled = no
+
+ # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default
+ # configuration for more details on how to configure the detault HTTP context propagation.
+ channel = "default"
+ }
+
+
+ #
+ # Configuration for HTTP server metrics collection
+ #
+ metrics {
+
+ # Enables collection of HTTP server metrics
+ enabled = no
+
+ # Tags to include on the HTTP server metrics. The available options are:
+ # - method: HTTP method from the request.
+ # - status-code: HTTP status code from the responses.
+ #
+ tags = [
+ "method",
+ "status-code"
+ ]
+ }
+
+
+ #
+ # Configuration for HTTP request tracing
+ #
+ tracing {
+
+ # Enables HTTP request tracing. When enabled the instrumentation will create Spans for incoming requests
+ # and finish them when the response is sent back to the clients.
+ enabled = no
+
+ # Select a context tag that provides a custom trace identifier. The custom trace identifier will be used
+ # only if all these conditions are met:
+ # - the context tag is present.
+ # - there is no parent Span on the incoming context (i.e. this is the first service on the trace).
+ # - the identifier is valued in accordance to the identity provider.
+ trace-id-tag = "none"
+
+ # Enables collection of span metrics using the `span.processing-time` metric.
+ span-metrics = on
+
+ # Select which tags should be included as span and span metric tags. The possible options are:
+ # - span: the tag is added as a Span tag (i.e. using span.tag(...))
+ # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...))
+ # - off: the tag is not used.
+ #
+ tags {
+
+ # Use the http.url tag.
+ url = span
+
+ # Use the http.method tag.
+ method = metric
+
+ # Use the http.status_code tag.
+ status-code = metric
+
+ # Copy tags from the context into the Spans with the specified purpouse.
+ from-context {
+
+ # The peer tag identifiest the service that is calling the current service. It is added by default with
+ # the HttpClient instrumentation.
+ peer = metric
+ }
+ }
+
+ # Custom mappings between routes and operation names.
+ operations {
+
+ # Operation name for Spans created on requests that could not be handled by any route in the current
+ # application.
+ unhandled = "unhandled"
+
+ # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode
+ # instrumentation is not able to provide a sensible operation name that is free of high cardinality values.
+ # For example, with the following configuration:
+ # mappings {
+ # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile"
+ # "/events/*/rsvps" = "EventRSVPs"
+ # }
+ #
+ # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have
+ # the same operation name "/organization/:orgID/user/:userID/profile".
+ #
+ # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation
+ # name "EventRSVPs".
+ #
+ # The patterns are expressed as globs and the operation names are free form.
+ #
+ mappings {
+
+ }
+ }
+ }
+ }
+ }
+ }
} \ No newline at end of file
diff --git a/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala
index 08d0b691..44165b98 100644
--- a/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala
+++ b/kamon-core-tests/src/test/scala/kamon/context/HttpPropagationSpec.scala
@@ -12,7 +12,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues {
"The HTTP Context Propagation" when {
"reading from incoming requests" should {
"return an empty context if there are no tags nor keys" in {
- val context = httpPropagation.read(headerReaderFromMap(Map.empty))
+ val context = httpPropagation.readContext(headerReaderFromMap(Map.empty))
context.tags shouldBe empty
context.entries shouldBe empty
}
@@ -22,7 +22,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues {
"x-content-tags" -> "hello=world;correlation=1234",
"x-mapped-tag" -> "value"
)
- val context = httpPropagation.read(headerReaderFromMap(headers))
+ val context = httpPropagation.readContext(headerReaderFromMap(headers))
context.tags should contain only(
"hello" -> "world",
"correlation" -> "1234",
@@ -32,7 +32,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues {
"handle errors when reading HTTP headers" in {
val headers = Map("fail" -> "")
- val context = httpPropagation.read(headerReaderFromMap(headers))
+ val context = httpPropagation.readContext(headerReaderFromMap(headers))
context.tags shouldBe empty
context.entries shouldBe empty
}
@@ -44,7 +44,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues {
"integer-header" -> "123"
)
- val context = httpPropagation.read(headerReaderFromMap(headers))
+ val context = httpPropagation.readContext(headerReaderFromMap(headers))
context.get(HttpPropagationSpec.StringKey) shouldBe "hey"
context.get(HttpPropagationSpec.IntegerKey) shouldBe 123
context.get(HttpPropagationSpec.OptionalKey) shouldBe empty
@@ -66,7 +66,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues {
def propagationWritingTests(direction: Direction.Write) = {
"not write anything if the context is empty" in {
val headers = mutable.Map.empty[String, String]
- httpPropagation.write(Context.Empty, headerWriterFromMap(headers), direction)
+ httpPropagation.writeContext(Context.Empty, headerWriterFromMap(headers), direction)
headers shouldBe empty
}
@@ -77,7 +77,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues {
"mappedTag" -> "value"
))
- httpPropagation.write(context, headerWriterFromMap(headers), direction)
+ httpPropagation.writeContext(context, headerWriterFromMap(headers), direction)
headers should contain only(
"x-content-tags" -> "hello=world;",
"x-mapped-tag" -> "value"
@@ -91,7 +91,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues {
HttpPropagationSpec.IntegerKey, 42,
)
- httpPropagation.write(context, headerWriterFromMap(headers), direction)
+ httpPropagation.writeContext(context, headerWriterFromMap(headers), direction)
headers should contain only(
"string-header" -> "out-we-go"
)
@@ -123,7 +123,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues {
def headerReaderFromMap(map: Map[String, String]): HttpPropagation.HeaderReader = new HttpPropagation.HeaderReader {
- override def read(header: String): Option[String] = {
+ override def readHeader(header: String): Option[String] = {
if(map.get("fail").nonEmpty)
sys.error("failing on purpose")
@@ -132,7 +132,7 @@ class HttpPropagationSpec extends WordSpec with Matchers with OptionValues {
}
def headerWriterFromMap(map: mutable.Map[String, String]): HttpPropagation.HeaderWriter = new HttpPropagation.HeaderWriter {
- override def write(header: String, value: String): Unit = map.put(header, value)
+ override def writeHeader(header: String, value: String): Unit = map.put(header, value)
}
}
@@ -146,20 +146,20 @@ object HttpPropagationSpec {
class StringEntryCodec extends HttpPropagation.EntryReader with HttpPropagation.EntryWriter {
private val HeaderName = "string-header"
- override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = {
- reader.read(HeaderName)
+ override def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context = {
+ reader.readHeader(HeaderName)
.map(v => context.withKey(StringKey, v))
.getOrElse(context)
}
- override def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = {
- Option(context.get(StringKey)).foreach(v => writer.write(HeaderName, v))
+ override def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = {
+ Option(context.get(StringKey)).foreach(v => writer.writeHeader(HeaderName, v))
}
}
class IntegerEntryCodec extends HttpPropagation.EntryReader {
- override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = {
- reader.read("integer-header")
+ override def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context = {
+ reader.readHeader("integer-header")
.map(v => context.withKey(IntegerKey, v.toInt))
.getOrElse(context)
diff --git a/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala
new file mode 100644
index 00000000..a3662127
--- /dev/null
+++ b/kamon-core-tests/src/test/scala/kamon/instrumentation/HttpServerInstrumentationSpec.scala
@@ -0,0 +1,88 @@
+package kamon.instrumentation
+
+import kamon.context.Context
+import kamon.testkit.SpanInspection
+import org.scalatest.{Matchers, OptionValues, WordSpec}
+
+import scala.collection.mutable
+
+class HttpServerInstrumentationSpec extends WordSpec with Matchers with SpanInspection with OptionValues {
+
+ "the HTTP server instrumentation" when {
+ "configured for context propagation" should {
+ "read context entries and tags from the incoming request" in {
+ val httpServer = HttpServer.from("default", port = 8080, "http.server")
+ val handler = httpServer.handle(fakeRequest("http://localhost:8080/", "/", "GET", Map(
+ "context-tags" -> "tag=value;none=0011223344556677;",
+ "custom-trace-id" -> "0011223344556677"
+ )))
+
+ handler.context.tags should contain only(
+ "tag" -> "value",
+ "none" -> "0011223344556677"
+ )
+ }
+
+ "use the configured HTTP propagation channel" in {
+ val httpServer = HttpServer.from("default", port = 8080, "http.server")
+ val handler = httpServer.handle(fakeRequest("http://localhost:8080/", "/", "GET", Map(
+ "context-tags" -> "tag=value;none=0011223344556677;",
+ "custom-trace-id" -> "0011223344556677"
+ )))
+
+ handler.context.tags should contain only(
+ "tag" -> "value",
+ "none" -> "0011223344556677"
+ )
+
+ val span = inspect(handler.span)
+ span.context().traceID.string shouldNot be("0011223344556677")
+ span.tag("http.method").value shouldBe "GET"
+ span.tag("http.url").value shouldBe "http://localhost:8080/"
+
+ val responseHeaders = mutable.Map.empty[String, String]
+ handler.startResponse(fakeResponse(200, responseHeaders), handler.context.withTag("hello", "world"))
+
+ }
+ }
+
+ "when all capabilities are disabled" should {
+ "not read any context from the incoming requests" in {
+ val httpServer = HttpServer.from("noop", port = 8081, "http.server")
+ val handler = httpServer.handle(fakeRequest("http://localhost:8080/", "/", "GET", Map(
+ "context-tags" -> "tag=value;none=0011223344556677;",
+ "custom-trace-id" -> "0011223344556677"
+ )))
+
+ handler.context shouldBe Context.Empty
+ }
+
+ "not create any span to represent the server request" in {
+ val httpServer = HttpServer.from("noop", port = 8081, "http.server")
+ val handler = httpServer.handle(fakeRequest("http://localhost:8080/", "/", "GET", Map(
+ "context-tags" -> "tag=value;none=0011223344556677;",
+ "custom-trace-id" -> "0011223344556677"
+ )))
+
+ handler.span.isEmpty() shouldBe true
+ }
+
+ "not record any HTTP server metrics" is (pending)
+ }
+ }
+
+ def fakeRequest(requestUrl: String, requestPath: String, requestMethod: String, headers: Map[String, String]): HttpRequest =
+ new HttpRequest {
+ override def url: String = requestUrl
+ override def path: String = requestPath
+ override def method: String = requestMethod
+ override def readHeader(header: String): Option[String] = headers.get(header)
+ }
+
+ def fakeResponse(responseStatusCode: Int, headers: mutable.Map[String, String]): HttpResponse.Writable[HttpResponse] =
+ new HttpResponse.Writable[HttpResponse] {
+ override def statusCode: Int = responseStatusCode
+ override def writeHeader(header: String, value: String): Unit = headers.put(header, value)
+ override def build(): HttpResponse = this
+ }
+}
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 279a00a9..8fb2abcc 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -183,12 +183,12 @@ kamon {
# Codecs to be used when propagating a Context through a HTTP Headers transport.
http-headers-keys {
- span = "kamon.trace.SpanCodec$B3"
+ //span = "kamon.trace.SpanCodec$B3"
}
# Codecs to be used when propagating a Context through a Binary transport.
binary-keys {
- span = "kamon.trace.SpanCodec$Colfer"
+ //span = "kamon.trace.SpanCodec$Colfer"
}
}
}
@@ -253,12 +253,29 @@ kamon {
http-server {
default {
+ #
+ # Configuration for HTTP context propagation
+ #
+ propagation {
+
+ # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if
+ # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can
+ # be created and reported but will not be linked across boundaries nor take trace identifiers from tags).
+ enabled = yes
+
+ # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default
+ # configuration for more details on how to configure the detault HTTP context propagation.
+ channel = "default"
+ }
+
+
+ #
# Configuration for HTTP server metrics collection
#
metrics {
# Enables collection of HTTP server metrics
- enable = yes
+ enabled = yes
# Tags to include on the HTTP server metrics. The available options are:
# - method: HTTP method from the request.
@@ -270,13 +287,15 @@ kamon {
]
}
+
+ #
# Configuration for HTTP request tracing
#
tracing {
# Enables HTTP request tracing. When enabled the instrumentation will create Spans for incoming requests
# and finish them when the response is sent back to the clients.
- enable = yes
+ enabled = yes
# Select a context tag that provides a custom trace identifier. The custom trace identifier will be used
# only if all these conditions are met:
@@ -285,58 +304,60 @@ kamon {
# - the identifier is valued in accordance to the identity provider.
trace-id-tag = "none"
- # Metric tags to be automatically included in the HTTP server Spans. The available options are:
- # - method: HTTP method from the request.
- # - status-code: HTTP status code from the responses.
- # - peer: Name of the service issuing the HTTP call. Will use "unknown" if not present.
- metric-tags = [
- "method",
- "status-code",
- "peer"
- ]
- }
-
- # Configuration for HTTP context propagation
- #
- propagation {
+ # Enables collection of span metrics using the `span.processing-time` metric.
+ span-metrics = on
- # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if
- # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can
- # be created and reported but will not be linked across boundaries nor take trace identifiers from tags).
- enabled = yes
+ # Select which tags should be included as span and span metric tags. The possible options are:
+ # - span: the tag is added as a Span tag (i.e. using span.tag(...))
+ # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...))
+ # - off: the tag is not used.
+ #
+ tags {
- # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default
- # configuration for more details on how to configure the detault HTTP context propagation.
- channel = "default"
- }
+ # Use the http.url tag.
+ url = span
+ # Use the http.method tag.
+ method = metric
- # Custom mappings between routes and operation names.
- #
- operations {
+ # Use the http.status_code tag.
+ status-code = metric
- # Operation name for Spans created on requests that could not be handled by any route in the current
- # application.
- unhanlded = "unhandled"
+ # Copy tags from the context into the Spans with the specified purpouse.
+ from-context {
- # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode
- # instrumentation is not able to provide a sensible operation name that is free of high cardinality values.
- # For example, with the following configuration:
- # mappings {
- # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile"
- # "/events/*/rsvps" = "EventRSVPs"
- # }
- #
- # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have
- # the same operation name "/organization/:orgID/user/:userID/profile".
- #
- # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation
- # name "EventRSVPs".
- #
- # The patterns are expressed as globs and the operation names are free form.
- #
- mappings {
+ # The peer tag identifiest the service that is calling the current service. It is added by default with
+ # the HttpClient instrumentation.
+ peer = metric
+ }
+ }
+ # Custom mappings between routes and operation names.
+ operations {
+
+ # Operation name for Spans created on requests that could not be handled by any route in the current
+ # application.
+ unhandled = "unhandled"
+
+ # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode
+ # instrumentation is not able to provide a sensible operation name that is free of high cardinality values.
+ # For example, with the following configuration:
+ # mappings {
+ # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile"
+ # "/events/*/rsvps" = "EventRSVPs"
+ # }
+ #
+ # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have
+ # the same operation name "/organization/:orgID/user/:userID/profile".
+ #
+ # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation
+ # name "EventRSVPs".
+ #
+ # The patterns are expressed as globs and the operation names are free form.
+ #
+ mappings {
+
+ }
}
}
}
diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala
index 1eed7e14..4d3501db 100644
--- a/kamon-core/src/main/scala/kamon/context/Context.scala
+++ b/kamon-core/src/main/scala/kamon/context/Context.scala
@@ -19,7 +19,7 @@ package context
import java.util.{Map => JavaMap}
import scala.collection.JavaConverters._
-class Context private (private[context] val entries: Map[Context.Key[_], Any], private[context] val tags: Map[String, String]) {
+class Context private (private[context] val entries: Map[Context.Key[_], Any], val tags: Map[String, String]) {
def get[T](key: Context.Key[T]): T =
entries.getOrElse(key, key.emptyValue).asInstanceOf[T]
diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
index 5b0bdb38..d225aa28 100644
--- a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
+++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
@@ -25,7 +25,7 @@ trait HttpPropagation {
* @return The decoded Context instance. If no entries or tags could be read from the HTTP message then an
* empty context is returned instead.
*/
- def read(reader: HttpPropagation.HeaderReader): Context
+ def readContext(reader: HttpPropagation.HeaderReader): Context
/**
* Writes the tags and entries from the supplied context using the supplied [[HttpPropagation.HeaderWriter]]
@@ -38,7 +38,7 @@ trait HttpPropagation {
* @param writer Wrapper on the HTTP message that will carry the context headers.
* @param direction Write direction. It can be either Outgoing or Returning.
*/
- def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit
+ def writeContext(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit
}
@@ -59,7 +59,7 @@ object HttpPropagation {
* @param context Current context.
* @return Either the original context passed in or a modified version of it, including the read entry.
*/
- def read(reader: HttpPropagation.HeaderReader, context: Context): Context
+ def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context
}
/**
@@ -75,7 +75,7 @@ object HttpPropagation {
* @param writer Wrapper on the HTTP message that will carry the context headers.
* @param direction Write direction. It can be either Outgoing or Returning.
*/
- def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit
+ def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit
}
@@ -90,7 +90,7 @@ object HttpPropagation {
* @param header HTTP header name
* @return The HTTP header value, if present.
*/
- def read(header: String): Option[String]
+ def readHeader(header: String): Option[String]
}
/**
@@ -104,7 +104,7 @@ object HttpPropagation {
* @param header HTTP header name.
* @param value HTTP header value.
*/
- def write(header: String, value: String): Unit
+ def writeHeader(header: String, value: String): Unit
}
@@ -131,12 +131,12 @@ object HttpPropagation {
* of a tag key clash.
* - Read all context entries using the incoming entries configuration.
*/
- override def read(reader: HeaderReader): Context = {
+ override def readContext(reader: HeaderReader): Context = {
val tags = Map.newBuilder[String, String]
// Tags encoded together in the context tags header.
try {
- reader.read(components.tagsHeaderName).foreach { contextTagsHeader =>
+ reader.readHeader(components.tagsHeaderName).foreach { contextTagsHeader =>
contextTagsHeader.split(";").foreach(tagData => {
val tagPair = tagData.split("=")
if (tagPair.length == 2) {
@@ -152,7 +152,7 @@ object HttpPropagation {
components.tagsMappings.foreach {
case (tagName, httpHeader) =>
try {
- reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue))
+ reader.readHeader(httpHeader).foreach(tagValue => tags += (tagName -> tagValue))
} catch {
case t: Throwable => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any])
}
@@ -163,7 +163,7 @@ object HttpPropagation {
case (context, (entryName, entryDecoder)) =>
var result = context
try {
- result = entryDecoder.read(reader, context)
+ result = entryDecoder.readEntry(reader, context)
} catch {
case t: Throwable => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
}
@@ -175,7 +175,7 @@ object HttpPropagation {
/**
* Writes context tags and entries
*/
- override def write(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = {
+ override def writeContext(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = {
val keys = direction match {
case Direction.Outgoing => components.outgoingEntries
case Direction.Returning => components.returningEntries
@@ -193,21 +193,21 @@ object HttpPropagation {
// Write tags with specific mappings or append them to the context tags header.
context.tags.foreach {
case (tagKey, tagValue) => components.tagsMappings.get(tagKey) match {
- case Some(mappedHeader) => writer.write(mappedHeader, tagValue)
+ case Some(mappedHeader) => writer.writeHeader(mappedHeader, tagValue)
case None => appendTag(tagKey, tagValue)
}
}
// Write the context tags header.
if(contextTagsHeader.nonEmpty) {
- writer.write(components.tagsHeaderName, contextTagsHeader.result())
+ writer.writeHeader(components.tagsHeaderName, contextTagsHeader.result())
}
// Write entries for the specified direction.
keys.foreach {
case (entryName, entryWriter) =>
try {
- entryWriter.write(context, writer, direction)
+ entryWriter.writeEntry(context, writer, direction)
} catch {
case t: Throwable => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
}
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala
new file mode 100644
index 00000000..b0300546
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpMessage.scala
@@ -0,0 +1,27 @@
+package kamon.instrumentation
+
+import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter}
+
+
+trait HttpRequest extends HeaderReader {
+ def url: String
+ def path: String
+ def method: String
+}
+
+object HttpRequest {
+ trait Writable[T] extends HttpRequest with HeaderWriter {
+ def build(): T
+ }
+}
+
+trait HttpResponse {
+ def statusCode: Int
+}
+
+object HttpResponse {
+ trait Writable[T] extends HttpResponse with HeaderWriter {
+ def build(): T
+ }
+}
+
diff --git a/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
new file mode 100644
index 00000000..10dccdbc
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/instrumentation/HttpServer.scala
@@ -0,0 +1,192 @@
+package kamon
+package instrumentation
+
+import com.typesafe.config.Config
+import kamon.context.Context
+import kamon.context.HttpPropagation.Direction
+import kamon.instrumentation.HttpServer.Settings.TagMode
+import kamon.trace.Span
+import kamon.util.GlobPathFilter
+
+import scala.collection.JavaConverters._
+
+trait HttpServer {
+
+ def handle(request: HttpRequest): HttpServer.Handler
+
+}
+
+object HttpServer {
+
+ trait Handler {
+
+ def context: Context
+
+ def span: Span
+
+ def finishRequest(): Unit
+
+ def startResponse[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse
+
+ def endResponse(): Unit
+ }
+
+ def from(name: String, port: Int, component: String): HttpServer = {
+ from(name, port, component, Kamon, Kamon)
+ }
+
+ def from(name: String, port: Int, component: String, configuration: Configuration, contextPropagation: ContextPropagation): HttpServer = {
+ val configKey = "kamon.instrumentation.http-server." + name
+ new HttpServer.Default(Settings.from(configuration.config().getConfig(configKey)), contextPropagation, port, component)
+ }
+
+
+ class Default(settings: Settings, contextPropagation: ContextPropagation, port: Int, component: String) extends HttpServer {
+ private val _propagation = contextPropagation.httpPropagation(settings.propagationChannel)
+ .getOrElse(sys.error(s"Could not find HTTP propagation [${settings.propagationChannel}"))
+
+ override def handle(request: HttpRequest): Handler = {
+ val incomingContext = if(settings.enableContextPropagation)
+ _propagation.readContext(request)
+ else Context.Empty
+
+ val requestSpan = if(settings.enableTracing)
+ buildServerSpan(incomingContext, request)
+ else Span.Empty
+
+ val handlerContext = if(requestSpan.nonEmpty())
+ incomingContext.withKey(Span.ContextKey, requestSpan)
+ else incomingContext
+
+ // TODO: Handle HTTP Server Metrics
+
+
+ new HttpServer.Handler {
+ override def context: Context =
+ handlerContext
+
+ override def span: Span =
+ requestSpan
+
+ override def finishRequest(): Unit = {}
+
+ override def startResponse[HttpResponse](response: HttpResponse.Writable[HttpResponse], context: Context): HttpResponse = {
+ if(settings.enableContextPropagation) {
+ _propagation.writeContext(context, response, Direction.Returning)
+ }
+
+ response.build()
+ }
+
+ override def endResponse(): Unit = {
+ span.finish()
+ }
+ }
+ }
+
+ private def buildServerSpan(context: Context, request: HttpRequest): Span = {
+ val span = Kamon.buildSpan(operationName(request))
+ .withMetricTag("span.kind", "server")
+ .withMetricTag("component", component)
+
+ def addTag(tag: String, value: String, mode: TagMode): Unit = mode match {
+ case TagMode.Metric => span.withMetricTag(tag, value)
+ case TagMode.Span => span.withTag(tag, value)
+ case TagMode.Off =>
+ }
+
+ addTag("http.url", request.url, settings.urlTagMode)
+ addTag("http.method", request.method, settings.urlTagMode)
+ settings.contextTags.foreach {
+ case (tagName, mode) => context.getTag(tagName).foreach(tagValue => addTag(tagName, tagValue, mode))
+ }
+
+ span.start()
+ }
+
+ private def operationName(request: HttpRequest): String = {
+ val requestPath = request.path
+ val customMapping = settings.operationMappings.find {
+ case (pattern, _) => pattern.accept(requestPath)
+ }.map(_._2)
+
+ customMapping.getOrElse("http.request")
+ }
+ }
+
+
+ case class Settings(
+ enableContextPropagation: Boolean,
+ propagationChannel: String,
+ enableServerMetrics: Boolean,
+ serverMetricsTags: Seq[String],
+ enableTracing: Boolean,
+ traceIDTag: Option[String],
+ enableSpanMetrics: Boolean,
+ urlTagMode: TagMode,
+ methodTagMode: TagMode,
+ statusCodeTagMode: TagMode,
+ contextTags: Map[String, TagMode],
+ unhandledOperationName: String,
+ operationMappings: Map[GlobPathFilter, String]
+ )
+
+ object Settings {
+
+ sealed trait TagMode
+ object TagMode {
+ case object Metric extends TagMode
+ case object Span extends TagMode
+ case object Off extends TagMode
+
+ def from(value: String): TagMode = value.toLowerCase match {
+ case "metric" => TagMode.Metric
+ case "span" => TagMode.Span
+ case _ => TagMode.Off
+ }
+ }
+
+ def from(config: Config): Settings = {
+
+ // Context propagation settings
+ val enablePropagation = config.getBoolean("propagation.enabled")
+ val propagationChannel = config.getString("propagation.channel")
+
+ // HTTP Server metrics settings
+ val enableServerMetrics = config.getBoolean("metrics.enabled")
+ val serverMetricsTags = config.getStringList("metrics.tags").asScala
+
+ // Tracing settings
+ val enableTracing = config.getBoolean("tracing.enabled")
+ val traceIdTag = Option(config.getString("tracing.trace-id-tag")).filterNot(_ == "none")
+ val enableSpanMetrics = config.getBoolean("tracing.span-metrics")
+ val urlTagMode = TagMode.from(config.getString("tracing.tags.url"))
+ val methodTagMode = TagMode.from(config.getString("tracing.tags.method"))
+ val statusCodeTagMode = TagMode.from(config.getString("tracing.tags.status-code"))
+ val contextTags = config.getConfig("tracing.tags.from-context").pairs.map {
+ case (tagName, mode) => (tagName, TagMode.from(mode))
+ }
+
+ val unhandledOperationName = config.getString("tracing.operations.unhandled")
+ val operationMappings = config.getConfig("tracing.operations.mappings").pairs.map {
+ case (pattern, operationName) => (new GlobPathFilter(pattern), operationName)
+ }
+
+ Settings(
+ enablePropagation,
+ propagationChannel,
+ enableServerMetrics,
+ serverMetricsTags,
+ enableTracing,
+ traceIdTag,
+ enableSpanMetrics,
+ urlTagMode,
+ methodTagMode,
+ statusCodeTagMode,
+ contextTags,
+ unhandledOperationName,
+ operationMappings
+ )
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
index 7d707c9f..7439520c 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
@@ -30,24 +30,24 @@ object SpanCodec {
class B3 extends HttpPropagation.EntryReader with HttpPropagation.EntryWriter {
import B3.Headers
- override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = {
+ override def readEntry(reader: HttpPropagation.HeaderReader, context: Context): Context = {
val identityProvider = Kamon.tracer.identityProvider
- val traceID = reader.read(Headers.TraceIdentifier)
+ val traceID = reader.readHeader(Headers.TraceIdentifier)
.map(id => identityProvider.traceIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
- val spanID = reader.read(Headers.SpanIdentifier)
+ val spanID = reader.readHeader(Headers.SpanIdentifier)
.map(id => identityProvider.spanIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) {
- val parentID = reader.read(Headers.ParentSpanIdentifier)
+ val parentID = reader.readHeader(Headers.ParentSpanIdentifier)
.map(id => identityProvider.spanIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
- val flags = reader.read(Headers.Flags)
+ val flags = reader.readHeader(Headers.Flags)
- val samplingDecision = flags.orElse(reader.read(Headers.Sampled)) match {
+ val samplingDecision = flags.orElse(reader.readHeader(Headers.Sampled)) match {
case Some(sampled) if sampled == "1" => SamplingDecision.Sample
case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample
case _ => SamplingDecision.Unknown
@@ -59,19 +59,19 @@ object SpanCodec {
}
- override def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = {
+ override def writeEntry(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = {
val span = context.get(Span.ContextKey)
if(span.nonEmpty()) {
val spanContext = span.context()
- writer.write(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string))
- writer.write(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string))
+ writer.writeHeader(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string))
+ writer.writeHeader(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string))
if(spanContext.parentID != IdentityProvider.NoIdentifier)
- writer.write(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string))
+ writer.writeHeader(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string))
encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision =>
- writer.write(Headers.Sampled, samplingDecision)
+ writer.writeHeader(Headers.Sampled, samplingDecision)
}
}
}
diff --git a/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala b/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala
index 24e8390a..d36d0df2 100644
--- a/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala
+++ b/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala
@@ -23,6 +23,9 @@ trait ContextTesting {
def contextWithLocal(value: String): Context =
Context.of(StringKey, Some(value))
+
+
+
}
object ContextTesting extends ContextTesting
diff --git a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala
index fbfdc7c3..36b57f2d 100644
--- a/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala
+++ b/kamon-testkit/src/main/scala/kamon/testkit/SpanInspection.scala
@@ -19,7 +19,7 @@ import java.time.Instant
import kamon.Kamon
import kamon.trace.{Span, SpanContext}
-import kamon.trace.Span.FinishedSpan
+import kamon.trace.Span.{FinishedSpan, TagValue}
import scala.reflect.ClassTag
import scala.util.Try
@@ -52,9 +52,20 @@ object SpanInspection {
def spanTags(): Map[String, Span.TagValue] =
spanData.tags
+ def tag(key: String): Option[String] =
+ spanTag(key).map {
+ case TagValue.String(string) => string
+ case TagValue.Number(number) => number.toString
+ case TagValue.True => "true"
+ case TagValue.False => "false"
+ }
+
def metricTags(): Map[String, String] =
getField[Span.Local, Map[String, String]](realSpan, "customMetricTags")
+ def metricTag(key: String): Option[String] =
+ metricTags().get(key)
+
def from(): Instant =
getField[Span.Local, Instant](realSpan, "from")