aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/context
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/kamon/context')
-rw-r--r--kamon-core/src/main/scala/kamon/context/Codec.scala130
-rw-r--r--kamon-core/src/main/scala/kamon/context/Context.scala50
-rw-r--r--kamon-core/src/main/scala/kamon/context/Mixin.scala45
-rw-r--r--kamon-core/src/main/scala/kamon/context/Storage.scala39
4 files changed, 264 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/context/Codec.scala b/kamon-core/src/main/scala/kamon/context/Codec.scala
new file mode 100644
index 00000000..50b7e93d
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/Codec.scala
@@ -0,0 +1,130 @@
+package kamon
+package context
+
+import com.typesafe.config.Config
+import kamon.trace.IdentityProvider
+import kamon.util.DynamicAccess
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable
+
+class Codec(identityProvider: IdentityProvider, initialConfig: Config) {
+ private val log = LoggerFactory.getLogger(classOf[Codec])
+
+ @volatile private var httpHeaders: Codec.ForContext[TextMap] = new Codec.HttpHeaders(Map.empty)
+ //val Binary: Codec.ForContext[ByteBuffer] = _
+ reconfigure(initialConfig)
+
+
+ def HttpHeaders: Codec.ForContext[TextMap] =
+ httpHeaders
+
+ def reconfigure(config: Config): Unit = {
+ httpHeaders = new Codec.HttpHeaders(readEntryCodecs("kamon.context.encoding.http-headers", config))
+ }
+
+ private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codec.ForEntry[T]] = {
+ val rootConfig = config.getConfig(rootKey)
+ val dynamic = new DynamicAccess(getClass.getClassLoader)
+ val entries = Map.newBuilder[String, Codec.ForEntry[T]]
+
+ rootConfig.topLevelKeys.foreach(key => {
+ try {
+ val fqcn = rootConfig.getString(key)
+ entries += ((key, dynamic.createInstanceFor[Codec.ForEntry[T]](fqcn, Nil).get))
+ } catch {
+ case e: Throwable =>
+ log.error(s"Failed to initialize codec for key [$key]", e)
+ }
+ })
+
+ entries.result()
+ }
+}
+
+object Codec {
+
+ trait ForContext[T] {
+ def encode(context: Context): T
+ def decode(carrier: T): Context
+ }
+
+ trait ForEntry[T] {
+ def encode(context: Context): T
+ def decode(carrier: T, context: Context): Context
+ }
+
+ final class HttpHeaders(entryCodecs: Map[String, Codec.ForEntry[TextMap]]) extends Codec.ForContext[TextMap] {
+ private val log = LoggerFactory.getLogger(classOf[HttpHeaders])
+
+ override def encode(context: Context): TextMap = {
+ val encoded = TextMap.Default()
+
+ context.entries.foreach {
+ case (key, _) if key.broadcast =>
+ entryCodecs.get(key.name) match {
+ case Some(codec) =>
+ try {
+ codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2))
+ } catch {
+ case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e)
+ }
+
+ case None =>
+ log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name)
+ }
+ }
+
+ encoded
+ }
+
+ override def decode(carrier: TextMap): Context = {
+ var context: Context = Context.Empty
+
+ try {
+ context = entryCodecs.foldLeft(context)((ctx, codecEntry) => {
+ val (_, codec) = codecEntry
+ codec.decode(carrier, ctx)
+ })
+
+ } catch {
+ case e: Throwable =>
+ log.error("Failed to decode context from HttpHeaders", e)
+ }
+
+ context
+ }
+ }
+}
+
+
+trait TextMap {
+
+ def get(key: String): Option[String]
+
+ def put(key: String, value: String): Unit
+
+ def values: Iterator[(String, String)]
+}
+
+object TextMap {
+
+ class Default extends TextMap {
+ private val storage =
+ mutable.Map.empty[String, String]
+
+ override def get(key: String): Option[String] =
+ storage.get(key)
+
+ override def put(key: String, value: String): Unit =
+ storage.put(key, value)
+
+ override def values: Iterator[(String, String)] =
+ storage.toIterator
+ }
+
+ object Default {
+ def apply(): Default =
+ new Default()
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala
new file mode 100644
index 00000000..f8a4662f
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/Context.scala
@@ -0,0 +1,50 @@
+package kamon.context
+
+class Context private (private[context] val entries: Map[Key[_], Any]) {
+ def get[T](key: Key[T]): T =
+ entries.get(key).getOrElse(key.emptyValue).asInstanceOf[T]
+
+ def withKey[T](key: Key[T], value: T): Context =
+ new Context(entries.updated(key, value))
+}
+
+object Context {
+ val Empty = new Context(Map.empty)
+
+ def apply(): Context =
+ Empty
+
+ def create(): Context =
+ Empty
+
+ def apply[T](key: Key[T], value: T): Context =
+ new Context(Map(key -> value))
+
+ def create[T](key: Key[T], value: T): Context =
+ apply(key, value)
+}
+
+
+trait Key[T] {
+ def name: String
+ def emptyValue: T
+ def broadcast: Boolean
+}
+
+object Key {
+
+ def local[T](name: String, emptyValue: T): Key[T] =
+ new Default[T](name, emptyValue, false)
+
+ def broadcast[T](name: String, emptyValue: T): Key[T] =
+ new Default[T](name, emptyValue, true)
+
+
+ private class Default[T](val name: String, val emptyValue: T, val broadcast: Boolean) extends Key[T] {
+ override def hashCode(): Int =
+ name.hashCode
+
+ override def equals(that: Any): Boolean =
+ that.isInstanceOf[Default[_]] && that.asInstanceOf[Default[_]].name == this.name
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/context/Mixin.scala b/kamon-core/src/main/scala/kamon/context/Mixin.scala
new file mode 100644
index 00000000..64e03748
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/Mixin.scala
@@ -0,0 +1,45 @@
+/* =========================================================================================
+ * Copyright © 2013-2017 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.context
+
+import kamon.Kamon
+
+
+/**
+ * Utility trait that marks objects carrying a reference to a Span.
+ *
+ */
+trait HasContext {
+ def context: Context
+}
+
+object HasContext {
+ private case class Default(context: Context) extends HasContext
+
+ /**
+ * Construct a HasSpan instance that references the provided Span.
+ *
+ */
+ def from(context: Context): HasContext =
+ Default(context)
+
+ /**
+ * Construct a HasSpan instance that references the currently ActiveSpan in Kamon's tracer.
+ *
+ */
+ def fromCurrentContext(): HasContext =
+ Default(Kamon.currentContext())
+}
diff --git a/kamon-core/src/main/scala/kamon/context/Storage.scala b/kamon-core/src/main/scala/kamon/context/Storage.scala
new file mode 100644
index 00000000..6b92ff85
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/Storage.scala
@@ -0,0 +1,39 @@
+package kamon.context
+
+trait Storage {
+ def current(): Context
+ def store(context: Context): Storage.Scope
+}
+
+object Storage {
+
+ trait Scope {
+ def context: Context
+ def close(): Unit
+ }
+
+
+ class ThreadLocal extends Storage {
+ private val tls = new java.lang.ThreadLocal[Context]() {
+ override def initialValue(): Context = Context.Empty
+ }
+
+ override def current(): Context =
+ tls.get()
+
+ override def store(context: Context): Scope = {
+ val newContext = context
+ val previousContext = tls.get()
+ tls.set(newContext)
+
+ new Scope {
+ override def context: Context = newContext
+ override def close(): Unit = tls.set(previousContext)
+ }
+ }
+ }
+
+ object ThreadLocal {
+ def apply(): ThreadLocal = new ThreadLocal()
+ }
+} \ No newline at end of file