aboutsummaryrefslogtreecommitdiff
path: root/kamon-core
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2018-08-30 10:40:53 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2018-08-30 10:40:53 +0200
commite4abea098ef4d6e71a805812bfa95c14bd9002b5 (patch)
treef5fcb8222e293f420a9e7c06953805a7428d0f0e /kamon-core
parent794fbf02664ac8c31072d8b955d897901f1f22e0 (diff)
downloadKamon-e4abea098ef4d6e71a805812bfa95c14bd9002b5.tar.gz
Kamon-e4abea098ef4d6e71a805812bfa95c14bd9002b5.tar.bz2
Kamon-e4abea098ef4d6e71a805812bfa95c14bd9002b5.zip
working on context tags and http propagation improvements
Diffstat (limited to 'kamon-core')
-rw-r--r--kamon-core/src/main/resources/reference.conf54
-rw-r--r--kamon-core/src/main/scala/kamon/ClassLoading.scala26
-rw-r--r--kamon-core/src/main/scala/kamon/Configuration.scala57
-rw-r--r--kamon-core/src/main/scala/kamon/ContextPropagation.scala67
-rw-r--r--kamon-core/src/main/scala/kamon/ContextStorage.scala47
-rw-r--r--kamon-core/src/main/scala/kamon/Environment.scala5
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala69
-rw-r--r--kamon-core/src/main/scala/kamon/context/Codecs.scala108
-rw-r--r--kamon-core/src/main/scala/kamon/context/Context.scala105
-rw-r--r--kamon-core/src/main/scala/kamon/context/HttpPropagation.scala283
-rw-r--r--kamon-core/src/main/scala/kamon/context/Mixin.scala45
-rw-r--r--kamon-core/src/main/scala/kamon/package.scala10
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Span.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanCodec.scala55
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala4
15 files changed, 685 insertions, 254 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 60fa156d..5e5078ca 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -194,6 +194,60 @@ kamon {
}
+ propagation {
+ channels {
+ http {
+ type = http
+
+ tags {
+
+ # Header name used to encode context tags.
+ header-name = "context-tags"
+
+
+ # Provide explicit mappins between context tags and the HTTP headers that will carry them. When there is
+ # an explicit mapping for a tag, it will not be included in the default context header. For example, if
+ # you wanted to use the an HTTP header called `X-Correlation-ID` for a context tag with key `correlationID`
+ # you would need to include a the following configuration:
+ #
+ # mappings {
+ # correlationID = "X-Correlation-ID"
+ # }
+ #
+ # The correlationID tag would always be read and written from the `X-Correlation-ID` header. The context
+ # tag name is represented as the configuration key and the desired header name is represented by the
+ # cofiguration value.
+ #
+ mappings {
+
+ }
+ }
+
+ entries {
+
+ # Specify mappings between Context keys and the Http.EntryReader implementation in charge of reading them
+ # from the incoming HTTP request into the Context.
+ incoming {
+ #span = "something"
+ }
+
+ # Specify mappings betwen Context keys and the Http.EntryWriter implementation in charge of writing them
+ # on the outgoing HTTP requests.
+ outgoing {
+
+ }
+
+ # Specify mappings betwen Context keys and the Http.EntryWriter implementation in charge of writing them
+ # on the outgoing HTTP response sent back to clients.
+ returning {
+
+ }
+ }
+ }
+ }
+ }
+
+
util {
filters {
diff --git a/kamon-core/src/main/scala/kamon/ClassLoading.scala b/kamon-core/src/main/scala/kamon/ClassLoading.scala
new file mode 100644
index 00000000..5b097af1
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/ClassLoading.scala
@@ -0,0 +1,26 @@
+package kamon
+
+import kamon.util.DynamicAccess
+
+import scala.collection.immutable
+import scala.reflect.ClassTag
+import scala.util.Try
+
+/**
+ * Utilities for creating instances from fully qualified class names.
+ */
+trait ClassLoading {
+ @volatile private var _dynamicAccessClassLoader = this.getClass.getClassLoader
+ @volatile private var _dynamicAccess = new DynamicAccess(_dynamicAccessClassLoader)
+
+ def classLoader(): ClassLoader =
+ _dynamicAccessClassLoader
+
+ def changeClassLoader(classLoader: ClassLoader): Unit = synchronized {
+ _dynamicAccessClassLoader = classLoader
+ _dynamicAccess = new DynamicAccess(_dynamicAccessClassLoader)
+ }
+
+ def createInstance[T: ClassTag](fqcn: String, args: immutable.Seq[(Class[_], AnyRef)]): Try[T] =
+ _dynamicAccess.createInstanceFor(fqcn, args)
+}
diff --git a/kamon-core/src/main/scala/kamon/Configuration.scala b/kamon-core/src/main/scala/kamon/Configuration.scala
new file mode 100644
index 00000000..bd286792
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/Configuration.scala
@@ -0,0 +1,57 @@
+package kamon
+
+import scala.util.Try
+import com.typesafe.config.{Config, ConfigFactory}
+import org.slf4j.LoggerFactory
+
+trait Configuration { self: ClassLoading =>
+ private val logger = LoggerFactory.getLogger(classOf[Configuration])
+ private var _currentConfig: Config = ConfigFactory.load(self.classLoader())
+ private var _onReconfigureHooks = Seq.empty[Configuration.OnReconfigureHook]
+
+
+ /**
+ * Retrieve Kamon's current configuration.
+ */
+ def config(): Config =
+ _currentConfig
+
+ /**
+ * Supply a new Config instance to rule Kamon's world.
+ */
+ def reconfigure(newConfig: Config): Unit = synchronized {
+ _currentConfig = newConfig
+ _onReconfigureHooks.foreach(hook => {
+ Try(hook.onReconfigure(newConfig)).failed.foreach(error =>
+ logger.error("Exception occurred while trying to run a OnReconfigureHook", error)
+ )
+ })
+ }
+
+ /**
+ * Register a reconfigure hook that will be run when the a call to Kamon.reconfigure(config) is performed. All
+ * registered hooks will run sequentially in the same Thread that calls Kamon.reconfigure(config).
+ */
+ def onReconfigure(hook: Configuration.OnReconfigureHook): Unit = synchronized {
+ _onReconfigureHooks = hook +: _onReconfigureHooks
+ }
+
+ /**
+ * Register a reconfigure hook that will be run when the a call to Kamon.reconfigure(config) is performed. All
+ * registered hooks will run sequentially in the same Thread that calls Kamon.reconfigure(config).
+ */
+ def onReconfigure(hook: (Config) => Unit): Unit = {
+ onReconfigure(new Configuration.OnReconfigureHook {
+ override def onReconfigure(newConfig: Config): Unit = hook.apply(newConfig)
+ })
+ }
+
+}
+
+object Configuration {
+
+ trait OnReconfigureHook {
+ def onReconfigure(newConfig: Config): Unit
+ }
+
+}
diff --git a/kamon-core/src/main/scala/kamon/ContextPropagation.scala b/kamon-core/src/main/scala/kamon/ContextPropagation.scala
new file mode 100644
index 00000000..518aa021
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/ContextPropagation.scala
@@ -0,0 +1,67 @@
+package kamon
+
+import com.typesafe.config.Config
+import kamon.context.HttpPropagation
+
+trait ContextPropagation { self: Configuration with ClassLoading =>
+ @volatile private var _propagationComponents: ContextPropagation.Components = _
+ @volatile private var _defaultHttpPropagation: HttpPropagation = _
+
+ // Initial configuration and reconfigures
+ init(self.config)
+ self.onReconfigure(newConfig => self.init(newConfig))
+
+
+ /**
+ * Retrieves the HTTP propagation channel with the supplied name. Propagation channels are configured on the
+ * kamon.propagation.channels configuration setting.
+ *
+ * @param channelName Channel name to retrieve.
+ * @return The HTTP propagation, if defined.
+ */
+ def httpPropagation(channelName: String): Option[HttpPropagation] =
+ _propagationComponents.httpChannels.get(channelName)
+
+ /**
+ * Retrieves the default HTTP propagation channel. Configuration for this channel can be found under the
+ * kamon.propagation.channels.http configuration setting.
+ *
+ * @return The default HTTP propagation.
+ */
+ def defaultHttpPropagation(): HttpPropagation =
+ _defaultHttpPropagation
+
+
+
+ private def init(config: Config): Unit = synchronized {
+ _propagationComponents = ContextPropagation.Components.from(self.config, self)
+ _defaultHttpPropagation = _propagationComponents.httpChannels(ContextPropagation.DefaultHttpChannel)
+ }
+}
+
+object ContextPropagation {
+ val DefaultHttpChannel = "http"
+ val DefaultBinaryChannel = "binary"
+
+ case class Components(
+ httpChannels: Map[String, HttpPropagation]
+ )
+
+ object Components {
+
+ def from(config: Config, classLoading: ClassLoading): Components = {
+ val propagationConfig = config.getConfig("kamon.propagation")
+ val channels = propagationConfig.getConfig("channels").configurations
+
+ val httpChannels = Map.newBuilder[String, HttpPropagation]
+
+ channels.foreach {
+ case (channelName, channelConfig) => channelConfig.getString("type") match {
+ case "http" => httpChannels += (channelName -> HttpPropagation.from(channelConfig, classLoading))
+ }
+ }
+
+ Components(httpChannels.result())
+ }
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/ContextStorage.scala b/kamon-core/src/main/scala/kamon/ContextStorage.scala
new file mode 100644
index 00000000..ee35264a
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/ContextStorage.scala
@@ -0,0 +1,47 @@
+package kamon
+
+import kamon.context.{Context, Storage}
+import kamon.trace.Span
+
+trait ContextStorage {
+ private val _contextStorage = Storage.ThreadLocal()
+
+ def currentContext(): Context =
+ _contextStorage.current()
+
+ def currentSpan(): Span =
+ _contextStorage.current().get(Span.ContextKey)
+
+ def storeContext(context: Context): Storage.Scope =
+ _contextStorage.store(context)
+
+ def withContext[T](context: Context)(f: => T): T = {
+ val scope = _contextStorage.store(context)
+ try {
+ f
+ } finally {
+ scope.close()
+ }
+ }
+
+ def withContextKey[T, K](key: Context.Key[K], value: K)(f: => T): T =
+ withContext(currentContext().withKey(key, value))(f)
+
+ def withSpan[T](span: Span)(f: => T): T =
+ withSpan(span, true)(f)
+
+ def withSpan[T](span: Span, finishSpan: Boolean)(f: => T): T = {
+ try {
+ withContextKey(Span.ContextKey, span)(f)
+ } catch {
+ case t: Throwable =>
+ span.addError(t.getMessage, t)
+ throw t
+
+ } finally {
+ if(finishSpan)
+ span.finish()
+ }
+ }
+
+}
diff --git a/kamon-core/src/main/scala/kamon/Environment.scala b/kamon-core/src/main/scala/kamon/Environment.scala
index 1c00679d..2e07d18d 100644
--- a/kamon-core/src/main/scala/kamon/Environment.scala
+++ b/kamon-core/src/main/scala/kamon/Environment.scala
@@ -21,6 +21,9 @@ import java.util.concurrent.ThreadLocalRandom
import com.typesafe.config.Config
import kamon.util.HexCodec
+
+
+
case class Environment(host: String, service: String, instance: String, incarnation: String, tags: Map[String, String])
object Environment {
@@ -47,6 +50,4 @@ object Environment {
private def readValueOrGenerate(configuredValue: String, generator: => String): String =
if(configuredValue == "auto") generator else configuredValue
-
-
}
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 8f69ab41..2825d961 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -19,7 +19,7 @@ import java.time.Duration
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor}
import com.typesafe.config.{Config, ConfigFactory}
-import kamon.context.{Codecs, Context, Key, Storage}
+import kamon.context.Codecs
import kamon.metric._
import kamon.trace._
import kamon.util.{Clock, Filters, Matcher, Registration}
@@ -29,7 +29,7 @@ import scala.concurrent.Future
import scala.util.Try
-object Kamon extends MetricLookup with ReporterRegistry with Tracer {
+object Kamon extends MetricLookup with ClassLoading with Configuration with ReporterRegistry with Tracer with ContextPropagation with ContextStorage {
private val logger = LoggerFactory.getLogger("kamon.Kamon")
@volatile private var _config = ConfigFactory.load()
@@ -41,19 +41,15 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
private val _metrics = new MetricRegistry(_config, _scheduler)
private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config, _clock)
private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config, _clock)
- private val _contextStorage = Storage.ThreadLocal()
private val _contextCodec = new Codecs(_config)
- private var _onReconfigureHooks = Seq.empty[OnReconfigureHook]
+ //private var _onReconfigureHooks = Seq.empty[OnReconfigureHook]
sys.addShutdownHook(() => _scheduler.shutdown())
def environment: Environment =
_environment
- def config(): Config =
- _config
-
- def reconfigure(config: Config): Unit = synchronized {
+ onReconfigure(newConfig => {
_config = config
_environment = Environment.fromConfig(config)
_filters = Filters.fromConfig(config)
@@ -62,18 +58,11 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
_tracer.reconfigure(config)
_contextCodec.reconfigure(config)
- _onReconfigureHooks.foreach(hook => {
- Try(hook.onReconfigure(config)).failed.foreach(error =>
- logger.error("Exception occurred while trying to run a OnReconfigureHook", error)
- )
- })
-
_scheduler match {
case stpe: ScheduledThreadPoolExecutor => stpe.setCorePoolSize(schedulerPoolSize(config))
case other => logger.error("Unexpected scheduler [{}] found when reconfiguring Kamon.", other)
}
- }
-
+ })
override def histogram(name: String, unit: MeasurementUnit, dynamicRange: Option[DynamicRange]): HistogramMetric =
_metrics.histogram(name, unit, dynamicRange)
@@ -105,43 +94,7 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
def contextCodec(): Codecs =
_contextCodec
- def currentContext(): Context =
- _contextStorage.current()
-
- def currentSpan(): Span =
- _contextStorage.current().get(Span.ContextKey)
-
- def storeContext(context: Context): Storage.Scope =
- _contextStorage.store(context)
-
- def withContext[T](context: Context)(f: => T): T = {
- val scope = _contextStorage.store(context)
- try {
- f
- } finally {
- scope.close()
- }
- }
-
- def withContextKey[T, K](key: Key[K], value: K)(f: => T): T =
- withContext(currentContext().withKey(key, value))(f)
-
- def withSpan[T](span: Span)(f: => T): T =
- withSpan(span, true)(f)
- def withSpan[T](span: Span, finishSpan: Boolean)(f: => T): T = {
- try {
- withContextKey(Span.ContextKey, span)(f)
- } catch {
- case t: Throwable =>
- span.addError(t.getMessage, t)
- throw t
-
- } finally {
- if(finishSpan)
- span.finish()
- }
- }
override def loadReportersFromConfig(): Unit =
_reporterRegistry.loadReportersFromConfig()
@@ -173,14 +126,6 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
def clock(): Clock =
_clock
- /**
- * Register a reconfigure hook that will be run when the a call to Kamon.reconfigure(config) is performed. All
- * registered hooks will run sequentially in the same Thread that calls Kamon.reconfigure(config).
- */
- def onReconfigure(hook: OnReconfigureHook): Unit = synchronized {
- _onReconfigureHooks = hook +: _onReconfigureHooks
- }
-
def scheduler(): ScheduledExecutorService =
_scheduler
@@ -188,7 +133,3 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
config.getInt("kamon.scheduler-pool-size")
}
-
-trait OnReconfigureHook {
- def onReconfigure(newConfig: Config): Unit
-}
diff --git a/kamon-core/src/main/scala/kamon/context/Codecs.scala b/kamon-core/src/main/scala/kamon/context/Codecs.scala
index c5d237a9..465f53be 100644
--- a/kamon-core/src/main/scala/kamon/context/Codecs.scala
+++ b/kamon-core/src/main/scala/kamon/context/Codecs.scala
@@ -100,22 +100,22 @@ object Codecs {
override def encode(context: Context): TextMap = {
val encoded = TextMap.Default()
- context.entries.foreach {
- case (key, _) if key.broadcast =>
- entryCodecs.get(key.name) match {
- case Some(codec) =>
- try {
- codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2))
- } catch {
- case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e)
- }
-
- case None =>
- log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name)
- }
-
- case _ => // All non-broadcast keys should be ignored.
- }
+// context.entries.foreach {
+// case (key, _) if key.broadcast =>
+// entryCodecs.get(key.name) match {
+// case Some(codec) =>
+// try {
+// codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2))
+// } catch {
+// case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e)
+// }
+//
+// case None =>
+// log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name)
+// }
+//
+// case _ => // All non-broadcast keys should be ignored.
+// }
encoded
}
@@ -150,29 +150,29 @@ object Codecs {
emptyBuffer
else {
var colferEntries: List[ColferEntry] = Nil
- entries.foreach {
- case (key, _) if key.broadcast =>
- entryCodecs.get(key.name) match {
- case Some(entryCodec) =>
- try {
- val entryData = entryCodec.encode(context)
- if(entryData.capacity() > 0) {
- val colferEntry = new ColferEntry()
- colferEntry.setName(key.name)
- colferEntry.setContent(entryData.array())
- colferEntries = colferEntry :: colferEntries
- }
- } catch {
- case throwable: Throwable =>
- log.error(s"Failed to encode broadcast context key [${key.name}]", throwable)
- }
-
- case None =>
- log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name)
- }
-
- case _ => // All non-broadcast keys should be ignored.
- }
+// entries.foreach {
+// case (key, _) if key.broadcast =>
+// entryCodecs.get(key.name) match {
+// case Some(entryCodec) =>
+// try {
+// val entryData = entryCodec.encode(context)
+// if(entryData.capacity() > 0) {
+// val colferEntry = new ColferEntry()
+// colferEntry.setName(key.name)
+// colferEntry.setContent(entryData.array())
+// colferEntries = colferEntry :: colferEntries
+// }
+// } catch {
+// case throwable: Throwable =>
+// log.error(s"Failed to encode broadcast context key [${key.name}]", throwable)
+// }
+//
+// case None =>
+// log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name)
+// }
+//
+// case _ => // All non-broadcast keys should be ignored.
+// }
if(colferEntries.isEmpty)
emptyBuffer
@@ -226,38 +226,40 @@ object Codecs {
}
private class StringHeadersCodec(key: String, headerName: String) extends Codecs.ForEntry[TextMap] {
- private val contextKey = Key.broadcast[Option[String]](key, None)
+ //private val contextKey = Key.broadcast[Option[String]](key, None)
override def encode(context: Context): TextMap = {
val textMap = TextMap.Default()
- context.get(contextKey).foreach { value =>
- textMap.put(headerName, value)
- }
+// context.get(contextKey).foreach { value =>
+// textMap.put(headerName, value)
+// }
textMap
}
override def decode(carrier: TextMap, context: Context): Context = {
- carrier.get(headerName) match {
- case value @ Some(_) => context.withKey(contextKey, value)
- case None => context
- }
+ ???
+// carrier.get(headerName) match {
+// case value @ Some(_) => context.withKey(contextKey, value)
+// case None => context
+// }
}
}
private class StringBinaryCodec(key: String) extends Codecs.ForEntry[ByteBuffer] {
val emptyBuffer: ByteBuffer = ByteBuffer.allocate(0)
- private val contextKey = Key.broadcast[Option[String]](key, None)
+ //private val contextKey = Key.broadcast[Option[String]](key, None)
override def encode(context: Context): ByteBuffer = {
- context.get(contextKey) match {
- case Some(value) => ByteBuffer.wrap(value.getBytes)
- case None => emptyBuffer
- }
+// context.get(contextKey) match {
+// case Some(value) => ByteBuffer.wrap(value.getBytes)
+// case None => emptyBuffer
+// }
+ ???
}
override def decode(carrier: ByteBuffer, context: Context): Context = {
- context.withKey(contextKey, Some(new String(carrier.array())))
+ ??? //context.withKey(contextKey, Some(new String(carrier.array())))
}
}
}
diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala
index e0b084cb..1eed7e14 100644
--- a/kamon-core/src/main/scala/kamon/context/Context.scala
+++ b/kamon-core/src/main/scala/kamon/context/Context.scala
@@ -13,90 +13,83 @@
* =========================================================================================
*/
-package kamon.context
+package kamon
+package context
-import java.io._
-import java.nio.ByteBuffer
+import java.util.{Map => JavaMap}
+import scala.collection.JavaConverters._
-import kamon.Kamon
+class Context private (private[context] val entries: Map[Context.Key[_], Any], private[context] val tags: Map[String, String]) {
-class Context private (private[context] val entries: Map[Key[_], Any]) extends scala.Serializable {
- def get[T](key: Key[T]): T =
+ def get[T](key: Context.Key[T]): T =
entries.getOrElse(key, key.emptyValue).asInstanceOf[T]
- def withKey[T](key: Key[T], value: T): Context =
- new Context(entries.updated(key, value))
+ def getTag(tagKey: String): Option[String] =
+ tags.get(tagKey)
- var _deserializedEntries: Map[Key[_], Any] = Map.empty
+ def withKey[T](key: Context.Key[T], value: T): Context =
+ new Context(entries.updated(key, value), tags)
- @throws[IOException]
- private def writeObject(out: ObjectOutputStream): Unit = out.write(
- Kamon.contextCodec().Binary.encode(this).array()
- )
+ def withTag(tagKey: String, tagValue: String): Context =
+ new Context(entries, tags.updated(tagKey, tagValue))
- @throws[IOException]
- @throws[ClassNotFoundException]
- private def readObject(in: ObjectInputStream): Unit = {
- val buf = new Array[Byte](in.available())
- in.readFully(buf)
- _deserializedEntries = Kamon.contextCodec().Binary.decode(ByteBuffer.wrap(buf)).entries
- }
-
- def readResolve(): AnyRef = new Context(_deserializedEntries)
+ def withTags(tags: Map[String, String]): Context =
+ new Context(entries, this.tags ++ tags)
- override def equals(obj: scala.Any): Boolean = {
- obj != null &&
- obj.isInstanceOf[Context] &&
- obj.asInstanceOf[Context].entries != null &&
- obj.asInstanceOf[Context].entries == this.entries
- }
+ def withTags(tags: JavaMap[String, String]): Context =
+ new Context(entries, this.tags ++ tags.asScala.toMap)
- override def hashCode(): Int = entries.hashCode()
}
object Context {
- val Empty = new Context(Map.empty)
-
- def apply(): Context =
- Empty
+ val Empty = new Context(Map.empty, Map.empty)
- def create(): Context =
- Empty
+ def of(tags: JavaMap[String, String]): Context =
+ new Context(Map.empty, tags.asScala.toMap)
- def apply[T](key: Key[T], value: T): Context =
- new Context(Map(key -> value))
+ def of(tags: Map[String, String]): Context =
+ new Context(Map.empty, tags)
- def create[T](key: Key[T], value: T): Context =
- apply(key, value)
+ def of[T](key: Context.Key[T], value: T): Context =
+ new Context(Map(key -> value), Map.empty)
-}
-
-
-sealed abstract class Key[T] {
- def name: String
- def emptyValue: T
- def broadcast: Boolean
-}
+ def of[T](key: Context.Key[T], value: T, tags: JavaMap[String, String]): Context =
+ new Context(Map(key -> value), tags.asScala.toMap)
-object Key {
+ def of[T](key: Context.Key[T], value: T, tags: Map[String, String]): Context =
+ new Context(Map(key -> value), tags)
- def local[T](name: String, emptyValue: T): Key[T] =
- new Default[T](name, emptyValue, false)
+ def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U): Context =
+ new Context(Map(keyOne -> valueOne, keyTwo -> valueTwo), Map.empty)
- def broadcast[T](name: String, emptyValue: T): Key[T] =
- new Default[T](name, emptyValue, true)
+ def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: JavaMap[String, String]): Context =
+ new Context(Map(keyOne -> valueOne, keyTwo -> valueTwo), tags.asScala.toMap)
- def broadcastString(name: String): Key[Option[String]] =
- new Default[Option[String]](name, None, true)
+ def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: Map[String, String]): Context =
+ new Context(Map(keyOne -> valueOne, keyTwo -> valueTwo), tags)
+ def key[T](name: String, emptyValue: T): Context.Key[T] =
+ new Context.Key(name, emptyValue)
- private class Default[T](val name: String, val emptyValue: T, val broadcast: Boolean) extends Key[T] {
+ /**
+ * Encapsulates the type, name and empty value for a context entry. All reads and writes from a context instance
+ * must be done using a context key, which will ensure the right type is used on both operations. The key's name
+ * is used when configuring mappings and incoming/outgoing/returning codecs for context propagation across channels.
+ *
+ * If you try to read an entry from a context and such entry is not present, the empty value for the key is returned
+ * instead.
+ *
+ * @param name Key name. Must be unique.
+ * @param emptyValue Value to be returned when reading from a context that doesn't have an entry with this key.
+ * @tparam ValueType Type of the value to be held on the context with this key.
+ */
+ final class Key[ValueType](val name: String, val emptyValue: ValueType) {
override def hashCode(): Int =
name.hashCode
override def equals(that: Any): Boolean =
- that.isInstanceOf[Default[_]] && that.asInstanceOf[Default[_]].name == this.name
+ that.isInstanceOf[Context.Key[_]] && that.asInstanceOf[Context.Key[_]].name == this.name
}
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
new file mode 100644
index 00000000..5b0bdb38
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala
@@ -0,0 +1,283 @@
+package kamon
+package context
+
+import com.typesafe.config.Config
+import org.slf4j.LoggerFactory
+
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success}
+
+/**
+ * Context Propagation for HTTP transports. When using HTTP transports all the context related information is
+ * read from and written to HTTP headers. The context information may be included in the following directions:
+ * - Incoming: Used for HTTP requests coming into this service. Implicitly used when using HttpPropagation.read.
+ * - Outgoing: Used for HTTP requests leaving this service.
+ * - Returning: Used for HTTP responses send back to clients of this service.
+ */
+trait HttpPropagation {
+
+ /**
+ * Uses the provided [[HttpPropagation.HeaderReader]] to read as many HTTP Headers as necessary and create a
+ * [[Context]] instance. The way in which context tags and entries are read from and written to HTTP Headers is
+ * implementation specific.
+ *
+ * @param reader Wrapper on the HTTP message from which headers are read.
+ * @return The decoded Context instance. If no entries or tags could be read from the HTTP message then an
+ * empty context is returned instead.
+ */
+ def read(reader: HttpPropagation.HeaderReader): Context
+
+ /**
+ * Writes the tags and entries from the supplied context using the supplied [[HttpPropagation.HeaderWriter]]
+ * instance. The way in which context tags and entries are read from and written to HTTP Headers is implementation
+ * specific.
+ *
+ * Implementations are expected to produce side effects on the wrapped HTTP Messages.
+ *
+ * @param context Context instance to be written.
+ * @param writer Wrapper on the HTTP message that will carry the context headers.
+ * @param direction Write direction. It can be either Outgoing or Returning.
+ */
+ def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit
+
+}
+
+object HttpPropagation {
+
+ /**
+ * Encapsulates logic required to read a single context entry from HTTP headers. Implementations of this trait
+ * must be aware of the entry they are able to read and the HTTP headers required to do so.
+ */
+ trait EntryReader {
+
+ /**
+ * Tries to read a context entry from HTTP headers. If a context entry is successfully read, implementations
+ * must return an updated context instance that includes such entry. If no entry could be read simply return
+ * context instance that was passed in, untouched.
+ *
+ * @param reader Wrapper on the HTTP message from which headers are read.
+ * @param context Current context.
+ * @return Either the original context passed in or a modified version of it, including the read entry.
+ */
+ def read(reader: HttpPropagation.HeaderReader, context: Context): Context
+ }
+
+ /**
+ * Encapsulates logic required to write a single context entry to HTTP headers. Implementations of this trait
+ * must be aware of the entry they are able to write and the HTTP headers required to do so.
+ */
+ trait EntryWriter {
+
+ /**
+ * Tries to write a context entry into HTTP headers.
+ *
+ * @param context The context from which entries should be written.
+ * @param writer Wrapper on the HTTP message that will carry the context headers.
+ * @param direction Write direction. It can be either Outgoing or Returning.
+ */
+ def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit
+ }
+
+
+ /**
+ * Wrapper that reads HTTP headers from HTTP a message.
+ */
+ trait HeaderReader {
+
+ /**
+ * Reads an HTTP header value
+ *
+ * @param header HTTP header name
+ * @return The HTTP header value, if present.
+ */
+ def read(header: String): Option[String]
+ }
+
+ /**
+ * Wrapper that writes HTTP headers to a HTTP message.
+ */
+ trait HeaderWriter {
+
+ /**
+ * Writes a HTTP header into a HTTP message.
+ *
+ * @param header HTTP header name.
+ * @param value HTTP header value.
+ */
+ def write(header: String, value: String): Unit
+ }
+
+
+ /**
+ * Create a new default HttpPropagation instance from the provided configuration.
+ *
+ * @param config HTTP propagation channel configuration
+ * @return A newly constructed HttpPropagation instance.
+ */
+ def from(config: Config, classLoading: ClassLoading): HttpPropagation = {
+ new HttpPropagation.Default(Components.from(config, classLoading))
+ }
+
+ /**
+ * Default HTTP Propagation in Kamon.
+ */
+ final class Default(components: Components) extends HttpPropagation {
+ private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Default])
+
+ /**
+ * Reads context tags and entries on the following order:
+ * - Read all context tags from the context tags header.
+ * - Read all context tags with explicit mappings. This overrides any tag from the previous step in case
+ * of a tag key clash.
+ * - Read all context entries using the incoming entries configuration.
+ */
+ override def read(reader: HeaderReader): Context = {
+ val tags = Map.newBuilder[String, String]
+
+ // Tags encoded together in the context tags header.
+ try {
+ reader.read(components.tagsHeaderName).foreach { contextTagsHeader =>
+ contextTagsHeader.split(";").foreach(tagData => {
+ val tagPair = tagData.split("=")
+ if (tagPair.length == 2) {
+ tags += (tagPair(0) -> tagPair(1))
+ }
+ })
+ }
+ } catch {
+ case t: Throwable => log.warn("Failed to read the context tags header", t.asInstanceOf[Any])
+ }
+
+ // Tags explicitly mapped on the tags.mappings configuration.
+ components.tagsMappings.foreach {
+ case (tagName, httpHeader) =>
+ try {
+ reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue))
+ } catch {
+ case t: Throwable => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any])
+ }
+ }
+
+ // Incoming Entries
+ components.incomingEntries.foldLeft(Context.of(tags.result())) {
+ case (context, (entryName, entryDecoder)) =>
+ var result = context
+ try {
+ result = entryDecoder.read(reader, context)
+ } catch {
+ case t: Throwable => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
+ }
+
+ result
+ }
+ }
+
+ /**
+ * Writes context tags and entries
+ */
+ override def write(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = {
+ val keys = direction match {
+ case Direction.Outgoing => components.outgoingEntries
+ case Direction.Returning => components.returningEntries
+ }
+
+ val contextTagsHeader = new StringBuilder()
+ def appendTag(key: String, value: String): Unit = {
+ contextTagsHeader
+ .append(key)
+ .append('=')
+ .append(value)
+ .append(';')
+ }
+
+ // Write tags with specific mappings or append them to the context tags header.
+ context.tags.foreach {
+ case (tagKey, tagValue) => components.tagsMappings.get(tagKey) match {
+ case Some(mappedHeader) => writer.write(mappedHeader, tagValue)
+ case None => appendTag(tagKey, tagValue)
+ }
+ }
+
+ // Write the context tags header.
+ if(contextTagsHeader.nonEmpty) {
+ writer.write(components.tagsHeaderName, contextTagsHeader.result())
+ }
+
+ // Write entries for the specified direction.
+ keys.foreach {
+ case (entryName, entryWriter) =>
+ try {
+ entryWriter.write(context, writer, direction)
+ } catch {
+ case t: Throwable => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any])
+ }
+ }
+ }
+ }
+
+ /**
+ * Propagation direction. Used to decide whether incoming, outgoing or returning keys must be used to
+ * propagate context.
+ */
+ sealed trait Direction
+ object Direction {
+
+ /**
+ * Marker trait for all directions that require write operations.
+ */
+ sealed trait Write
+
+ /**
+ * Requests coming into this service.
+ */
+ case object Incoming extends Direction
+
+ /**
+ * Requests going from this service to others.
+ */
+ case object Outgoing extends Direction with Write
+
+ /**
+ * Responses sent from this service to clients.
+ */
+ case object Returning extends Direction with Write
+ }
+
+
+ case class Components(
+ tagsHeaderName: String,
+ tagsMappings: Map[String, String],
+ incomingEntries: Map[String, HttpPropagation.EntryReader],
+ outgoingEntries: Map[String, HttpPropagation.EntryWriter],
+ returningEntries: Map[String, HttpPropagation.EntryWriter]
+ )
+
+ object Components {
+ private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Components])
+
+ def from(config: Config, classLoading: ClassLoading): Components = {
+ def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = {
+ val entryReaders = Map.newBuilder[String, ExpectedType]
+
+ mappings.foreach {
+ case (contextKey, readerClass) => classLoading.createInstance[ExpectedType](readerClass, Nil) match {
+ case Success(readerInstance) => entryReaders += (contextKey -> readerInstance)
+ case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to []",
+ implicitly[ClassTag[ExpectedType]].runtimeClass.getName, readerClass, exception)
+ }
+ }
+
+ entryReaders.result()
+ }
+
+ val tagsHeaderName = config.getString("tags.header-name")
+ val tagsMappings = config.getConfig("tags.mappings").pairs
+ val incomingEntries = buildInstances[HttpPropagation.EntryReader](config.getConfig("entries.incoming").pairs)
+ val outgoingEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.outgoing").pairs)
+ val returningEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.returning").pairs)
+
+ Components(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries, returningEntries)
+ }
+ }
+
+}
diff --git a/kamon-core/src/main/scala/kamon/context/Mixin.scala b/kamon-core/src/main/scala/kamon/context/Mixin.scala
deleted file mode 100644
index 3445cc31..00000000
--- a/kamon-core/src/main/scala/kamon/context/Mixin.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon.context
-
-import kamon.Kamon
-
-
-/**
- * Utility trait that marks objects carrying a reference to a Context instance.
- *
- */
-trait HasContext {
- def context: Context
-}
-
-object HasContext {
- private case class Default(context: Context) extends HasContext
-
- /**
- * Construct a HasSpan instance that references the provided Context.
- *
- */
- def from(context: Context): HasContext =
- Default(context)
-
- /**
- * Construct a HasContext instance with the current Kamon from Kamon's default context storage.
- *
- */
- def fromCurrentContext(): HasContext =
- Default(Kamon.currentContext())
-}
diff --git a/kamon-core/src/main/scala/kamon/package.scala b/kamon-core/src/main/scala/kamon/package.scala
index d3b25500..d694206c 100644
--- a/kamon-core/src/main/scala/kamon/package.scala
+++ b/kamon-core/src/main/scala/kamon/package.scala
@@ -92,8 +92,14 @@ package object kamon {
def configurations: Map[String, Config] = {
topLevelKeys
- .map(entry => (entry, config.getConfig(ConfigUtil.joinPath(entry))))
- .toMap
+ .map(entry => (entry, config.getConfig(ConfigUtil.joinPath(entry))))
+ .toMap
+ }
+
+ def pairs: Map[String, String] = {
+ topLevelKeys
+ .map(key => (key, config.getString(key)))
+ .toMap
}
}
}
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala
index 43391af7..6015e350 100644
--- a/kamon-core/src/main/scala/kamon/trace/Span.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Span.scala
@@ -19,7 +19,7 @@ package trace
import java.time.Instant
import kamon.ReporterRegistry.SpanSink
-import kamon.context.Key
+import kamon.context.Context
import kamon.metric.MeasurementUnit
import kamon.trace.SpanContext.SamplingDecision
import kamon.util.Clock
@@ -66,7 +66,7 @@ sealed abstract class Span {
object Span {
- val ContextKey = Key.broadcast[Span]("span", Span.Empty)
+ val ContextKey = Context.key[Span]("span", Span.Empty)
object Empty extends Span {
override val context: SpanContext = SpanContext.EmptySpanContext
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
index 14b28d54..7d707c9f 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
@@ -19,54 +19,35 @@ import java.net.{URLDecoder, URLEncoder}
import java.nio.ByteBuffer
import kamon.Kamon
-import kamon.context.{Codecs, Context, TextMap}
+import kamon.context.{Codecs, Context, HttpPropagation, TextMap}
import kamon.context.generated.binary.span.{Span => ColferSpan}
+import kamon.context.HttpPropagation.Direction
import kamon.trace.SpanContext.SamplingDecision
object SpanCodec {
- class B3 extends Codecs.ForEntry[TextMap] {
+ class B3 extends HttpPropagation.EntryReader with HttpPropagation.EntryWriter {
import B3.Headers
- override def encode(context: Context): TextMap = {
- val span = context.get(Span.ContextKey)
- val carrier = TextMap.Default()
-
- if(span.nonEmpty()) {
- val spanContext = span.context()
- carrier.put(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string))
- carrier.put(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string))
-
- if(spanContext.parentID != IdentityProvider.NoIdentifier)
- carrier.put(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string))
-
- encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision =>
- carrier.put(Headers.Sampled, samplingDecision)
- }
- }
-
- carrier
- }
-
- override def decode(carrier: TextMap, context: Context): Context = {
+ override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = {
val identityProvider = Kamon.tracer.identityProvider
- val traceID = carrier.get(Headers.TraceIdentifier)
+ val traceID = reader.read(Headers.TraceIdentifier)
.map(id => identityProvider.traceIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
- val spanID = carrier.get(Headers.SpanIdentifier)
+ val spanID = reader.read(Headers.SpanIdentifier)
.map(id => identityProvider.spanIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) {
- val parentID = carrier.get(Headers.ParentSpanIdentifier)
+ val parentID = reader.read(Headers.ParentSpanIdentifier)
.map(id => identityProvider.spanIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
- val flags = carrier.get(Headers.Flags)
+ val flags = reader.read(Headers.Flags)
- val samplingDecision = flags.orElse(carrier.get(Headers.Sampled)) match {
+ val samplingDecision = flags.orElse(reader.read(Headers.Sampled)) match {
case Some(sampled) if sampled == "1" => SamplingDecision.Sample
case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample
case _ => SamplingDecision.Unknown
@@ -77,6 +58,24 @@ object SpanCodec {
} else context
}
+
+ override def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = {
+ val span = context.get(Span.ContextKey)
+
+ if(span.nonEmpty()) {
+ val spanContext = span.context()
+ writer.write(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string))
+ writer.write(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string))
+
+ if(spanContext.parentID != IdentityProvider.NoIdentifier)
+ writer.write(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string))
+
+ encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision =>
+ writer.write(Headers.Sampled, samplingDecision)
+ }
+ }
+ }
+
private def encodeSamplingDecision(samplingDecision: SamplingDecision): Option[String] = samplingDecision match {
case SamplingDecision.Sample => Some("1")
case SamplingDecision.DoNotSample => Some("0")
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala b/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala
index 2a8e2271..ed329ef7 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala
@@ -15,7 +15,7 @@
package kamon.trace
-import kamon.context.Key
+import kamon.context.{Context}
import kamon.trace.Tracer.SpanBuilder
/**
@@ -39,7 +39,7 @@ object SpanCustomizer {
override def customize(spanBuilder: SpanBuilder): SpanBuilder = spanBuilder
}
- val ContextKey = Key.local[SpanCustomizer]("span-customizer", Noop)
+ val ContextKey = Context.key[SpanCustomizer]("span-customizer", Noop)
def forOperationName(operationName: String): SpanCustomizer = new SpanCustomizer {
override def customize(spanBuilder: SpanBuilder): SpanBuilder =