diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2018-08-30 10:40:53 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2018-08-30 10:40:53 +0200 |
commit | e4abea098ef4d6e71a805812bfa95c14bd9002b5 (patch) | |
tree | f5fcb8222e293f420a9e7c06953805a7428d0f0e /kamon-core/src | |
parent | 794fbf02664ac8c31072d8b955d897901f1f22e0 (diff) | |
download | Kamon-e4abea098ef4d6e71a805812bfa95c14bd9002b5.tar.gz Kamon-e4abea098ef4d6e71a805812bfa95c14bd9002b5.tar.bz2 Kamon-e4abea098ef4d6e71a805812bfa95c14bd9002b5.zip |
working on context tags and http propagation improvements
Diffstat (limited to 'kamon-core/src')
-rw-r--r-- | kamon-core/src/main/resources/reference.conf | 54 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/ClassLoading.scala | 26 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/Configuration.scala | 57 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/ContextPropagation.scala | 67 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/ContextStorage.scala | 47 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/Environment.scala | 5 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/Kamon.scala | 69 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/context/Codecs.scala | 108 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/context/Context.scala | 105 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/context/HttpPropagation.scala | 283 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/context/Mixin.scala | 45 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/package.scala | 10 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/Span.scala | 4 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/SpanCodec.scala | 55 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala | 4 |
15 files changed, 685 insertions, 254 deletions
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 60fa156d..5e5078ca 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -194,6 +194,60 @@ kamon { } + propagation { + channels { + http { + type = http + + tags { + + # Header name used to encode context tags. + header-name = "context-tags" + + + # Provide explicit mappins between context tags and the HTTP headers that will carry them. When there is + # an explicit mapping for a tag, it will not be included in the default context header. For example, if + # you wanted to use the an HTTP header called `X-Correlation-ID` for a context tag with key `correlationID` + # you would need to include a the following configuration: + # + # mappings { + # correlationID = "X-Correlation-ID" + # } + # + # The correlationID tag would always be read and written from the `X-Correlation-ID` header. The context + # tag name is represented as the configuration key and the desired header name is represented by the + # cofiguration value. + # + mappings { + + } + } + + entries { + + # Specify mappings between Context keys and the Http.EntryReader implementation in charge of reading them + # from the incoming HTTP request into the Context. + incoming { + #span = "something" + } + + # Specify mappings betwen Context keys and the Http.EntryWriter implementation in charge of writing them + # on the outgoing HTTP requests. + outgoing { + + } + + # Specify mappings betwen Context keys and the Http.EntryWriter implementation in charge of writing them + # on the outgoing HTTP response sent back to clients. + returning { + + } + } + } + } + } + + util { filters { diff --git a/kamon-core/src/main/scala/kamon/ClassLoading.scala b/kamon-core/src/main/scala/kamon/ClassLoading.scala new file mode 100644 index 00000000..5b097af1 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/ClassLoading.scala @@ -0,0 +1,26 @@ +package kamon + +import kamon.util.DynamicAccess + +import scala.collection.immutable +import scala.reflect.ClassTag +import scala.util.Try + +/** + * Utilities for creating instances from fully qualified class names. + */ +trait ClassLoading { + @volatile private var _dynamicAccessClassLoader = this.getClass.getClassLoader + @volatile private var _dynamicAccess = new DynamicAccess(_dynamicAccessClassLoader) + + def classLoader(): ClassLoader = + _dynamicAccessClassLoader + + def changeClassLoader(classLoader: ClassLoader): Unit = synchronized { + _dynamicAccessClassLoader = classLoader + _dynamicAccess = new DynamicAccess(_dynamicAccessClassLoader) + } + + def createInstance[T: ClassTag](fqcn: String, args: immutable.Seq[(Class[_], AnyRef)]): Try[T] = + _dynamicAccess.createInstanceFor(fqcn, args) +} diff --git a/kamon-core/src/main/scala/kamon/Configuration.scala b/kamon-core/src/main/scala/kamon/Configuration.scala new file mode 100644 index 00000000..bd286792 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/Configuration.scala @@ -0,0 +1,57 @@ +package kamon + +import scala.util.Try +import com.typesafe.config.{Config, ConfigFactory} +import org.slf4j.LoggerFactory + +trait Configuration { self: ClassLoading => + private val logger = LoggerFactory.getLogger(classOf[Configuration]) + private var _currentConfig: Config = ConfigFactory.load(self.classLoader()) + private var _onReconfigureHooks = Seq.empty[Configuration.OnReconfigureHook] + + + /** + * Retrieve Kamon's current configuration. + */ + def config(): Config = + _currentConfig + + /** + * Supply a new Config instance to rule Kamon's world. + */ + def reconfigure(newConfig: Config): Unit = synchronized { + _currentConfig = newConfig + _onReconfigureHooks.foreach(hook => { + Try(hook.onReconfigure(newConfig)).failed.foreach(error => + logger.error("Exception occurred while trying to run a OnReconfigureHook", error) + ) + }) + } + + /** + * Register a reconfigure hook that will be run when the a call to Kamon.reconfigure(config) is performed. All + * registered hooks will run sequentially in the same Thread that calls Kamon.reconfigure(config). + */ + def onReconfigure(hook: Configuration.OnReconfigureHook): Unit = synchronized { + _onReconfigureHooks = hook +: _onReconfigureHooks + } + + /** + * Register a reconfigure hook that will be run when the a call to Kamon.reconfigure(config) is performed. All + * registered hooks will run sequentially in the same Thread that calls Kamon.reconfigure(config). + */ + def onReconfigure(hook: (Config) => Unit): Unit = { + onReconfigure(new Configuration.OnReconfigureHook { + override def onReconfigure(newConfig: Config): Unit = hook.apply(newConfig) + }) + } + +} + +object Configuration { + + trait OnReconfigureHook { + def onReconfigure(newConfig: Config): Unit + } + +} diff --git a/kamon-core/src/main/scala/kamon/ContextPropagation.scala b/kamon-core/src/main/scala/kamon/ContextPropagation.scala new file mode 100644 index 00000000..518aa021 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/ContextPropagation.scala @@ -0,0 +1,67 @@ +package kamon + +import com.typesafe.config.Config +import kamon.context.HttpPropagation + +trait ContextPropagation { self: Configuration with ClassLoading => + @volatile private var _propagationComponents: ContextPropagation.Components = _ + @volatile private var _defaultHttpPropagation: HttpPropagation = _ + + // Initial configuration and reconfigures + init(self.config) + self.onReconfigure(newConfig => self.init(newConfig)) + + + /** + * Retrieves the HTTP propagation channel with the supplied name. Propagation channels are configured on the + * kamon.propagation.channels configuration setting. + * + * @param channelName Channel name to retrieve. + * @return The HTTP propagation, if defined. + */ + def httpPropagation(channelName: String): Option[HttpPropagation] = + _propagationComponents.httpChannels.get(channelName) + + /** + * Retrieves the default HTTP propagation channel. Configuration for this channel can be found under the + * kamon.propagation.channels.http configuration setting. + * + * @return The default HTTP propagation. + */ + def defaultHttpPropagation(): HttpPropagation = + _defaultHttpPropagation + + + + private def init(config: Config): Unit = synchronized { + _propagationComponents = ContextPropagation.Components.from(self.config, self) + _defaultHttpPropagation = _propagationComponents.httpChannels(ContextPropagation.DefaultHttpChannel) + } +} + +object ContextPropagation { + val DefaultHttpChannel = "http" + val DefaultBinaryChannel = "binary" + + case class Components( + httpChannels: Map[String, HttpPropagation] + ) + + object Components { + + def from(config: Config, classLoading: ClassLoading): Components = { + val propagationConfig = config.getConfig("kamon.propagation") + val channels = propagationConfig.getConfig("channels").configurations + + val httpChannels = Map.newBuilder[String, HttpPropagation] + + channels.foreach { + case (channelName, channelConfig) => channelConfig.getString("type") match { + case "http" => httpChannels += (channelName -> HttpPropagation.from(channelConfig, classLoading)) + } + } + + Components(httpChannels.result()) + } + } +} diff --git a/kamon-core/src/main/scala/kamon/ContextStorage.scala b/kamon-core/src/main/scala/kamon/ContextStorage.scala new file mode 100644 index 00000000..ee35264a --- /dev/null +++ b/kamon-core/src/main/scala/kamon/ContextStorage.scala @@ -0,0 +1,47 @@ +package kamon + +import kamon.context.{Context, Storage} +import kamon.trace.Span + +trait ContextStorage { + private val _contextStorage = Storage.ThreadLocal() + + def currentContext(): Context = + _contextStorage.current() + + def currentSpan(): Span = + _contextStorage.current().get(Span.ContextKey) + + def storeContext(context: Context): Storage.Scope = + _contextStorage.store(context) + + def withContext[T](context: Context)(f: => T): T = { + val scope = _contextStorage.store(context) + try { + f + } finally { + scope.close() + } + } + + def withContextKey[T, K](key: Context.Key[K], value: K)(f: => T): T = + withContext(currentContext().withKey(key, value))(f) + + def withSpan[T](span: Span)(f: => T): T = + withSpan(span, true)(f) + + def withSpan[T](span: Span, finishSpan: Boolean)(f: => T): T = { + try { + withContextKey(Span.ContextKey, span)(f) + } catch { + case t: Throwable => + span.addError(t.getMessage, t) + throw t + + } finally { + if(finishSpan) + span.finish() + } + } + +} diff --git a/kamon-core/src/main/scala/kamon/Environment.scala b/kamon-core/src/main/scala/kamon/Environment.scala index 1c00679d..2e07d18d 100644 --- a/kamon-core/src/main/scala/kamon/Environment.scala +++ b/kamon-core/src/main/scala/kamon/Environment.scala @@ -21,6 +21,9 @@ import java.util.concurrent.ThreadLocalRandom import com.typesafe.config.Config import kamon.util.HexCodec + + + case class Environment(host: String, service: String, instance: String, incarnation: String, tags: Map[String, String]) object Environment { @@ -47,6 +50,4 @@ object Environment { private def readValueOrGenerate(configuredValue: String, generator: => String): String = if(configuredValue == "auto") generator else configuredValue - - } diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 8f69ab41..2825d961 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -19,7 +19,7 @@ import java.time.Duration import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor} import com.typesafe.config.{Config, ConfigFactory} -import kamon.context.{Codecs, Context, Key, Storage} +import kamon.context.Codecs import kamon.metric._ import kamon.trace._ import kamon.util.{Clock, Filters, Matcher, Registration} @@ -29,7 +29,7 @@ import scala.concurrent.Future import scala.util.Try -object Kamon extends MetricLookup with ReporterRegistry with Tracer { +object Kamon extends MetricLookup with ClassLoading with Configuration with ReporterRegistry with Tracer with ContextPropagation with ContextStorage { private val logger = LoggerFactory.getLogger("kamon.Kamon") @volatile private var _config = ConfigFactory.load() @@ -41,19 +41,15 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { private val _metrics = new MetricRegistry(_config, _scheduler) private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config, _clock) private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config, _clock) - private val _contextStorage = Storage.ThreadLocal() private val _contextCodec = new Codecs(_config) - private var _onReconfigureHooks = Seq.empty[OnReconfigureHook] + //private var _onReconfigureHooks = Seq.empty[OnReconfigureHook] sys.addShutdownHook(() => _scheduler.shutdown()) def environment: Environment = _environment - def config(): Config = - _config - - def reconfigure(config: Config): Unit = synchronized { + onReconfigure(newConfig => { _config = config _environment = Environment.fromConfig(config) _filters = Filters.fromConfig(config) @@ -62,18 +58,11 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { _tracer.reconfigure(config) _contextCodec.reconfigure(config) - _onReconfigureHooks.foreach(hook => { - Try(hook.onReconfigure(config)).failed.foreach(error => - logger.error("Exception occurred while trying to run a OnReconfigureHook", error) - ) - }) - _scheduler match { case stpe: ScheduledThreadPoolExecutor => stpe.setCorePoolSize(schedulerPoolSize(config)) case other => logger.error("Unexpected scheduler [{}] found when reconfiguring Kamon.", other) } - } - + }) override def histogram(name: String, unit: MeasurementUnit, dynamicRange: Option[DynamicRange]): HistogramMetric = _metrics.histogram(name, unit, dynamicRange) @@ -105,43 +94,7 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { def contextCodec(): Codecs = _contextCodec - def currentContext(): Context = - _contextStorage.current() - - def currentSpan(): Span = - _contextStorage.current().get(Span.ContextKey) - - def storeContext(context: Context): Storage.Scope = - _contextStorage.store(context) - - def withContext[T](context: Context)(f: => T): T = { - val scope = _contextStorage.store(context) - try { - f - } finally { - scope.close() - } - } - - def withContextKey[T, K](key: Key[K], value: K)(f: => T): T = - withContext(currentContext().withKey(key, value))(f) - - def withSpan[T](span: Span)(f: => T): T = - withSpan(span, true)(f) - def withSpan[T](span: Span, finishSpan: Boolean)(f: => T): T = { - try { - withContextKey(Span.ContextKey, span)(f) - } catch { - case t: Throwable => - span.addError(t.getMessage, t) - throw t - - } finally { - if(finishSpan) - span.finish() - } - } override def loadReportersFromConfig(): Unit = _reporterRegistry.loadReportersFromConfig() @@ -173,14 +126,6 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { def clock(): Clock = _clock - /** - * Register a reconfigure hook that will be run when the a call to Kamon.reconfigure(config) is performed. All - * registered hooks will run sequentially in the same Thread that calls Kamon.reconfigure(config). - */ - def onReconfigure(hook: OnReconfigureHook): Unit = synchronized { - _onReconfigureHooks = hook +: _onReconfigureHooks - } - def scheduler(): ScheduledExecutorService = _scheduler @@ -188,7 +133,3 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { config.getInt("kamon.scheduler-pool-size") } - -trait OnReconfigureHook { - def onReconfigure(newConfig: Config): Unit -} diff --git a/kamon-core/src/main/scala/kamon/context/Codecs.scala b/kamon-core/src/main/scala/kamon/context/Codecs.scala index c5d237a9..465f53be 100644 --- a/kamon-core/src/main/scala/kamon/context/Codecs.scala +++ b/kamon-core/src/main/scala/kamon/context/Codecs.scala @@ -100,22 +100,22 @@ object Codecs { override def encode(context: Context): TextMap = { val encoded = TextMap.Default() - context.entries.foreach { - case (key, _) if key.broadcast => - entryCodecs.get(key.name) match { - case Some(codec) => - try { - codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2)) - } catch { - case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e) - } - - case None => - log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name) - } - - case _ => // All non-broadcast keys should be ignored. - } +// context.entries.foreach { +// case (key, _) if key.broadcast => +// entryCodecs.get(key.name) match { +// case Some(codec) => +// try { +// codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2)) +// } catch { +// case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e) +// } +// +// case None => +// log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name) +// } +// +// case _ => // All non-broadcast keys should be ignored. +// } encoded } @@ -150,29 +150,29 @@ object Codecs { emptyBuffer else { var colferEntries: List[ColferEntry] = Nil - entries.foreach { - case (key, _) if key.broadcast => - entryCodecs.get(key.name) match { - case Some(entryCodec) => - try { - val entryData = entryCodec.encode(context) - if(entryData.capacity() > 0) { - val colferEntry = new ColferEntry() - colferEntry.setName(key.name) - colferEntry.setContent(entryData.array()) - colferEntries = colferEntry :: colferEntries - } - } catch { - case throwable: Throwable => - log.error(s"Failed to encode broadcast context key [${key.name}]", throwable) - } - - case None => - log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name) - } - - case _ => // All non-broadcast keys should be ignored. - } +// entries.foreach { +// case (key, _) if key.broadcast => +// entryCodecs.get(key.name) match { +// case Some(entryCodec) => +// try { +// val entryData = entryCodec.encode(context) +// if(entryData.capacity() > 0) { +// val colferEntry = new ColferEntry() +// colferEntry.setName(key.name) +// colferEntry.setContent(entryData.array()) +// colferEntries = colferEntry :: colferEntries +// } +// } catch { +// case throwable: Throwable => +// log.error(s"Failed to encode broadcast context key [${key.name}]", throwable) +// } +// +// case None => +// log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name) +// } +// +// case _ => // All non-broadcast keys should be ignored. +// } if(colferEntries.isEmpty) emptyBuffer @@ -226,38 +226,40 @@ object Codecs { } private class StringHeadersCodec(key: String, headerName: String) extends Codecs.ForEntry[TextMap] { - private val contextKey = Key.broadcast[Option[String]](key, None) + //private val contextKey = Key.broadcast[Option[String]](key, None) override def encode(context: Context): TextMap = { val textMap = TextMap.Default() - context.get(contextKey).foreach { value => - textMap.put(headerName, value) - } +// context.get(contextKey).foreach { value => +// textMap.put(headerName, value) +// } textMap } override def decode(carrier: TextMap, context: Context): Context = { - carrier.get(headerName) match { - case value @ Some(_) => context.withKey(contextKey, value) - case None => context - } + ??? +// carrier.get(headerName) match { +// case value @ Some(_) => context.withKey(contextKey, value) +// case None => context +// } } } private class StringBinaryCodec(key: String) extends Codecs.ForEntry[ByteBuffer] { val emptyBuffer: ByteBuffer = ByteBuffer.allocate(0) - private val contextKey = Key.broadcast[Option[String]](key, None) + //private val contextKey = Key.broadcast[Option[String]](key, None) override def encode(context: Context): ByteBuffer = { - context.get(contextKey) match { - case Some(value) => ByteBuffer.wrap(value.getBytes) - case None => emptyBuffer - } +// context.get(contextKey) match { +// case Some(value) => ByteBuffer.wrap(value.getBytes) +// case None => emptyBuffer +// } + ??? } override def decode(carrier: ByteBuffer, context: Context): Context = { - context.withKey(contextKey, Some(new String(carrier.array()))) + ??? //context.withKey(contextKey, Some(new String(carrier.array()))) } } } diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala index e0b084cb..1eed7e14 100644 --- a/kamon-core/src/main/scala/kamon/context/Context.scala +++ b/kamon-core/src/main/scala/kamon/context/Context.scala @@ -13,90 +13,83 @@ * ========================================================================================= */ -package kamon.context +package kamon +package context -import java.io._ -import java.nio.ByteBuffer +import java.util.{Map => JavaMap} +import scala.collection.JavaConverters._ -import kamon.Kamon +class Context private (private[context] val entries: Map[Context.Key[_], Any], private[context] val tags: Map[String, String]) { -class Context private (private[context] val entries: Map[Key[_], Any]) extends scala.Serializable { - def get[T](key: Key[T]): T = + def get[T](key: Context.Key[T]): T = entries.getOrElse(key, key.emptyValue).asInstanceOf[T] - def withKey[T](key: Key[T], value: T): Context = - new Context(entries.updated(key, value)) + def getTag(tagKey: String): Option[String] = + tags.get(tagKey) - var _deserializedEntries: Map[Key[_], Any] = Map.empty + def withKey[T](key: Context.Key[T], value: T): Context = + new Context(entries.updated(key, value), tags) - @throws[IOException] - private def writeObject(out: ObjectOutputStream): Unit = out.write( - Kamon.contextCodec().Binary.encode(this).array() - ) + def withTag(tagKey: String, tagValue: String): Context = + new Context(entries, tags.updated(tagKey, tagValue)) - @throws[IOException] - @throws[ClassNotFoundException] - private def readObject(in: ObjectInputStream): Unit = { - val buf = new Array[Byte](in.available()) - in.readFully(buf) - _deserializedEntries = Kamon.contextCodec().Binary.decode(ByteBuffer.wrap(buf)).entries - } - - def readResolve(): AnyRef = new Context(_deserializedEntries) + def withTags(tags: Map[String, String]): Context = + new Context(entries, this.tags ++ tags) - override def equals(obj: scala.Any): Boolean = { - obj != null && - obj.isInstanceOf[Context] && - obj.asInstanceOf[Context].entries != null && - obj.asInstanceOf[Context].entries == this.entries - } + def withTags(tags: JavaMap[String, String]): Context = + new Context(entries, this.tags ++ tags.asScala.toMap) - override def hashCode(): Int = entries.hashCode() } object Context { - val Empty = new Context(Map.empty) - - def apply(): Context = - Empty + val Empty = new Context(Map.empty, Map.empty) - def create(): Context = - Empty + def of(tags: JavaMap[String, String]): Context = + new Context(Map.empty, tags.asScala.toMap) - def apply[T](key: Key[T], value: T): Context = - new Context(Map(key -> value)) + def of(tags: Map[String, String]): Context = + new Context(Map.empty, tags) - def create[T](key: Key[T], value: T): Context = - apply(key, value) + def of[T](key: Context.Key[T], value: T): Context = + new Context(Map(key -> value), Map.empty) -} - - -sealed abstract class Key[T] { - def name: String - def emptyValue: T - def broadcast: Boolean -} + def of[T](key: Context.Key[T], value: T, tags: JavaMap[String, String]): Context = + new Context(Map(key -> value), tags.asScala.toMap) -object Key { + def of[T](key: Context.Key[T], value: T, tags: Map[String, String]): Context = + new Context(Map(key -> value), tags) - def local[T](name: String, emptyValue: T): Key[T] = - new Default[T](name, emptyValue, false) + def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U): Context = + new Context(Map(keyOne -> valueOne, keyTwo -> valueTwo), Map.empty) - def broadcast[T](name: String, emptyValue: T): Key[T] = - new Default[T](name, emptyValue, true) + def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: JavaMap[String, String]): Context = + new Context(Map(keyOne -> valueOne, keyTwo -> valueTwo), tags.asScala.toMap) - def broadcastString(name: String): Key[Option[String]] = - new Default[Option[String]](name, None, true) + def of[T, U](keyOne: Context.Key[T], valueOne: T, keyTwo: Context.Key[U], valueTwo: U, tags: Map[String, String]): Context = + new Context(Map(keyOne -> valueOne, keyTwo -> valueTwo), tags) + def key[T](name: String, emptyValue: T): Context.Key[T] = + new Context.Key(name, emptyValue) - private class Default[T](val name: String, val emptyValue: T, val broadcast: Boolean) extends Key[T] { + /** + * Encapsulates the type, name and empty value for a context entry. All reads and writes from a context instance + * must be done using a context key, which will ensure the right type is used on both operations. The key's name + * is used when configuring mappings and incoming/outgoing/returning codecs for context propagation across channels. + * + * If you try to read an entry from a context and such entry is not present, the empty value for the key is returned + * instead. + * + * @param name Key name. Must be unique. + * @param emptyValue Value to be returned when reading from a context that doesn't have an entry with this key. + * @tparam ValueType Type of the value to be held on the context with this key. + */ + final class Key[ValueType](val name: String, val emptyValue: ValueType) { override def hashCode(): Int = name.hashCode override def equals(that: Any): Boolean = - that.isInstanceOf[Default[_]] && that.asInstanceOf[Default[_]].name == this.name + that.isInstanceOf[Context.Key[_]] && that.asInstanceOf[Context.Key[_]].name == this.name } }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala new file mode 100644 index 00000000..5b0bdb38 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/HttpPropagation.scala @@ -0,0 +1,283 @@ +package kamon +package context + +import com.typesafe.config.Config +import org.slf4j.LoggerFactory + +import scala.reflect.ClassTag +import scala.util.{Failure, Success} + +/** + * Context Propagation for HTTP transports. When using HTTP transports all the context related information is + * read from and written to HTTP headers. The context information may be included in the following directions: + * - Incoming: Used for HTTP requests coming into this service. Implicitly used when using HttpPropagation.read. + * - Outgoing: Used for HTTP requests leaving this service. + * - Returning: Used for HTTP responses send back to clients of this service. + */ +trait HttpPropagation { + + /** + * Uses the provided [[HttpPropagation.HeaderReader]] to read as many HTTP Headers as necessary and create a + * [[Context]] instance. The way in which context tags and entries are read from and written to HTTP Headers is + * implementation specific. + * + * @param reader Wrapper on the HTTP message from which headers are read. + * @return The decoded Context instance. If no entries or tags could be read from the HTTP message then an + * empty context is returned instead. + */ + def read(reader: HttpPropagation.HeaderReader): Context + + /** + * Writes the tags and entries from the supplied context using the supplied [[HttpPropagation.HeaderWriter]] + * instance. The way in which context tags and entries are read from and written to HTTP Headers is implementation + * specific. + * + * Implementations are expected to produce side effects on the wrapped HTTP Messages. + * + * @param context Context instance to be written. + * @param writer Wrapper on the HTTP message that will carry the context headers. + * @param direction Write direction. It can be either Outgoing or Returning. + */ + def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: HttpPropagation.Direction.Write): Unit + +} + +object HttpPropagation { + + /** + * Encapsulates logic required to read a single context entry from HTTP headers. Implementations of this trait + * must be aware of the entry they are able to read and the HTTP headers required to do so. + */ + trait EntryReader { + + /** + * Tries to read a context entry from HTTP headers. If a context entry is successfully read, implementations + * must return an updated context instance that includes such entry. If no entry could be read simply return + * context instance that was passed in, untouched. + * + * @param reader Wrapper on the HTTP message from which headers are read. + * @param context Current context. + * @return Either the original context passed in or a modified version of it, including the read entry. + */ + def read(reader: HttpPropagation.HeaderReader, context: Context): Context + } + + /** + * Encapsulates logic required to write a single context entry to HTTP headers. Implementations of this trait + * must be aware of the entry they are able to write and the HTTP headers required to do so. + */ + trait EntryWriter { + + /** + * Tries to write a context entry into HTTP headers. + * + * @param context The context from which entries should be written. + * @param writer Wrapper on the HTTP message that will carry the context headers. + * @param direction Write direction. It can be either Outgoing or Returning. + */ + def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit + } + + + /** + * Wrapper that reads HTTP headers from HTTP a message. + */ + trait HeaderReader { + + /** + * Reads an HTTP header value + * + * @param header HTTP header name + * @return The HTTP header value, if present. + */ + def read(header: String): Option[String] + } + + /** + * Wrapper that writes HTTP headers to a HTTP message. + */ + trait HeaderWriter { + + /** + * Writes a HTTP header into a HTTP message. + * + * @param header HTTP header name. + * @param value HTTP header value. + */ + def write(header: String, value: String): Unit + } + + + /** + * Create a new default HttpPropagation instance from the provided configuration. + * + * @param config HTTP propagation channel configuration + * @return A newly constructed HttpPropagation instance. + */ + def from(config: Config, classLoading: ClassLoading): HttpPropagation = { + new HttpPropagation.Default(Components.from(config, classLoading)) + } + + /** + * Default HTTP Propagation in Kamon. + */ + final class Default(components: Components) extends HttpPropagation { + private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Default]) + + /** + * Reads context tags and entries on the following order: + * - Read all context tags from the context tags header. + * - Read all context tags with explicit mappings. This overrides any tag from the previous step in case + * of a tag key clash. + * - Read all context entries using the incoming entries configuration. + */ + override def read(reader: HeaderReader): Context = { + val tags = Map.newBuilder[String, String] + + // Tags encoded together in the context tags header. + try { + reader.read(components.tagsHeaderName).foreach { contextTagsHeader => + contextTagsHeader.split(";").foreach(tagData => { + val tagPair = tagData.split("=") + if (tagPair.length == 2) { + tags += (tagPair(0) -> tagPair(1)) + } + }) + } + } catch { + case t: Throwable => log.warn("Failed to read the context tags header", t.asInstanceOf[Any]) + } + + // Tags explicitly mapped on the tags.mappings configuration. + components.tagsMappings.foreach { + case (tagName, httpHeader) => + try { + reader.read(httpHeader).foreach(tagValue => tags += (tagName -> tagValue)) + } catch { + case t: Throwable => log.warn("Failed to read mapped tag [{}]", tagName, t.asInstanceOf[Any]) + } + } + + // Incoming Entries + components.incomingEntries.foldLeft(Context.of(tags.result())) { + case (context, (entryName, entryDecoder)) => + var result = context + try { + result = entryDecoder.read(reader, context) + } catch { + case t: Throwable => log.warn("Failed to read entry [{}]", entryName.asInstanceOf[Any], t.asInstanceOf[Any]) + } + + result + } + } + + /** + * Writes context tags and entries + */ + override def write(context: Context, writer: HeaderWriter, direction: Direction.Write): Unit = { + val keys = direction match { + case Direction.Outgoing => components.outgoingEntries + case Direction.Returning => components.returningEntries + } + + val contextTagsHeader = new StringBuilder() + def appendTag(key: String, value: String): Unit = { + contextTagsHeader + .append(key) + .append('=') + .append(value) + .append(';') + } + + // Write tags with specific mappings or append them to the context tags header. + context.tags.foreach { + case (tagKey, tagValue) => components.tagsMappings.get(tagKey) match { + case Some(mappedHeader) => writer.write(mappedHeader, tagValue) + case None => appendTag(tagKey, tagValue) + } + } + + // Write the context tags header. + if(contextTagsHeader.nonEmpty) { + writer.write(components.tagsHeaderName, contextTagsHeader.result()) + } + + // Write entries for the specified direction. + keys.foreach { + case (entryName, entryWriter) => + try { + entryWriter.write(context, writer, direction) + } catch { + case t: Throwable => log.warn("Failed to write entry [{}] due to: {}", entryName.asInstanceOf[Any], t.asInstanceOf[Any]) + } + } + } + } + + /** + * Propagation direction. Used to decide whether incoming, outgoing or returning keys must be used to + * propagate context. + */ + sealed trait Direction + object Direction { + + /** + * Marker trait for all directions that require write operations. + */ + sealed trait Write + + /** + * Requests coming into this service. + */ + case object Incoming extends Direction + + /** + * Requests going from this service to others. + */ + case object Outgoing extends Direction with Write + + /** + * Responses sent from this service to clients. + */ + case object Returning extends Direction with Write + } + + + case class Components( + tagsHeaderName: String, + tagsMappings: Map[String, String], + incomingEntries: Map[String, HttpPropagation.EntryReader], + outgoingEntries: Map[String, HttpPropagation.EntryWriter], + returningEntries: Map[String, HttpPropagation.EntryWriter] + ) + + object Components { + private val log = LoggerFactory.getLogger(classOf[HttpPropagation.Components]) + + def from(config: Config, classLoading: ClassLoading): Components = { + def buildInstances[ExpectedType : ClassTag](mappings: Map[String, String]): Map[String, ExpectedType] = { + val entryReaders = Map.newBuilder[String, ExpectedType] + + mappings.foreach { + case (contextKey, readerClass) => classLoading.createInstance[ExpectedType](readerClass, Nil) match { + case Success(readerInstance) => entryReaders += (contextKey -> readerInstance) + case Failure(exception) => log.warn("Failed to instantiate {} [{}] due to []", + implicitly[ClassTag[ExpectedType]].runtimeClass.getName, readerClass, exception) + } + } + + entryReaders.result() + } + + val tagsHeaderName = config.getString("tags.header-name") + val tagsMappings = config.getConfig("tags.mappings").pairs + val incomingEntries = buildInstances[HttpPropagation.EntryReader](config.getConfig("entries.incoming").pairs) + val outgoingEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.outgoing").pairs) + val returningEntries = buildInstances[HttpPropagation.EntryWriter](config.getConfig("entries.returning").pairs) + + Components(tagsHeaderName, tagsMappings, incomingEntries, outgoingEntries, returningEntries) + } + } + +} diff --git a/kamon-core/src/main/scala/kamon/context/Mixin.scala b/kamon-core/src/main/scala/kamon/context/Mixin.scala deleted file mode 100644 index 3445cc31..00000000 --- a/kamon-core/src/main/scala/kamon/context/Mixin.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon.context - -import kamon.Kamon - - -/** - * Utility trait that marks objects carrying a reference to a Context instance. - * - */ -trait HasContext { - def context: Context -} - -object HasContext { - private case class Default(context: Context) extends HasContext - - /** - * Construct a HasSpan instance that references the provided Context. - * - */ - def from(context: Context): HasContext = - Default(context) - - /** - * Construct a HasContext instance with the current Kamon from Kamon's default context storage. - * - */ - def fromCurrentContext(): HasContext = - Default(Kamon.currentContext()) -} diff --git a/kamon-core/src/main/scala/kamon/package.scala b/kamon-core/src/main/scala/kamon/package.scala index d3b25500..d694206c 100644 --- a/kamon-core/src/main/scala/kamon/package.scala +++ b/kamon-core/src/main/scala/kamon/package.scala @@ -92,8 +92,14 @@ package object kamon { def configurations: Map[String, Config] = { topLevelKeys - .map(entry => (entry, config.getConfig(ConfigUtil.joinPath(entry)))) - .toMap + .map(entry => (entry, config.getConfig(ConfigUtil.joinPath(entry)))) + .toMap + } + + def pairs: Map[String, String] = { + topLevelKeys + .map(key => (key, config.getString(key))) + .toMap } } } diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index 43391af7..6015e350 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -19,7 +19,7 @@ package trace import java.time.Instant import kamon.ReporterRegistry.SpanSink -import kamon.context.Key +import kamon.context.Context import kamon.metric.MeasurementUnit import kamon.trace.SpanContext.SamplingDecision import kamon.util.Clock @@ -66,7 +66,7 @@ sealed abstract class Span { object Span { - val ContextKey = Key.broadcast[Span]("span", Span.Empty) + val ContextKey = Context.key[Span]("span", Span.Empty) object Empty extends Span { override val context: SpanContext = SpanContext.EmptySpanContext diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala index 14b28d54..7d707c9f 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala @@ -19,54 +19,35 @@ import java.net.{URLDecoder, URLEncoder} import java.nio.ByteBuffer import kamon.Kamon -import kamon.context.{Codecs, Context, TextMap} +import kamon.context.{Codecs, Context, HttpPropagation, TextMap} import kamon.context.generated.binary.span.{Span => ColferSpan} +import kamon.context.HttpPropagation.Direction import kamon.trace.SpanContext.SamplingDecision object SpanCodec { - class B3 extends Codecs.ForEntry[TextMap] { + class B3 extends HttpPropagation.EntryReader with HttpPropagation.EntryWriter { import B3.Headers - override def encode(context: Context): TextMap = { - val span = context.get(Span.ContextKey) - val carrier = TextMap.Default() - - if(span.nonEmpty()) { - val spanContext = span.context() - carrier.put(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) - carrier.put(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) - - if(spanContext.parentID != IdentityProvider.NoIdentifier) - carrier.put(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) - - encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision => - carrier.put(Headers.Sampled, samplingDecision) - } - } - - carrier - } - - override def decode(carrier: TextMap, context: Context): Context = { + override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = { val identityProvider = Kamon.tracer.identityProvider - val traceID = carrier.get(Headers.TraceIdentifier) + val traceID = reader.read(Headers.TraceIdentifier) .map(id => identityProvider.traceIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) - val spanID = carrier.get(Headers.SpanIdentifier) + val spanID = reader.read(Headers.SpanIdentifier) .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) { - val parentID = carrier.get(Headers.ParentSpanIdentifier) + val parentID = reader.read(Headers.ParentSpanIdentifier) .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) - val flags = carrier.get(Headers.Flags) + val flags = reader.read(Headers.Flags) - val samplingDecision = flags.orElse(carrier.get(Headers.Sampled)) match { + val samplingDecision = flags.orElse(reader.read(Headers.Sampled)) match { case Some(sampled) if sampled == "1" => SamplingDecision.Sample case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample case _ => SamplingDecision.Unknown @@ -77,6 +58,24 @@ object SpanCodec { } else context } + + override def write(context: Context, writer: HttpPropagation.HeaderWriter, direction: Direction.Write): Unit = { + val span = context.get(Span.ContextKey) + + if(span.nonEmpty()) { + val spanContext = span.context() + writer.write(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) + writer.write(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) + + if(spanContext.parentID != IdentityProvider.NoIdentifier) + writer.write(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) + + encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision => + writer.write(Headers.Sampled, samplingDecision) + } + } + } + private def encodeSamplingDecision(samplingDecision: SamplingDecision): Option[String] = samplingDecision match { case SamplingDecision.Sample => Some("1") case SamplingDecision.DoNotSample => Some("0") diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala b/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala index 2a8e2271..ed329ef7 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala @@ -15,7 +15,7 @@ package kamon.trace -import kamon.context.Key +import kamon.context.{Context} import kamon.trace.Tracer.SpanBuilder /** @@ -39,7 +39,7 @@ object SpanCustomizer { override def customize(spanBuilder: SpanBuilder): SpanBuilder = spanBuilder } - val ContextKey = Key.local[SpanCustomizer]("span-customizer", Noop) + val ContextKey = Context.key[SpanCustomizer]("span-customizer", Noop) def forOperationName(operationName: String): SpanCustomizer = new SpanCustomizer { override def customize(spanBuilder: SpanBuilder): SpanBuilder = |