aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/context
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-08-14 17:30:16 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-08-14 17:30:16 +0200
commit3a8c0fa25f12230b27e943d1fffe07f814c650fe (patch)
tree75a12128af7387f40e3eba040812e1bd87b9a455 /kamon-core/src/main/scala/kamon/context
parenta6113cf33ba1b98cc73d35176ccf8a2f76b77875 (diff)
downloadKamon-3a8c0fa25f12230b27e943d1fffe07f814c650fe.tar.gz
Kamon-3a8c0fa25f12230b27e943d1fffe07f814c650fe.tar.bz2
Kamon-3a8c0fa25f12230b27e943d1fffe07f814c650fe.zip
implement Span propagation on top of Kamon.Context
Diffstat (limited to 'kamon-core/src/main/scala/kamon/context')
-rw-r--r--kamon-core/src/main/scala/kamon/context/Codec.scala132
-rw-r--r--kamon-core/src/main/scala/kamon/context/Context.scala127
-rw-r--r--kamon-core/src/main/scala/kamon/context/Mixin.scala43
-rw-r--r--kamon-core/src/main/scala/kamon/context/Storage.scala39
4 files changed, 229 insertions, 112 deletions
diff --git a/kamon-core/src/main/scala/kamon/context/Codec.scala b/kamon-core/src/main/scala/kamon/context/Codec.scala
new file mode 100644
index 00000000..957c3e26
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/Codec.scala
@@ -0,0 +1,132 @@
+package kamon
+package context
+
+import com.typesafe.config.Config
+import kamon.util.DynamicAccess
+import org.slf4j.LoggerFactory
+import scala.collection.mutable
+
+class Codec(initialConfig: Config) {
+ private val log = LoggerFactory.getLogger(classOf[Codec])
+
+ @volatile private var httpHeaders: Codec.ForContext[TextMap] = new Codec.HttpHeaders(Map.empty)
+ //val Binary: Codec.ForContext[ByteBuffer] = _
+ reconfigure(initialConfig)
+
+
+ def HttpHeaders: Codec.ForContext[TextMap] =
+ httpHeaders
+
+ def reconfigure(config: Config): Unit = {
+ httpHeaders = new Codec.HttpHeaders(readEntryCodecs("kamon.context.encoding.http-headers", config))
+
+
+ // Kamon.contextCodec.httpHeaderExport(current)
+ // Kamon.exportContext(HTTP, context)
+ // Kamon.importContext(HTTP, textMap)
+ // Kamon.currentContext()
+ // Kamon.storeContext(context)
+
+ }
+
+ private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codec.ForEntry[T]] = {
+ val rootConfig = config.getConfig(rootKey)
+ val dynamic = new DynamicAccess(getClass.getClassLoader)
+ val entries = Map.newBuilder[String, Codec.ForEntry[T]]
+
+ rootConfig.topLevelKeys.foreach(key => {
+ try {
+ val fqcn = rootConfig.getString(key)
+ entries += ((key, dynamic.createInstanceFor[Codec.ForEntry[T]](fqcn, Nil).get))
+ } catch {
+ case e: Throwable =>
+ log.error(s"Failed to initialize codec for key [$key]", e)
+ }
+ })
+
+ entries.result()
+ }
+}
+
+object Codec {
+
+ trait ForContext[T] {
+ def encode(context: Context): T
+ def decode(carrier: T): Context
+ }
+
+ trait ForEntry[T] {
+ def encode(context: Context): T
+ def decode(carrier: T, context: Context): Context
+ }
+
+ final class HttpHeaders(entryCodecs: Map[String, Codec.ForEntry[TextMap]]) extends Codec.ForContext[TextMap] {
+ private val log = LoggerFactory.getLogger(classOf[HttpHeaders])
+
+ override def encode(context: Context): TextMap = {
+ val encoded = TextMap.Default()
+
+ context.entries.foreach {
+ case (key, _) if key.broadcast =>
+ entryCodecs.get(key.name) match {
+ case Some(codec) =>
+ try {
+ codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2))
+ } catch {
+ case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e)
+ }
+
+ case None =>
+ log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name)
+ }
+ }
+
+ encoded
+ }
+
+ override def decode(carrier: TextMap): Context = {
+ var context: Context = Context.Empty
+
+ try {
+ context = entryCodecs.foldLeft(context)((ctx, codecEntry) => {
+ val (_, codec) = codecEntry
+ codec.decode(carrier, ctx)
+ })
+
+ } catch {
+ case e: Throwable =>
+ log.error("Failed to decode context from HttpHeaders", e)
+ }
+
+ context
+ }
+ }
+
+}
+
+
+trait TextMap {
+ def get(key: String): Option[String]
+
+ def put(key: String, value: String): Unit
+
+ def values: Iterator[(String, String)]
+}
+
+object TextMap {
+
+ class Default extends TextMap {
+ private val storage = mutable.Map.empty[String, String]
+
+ override def get(key: String): Option[String] = storage.get(key)
+
+ override def put(key: String, value: String): Unit = storage.put(key, value)
+
+ override def values: Iterator[(String, String)] = storage.toIterator
+ }
+
+ object Default {
+ def apply(): Default = new Default()
+ }
+
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala
index f7c78388..f8a4662f 100644
--- a/kamon-core/src/main/scala/kamon/context/Context.scala
+++ b/kamon-core/src/main/scala/kamon/context/Context.scala
@@ -1,18 +1,27 @@
package kamon.context
-class Context private (private val keys: Map[Key[_], Any]) {
+class Context private (private[context] val entries: Map[Key[_], Any]) {
def get[T](key: Key[T]): T =
- keys.get(key).getOrElse(key.emptyValue).asInstanceOf[T]
+ entries.get(key).getOrElse(key.emptyValue).asInstanceOf[T]
def withKey[T](key: Key[T], value: T): Context =
- new Context(keys.updated(key, value))
+ new Context(entries.updated(key, value))
}
object Context {
val Empty = new Context(Map.empty)
- def apply(): Context = Empty
- def create(): Context = Empty
+ def apply(): Context =
+ Empty
+
+ def create(): Context =
+ Empty
+
+ def apply[T](key: Key[T], value: T): Context =
+ new Context(Map(key -> value))
+
+ def create[T](key: Key[T], value: T): Context =
+ apply(key, value)
}
@@ -38,110 +47,4 @@ object Key {
override def equals(that: Any): Boolean =
that.isInstanceOf[Default[_]] && that.asInstanceOf[Default[_]].name == this.name
}
-}
-
-trait Storage {
- def current(): Context
- def store(context: Context): Scope
-
- trait Scope {
- def context: Context
- def close(): Unit
- }
-}
-
-object Storage {
-
- class ThreadLocal extends Storage {
- private val tls = new java.lang.ThreadLocal[Context]() {
- override def initialValue(): Context = Context.Empty
- }
-
- override def current(): Context =
- tls.get()
-
- override def store(context: Context): Scope = {
- val newContext = context
- val previousContext = tls.get()
- tls.set(newContext)
-
- new Scope {
- override def context: Context = newContext
- override def close(): Unit = tls.set(previousContext)
- }
- }
- }
-}
-
-trait KeyCodec[T] {
- def encode(context: Context): T
- def decode(carrier: T, context: Context): Context
-}
-
-/*
-object Example {
- // this is defined somewhere statically, only once.
- val User = Key.local[Option[User]]("user", None)
- val Client = Key.local[Option[User]]("client", null)
- val Span = Key.broadcast[Span]("span", EmptySpan)
- val storage = Kamon.contextStorage // or something similar.
-
- storage.get(Span) // returns a Span instance or EmptySpan.
- storage.get(User) // Returns Option[User] or None if not set.
- storage.get(Client) // Returns Option[Client] or null if not set.
-
- // Context Propagation works the very same way as before.
-
- val scope = storage.store(context)
- // do something here
- scope.close()
-
- // Configuration for codecs would be handled sort of like this:
-
- // kamon.context.propagation {
- // http-header-codecs {
- // "span" = kamon.trace.propagation.B3
- // }
- //
- // binary-codecs {
- // "span" = kamon.trace.propagation.Binary
- // }
- // }
-
-
-
-
-
-}*/
-
-
-
-/*
-
-
-
-
-
-class Context(private val keys: Map[Key[_], Any]) {
-
-
-
-
-
-}
-
-object Context {
-
-
-}
-
-sealed trait Key[T] {
- def name: String
-}
-
-object Key {
-
- def local[T](name: String): Key[T] = Local(name)
-
- case class Local[T](name: String) extends Key[T]
-}*/
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/context/Mixin.scala b/kamon-core/src/main/scala/kamon/context/Mixin.scala
new file mode 100644
index 00000000..52c97e84
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/Mixin.scala
@@ -0,0 +1,43 @@
+/* =========================================================================================
+ * Copyright © 2013-2017 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.context
+
+
+/**
+ * Utility trait that marks objects carrying a reference to a Span.
+ *
+ */
+trait HasContext {
+ def context: Context
+}
+
+object HasContext {
+ private case class Default(context: Context) extends HasContext
+
+ /**
+ * Construct a HasSpan instance that references the provided Span.
+ *
+ */
+ def from(context: Context): HasContext =
+ Default(context)
+
+ /**
+ * Construct a HasSpan instance that references the currently ActiveSpan in Kamon's tracer.
+ *
+ */
+// def fromActiveSpan(): HasContext =
+// Default(Kamon.activeSpan())
+}
diff --git a/kamon-core/src/main/scala/kamon/context/Storage.scala b/kamon-core/src/main/scala/kamon/context/Storage.scala
new file mode 100644
index 00000000..6b92ff85
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/Storage.scala
@@ -0,0 +1,39 @@
+package kamon.context
+
+trait Storage {
+ def current(): Context
+ def store(context: Context): Storage.Scope
+}
+
+object Storage {
+
+ trait Scope {
+ def context: Context
+ def close(): Unit
+ }
+
+
+ class ThreadLocal extends Storage {
+ private val tls = new java.lang.ThreadLocal[Context]() {
+ override def initialValue(): Context = Context.Empty
+ }
+
+ override def current(): Context =
+ tls.get()
+
+ override def store(context: Context): Scope = {
+ val newContext = context
+ val previousContext = tls.get()
+ tls.set(newContext)
+
+ new Scope {
+ override def context: Context = newContext
+ override def close(): Unit = tls.set(previousContext)
+ }
+ }
+ }
+
+ object ThreadLocal {
+ def apply(): ThreadLocal = new ThreadLocal()
+ }
+} \ No newline at end of file