aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/kamon/context/HttpPropagation.scala')
-rw-r--r--kamon-core/src/main/scala/kamon/context/HttpPropagation.scala206
1 files changed, 206 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
new file mode 100644
index 00000000..6a15e2a6
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
@@ -0,0 +1,206 @@
+/* =========================================================================================
+ * Copyright © 2013-2018 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon
+package context
+
+import com.typesafe.config.Config
+import org.slf4j.LoggerFactory
+
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+import scala.util.{Failure, Success}
+
+/**
+ * Context propagation that uses HTTP headers as the transport medium. HTTP propagation mechanisms read any number of
+ * HTTP headers from incoming HTTP requests to decode a Context instance and write any number of HTTP headers on
+ * outgoing requests to transfer a context to remote services.
+ */
+object HttpPropagation {
+
+ /**
+ * Wrapper that reads HTTP headers from a HTTP message.
+ */
+ trait HeaderReader {
+
+ /**
+ * Reads a single HTTP header value.
+ *
+ * @param header HTTP header name
+ * @return The HTTP header value, if present.
+ */
+ def read(header: String): Option[String]
+
+ /**
+ * Reads all HTTP headers present in the wrapped HTTP message.
+ *
+ * @return A map from header name to
+ */
+ def readAll(): Map[String, String]
+ }
+
+ /**
+ * Wrapper that writes HTTP headers to a HTTP message.
+ */
+ trait HeaderWriter {
+
+ /**
+ * Writes a HTTP header into a HTTP message.
+ *
+ * @param header HTTP header name.
+ * @param value HTTP header value.
+ */
+ def write(header: String, value: String): Unit
+ }
+
+
+
+ /**
+ * Create a new default HTTP propagation instance from the provided configuration.
+ *
+ * @param config HTTP propagation channel configuration
+ * @return A newly constructed HttpPropagation instance.
+ */
+ def from(config: Config, classLoading: ClassLoading): Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] = {
+ new HttpPropagation.Default(Settings.from(config, classLoading))
+ }
+
+ /**
+ * Default HTTP Propagation in Kamon.
+ */
+ class Default(settings: Settings) extends Propagation[HttpPropagation.HeaderReader, HttpPropagation.HeaderWriter] {
+ private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Default])
+
+ /**
+ * Reads context tags and entries on the following order:
+ * 1. Read all context tags from the context tags header.
+ * 2. Read all context tags with explicit mappings. This overrides any tag
+ * from the previous step in case of a tag key clash.
+ * 3. Read all context entries using the incoming entries configuration.
+ */
+ override def read(reader: HeaderReader): Context = {
+ val tags = Map.newBuilder[String, String]
+
+ // Tags encoded together in the context tags header.
+ try {
+ reader.read(settings.tagsHeaderName).foreach { contextTagsHeader =>
+ contextTagsHeader.split(";").foreach(tagData => {
+ val tagPair = tagData.split("=")
+ if (tagPair.length == 2) {
+ tags += (tagPair(0) -> tagPair(1))
+ }
+ })
+ }
+ } catch {
+ case NonFatal(t) => log.warn("Failed to read the context tags header", t.asInstanceOf[Any])
+ }
+
+ // Tags explicitly mapped on the tags.mappings configuration.
+ settings.tagsMappings.foreach {
+ case (tagName, httpHeader) =>
+ try {
+ reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue))
+ } catch {
+ case NonFatal(t) => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any])
+ }
+ }
+
+ // Incoming Entries
+ settings.incomingEntries.foldLeft(Context.of(tags.result())) {
+ case (context, (entryName, entryDecoder)) =>
+ var result = context
+ try {
+ result = entryDecoder.read(reader, context)
+ } catch {
+ case NonFatal(t) => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
+ }
+
+ result
+ }
+ }
+
+ /**
+ * Writes context tags and entries
+ */
+ override def write(context: Context, writer: HeaderWriter): Unit = {
+ val contextTagsHeader = new StringBuilder()
+ def appendTag(key: String, value: String): Unit = {
+ contextTagsHeader
+ .append(key)
+ .append('=')
+ .append(value)
+ .append(';')
+ }
+
+ // Write tags with specific mappings or append them to the context tags header.
+ context.tags.foreach {
+ case (tagKey, tagValue) => settings.tagsMappings.get(tagKey) match {
+ case Some(mappedHeader) => writer.write(mappedHeader, tagValue)
+ case None => appendTag(tagKey, tagValue)
+ }
+ }
+
+ // Write the context tags header.
+ if(contextTagsHeader.nonEmpty) {
+ writer.write(settings.tagsHeaderName, contextTagsHeader.result())
+ }
+
+ // Write entries for the specified direction.
+ settings.outgoingEntries.foreach {
+ case (entryName, entryWriter) =>
+ try {
+ entryWriter.write(context, writer)
+ } catch {
+ case NonFatal(t) => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
+ }
+ }
+ }
+ }
+
+ case class Settings(
+ tagsHeaderName: String,
+ tagsMappings: Map[String, String],
+ incomingEntries: Map[String, Propagation.EntryReader[HeaderReader]],
+ outgoingEntries: Map[String, Propagation.EntryWriter[HeaderWriter]]
+ )
+
+ object Settings {
+ private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Settings])
+
+ def from(config: Config, classLoading: ClassLoading): Settings = {
+ def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = {
+ val instanceMap = Map.newBuilder[String, ExpectedType]
+
+ mappings.foreach {
+ case (contextKey, componentClass) => classLoading.createInstance[ExpectedType](componentClass, Nil) match {
+ case Success(componentInstance) => instanceMap += (contextKey -> componentInstance)
+ case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to []",
+ implicitly[ClassTag[ExpectedType]].runtimeClass.getName, componentClass, exception)
+ }
+ }
+
+ instanceMap.result()
+ }
+
+ val tagsHeaderName = config.getString("tags.header-name")
+ val tagsMappings = config.getConfig("tags.mappings").pairs
+ val incomingEntries = buildInstances[Propagation.EntryReader[HeaderReader]](config.getConfig("entries.incoming").pairs)
+ val outgoingEntries = buildInstances[Propagation.EntryWriter[HeaderWriter]](config.getConfig("entries.outgoing").pairs)
+
+ Settings(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries)
+ }
+ }
+
+}