diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2017-08-14 17:30:16 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2017-08-14 17:30:16 +0200 |
commit | 3a8c0fa25f12230b27e943d1fffe07f814c650fe (patch) | |
tree | 75a12128af7387f40e3eba040812e1bd87b9a455 /kamon-core/src/main/scala/kamon/context | |
parent | a6113cf33ba1b98cc73d35176ccf8a2f76b77875 (diff) | |
download | Kamon-3a8c0fa25f12230b27e943d1fffe07f814c650fe.tar.gz Kamon-3a8c0fa25f12230b27e943d1fffe07f814c650fe.tar.bz2 Kamon-3a8c0fa25f12230b27e943d1fffe07f814c650fe.zip |
implement Span propagation on top of Kamon.Context
Diffstat (limited to 'kamon-core/src/main/scala/kamon/context')
-rw-r--r-- | kamon-core/src/main/scala/kamon/context/Codec.scala | 132 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/context/Context.scala | 127 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/context/Mixin.scala | 43 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/context/Storage.scala | 39 |
4 files changed, 229 insertions, 112 deletions
diff --git a/kamon-core/src/main/scala/kamon/context/Codec.scala b/kamon-core/src/main/scala/kamon/context/Codec.scala new file mode 100644 index 00000000..957c3e26 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/Codec.scala @@ -0,0 +1,132 @@ +package kamon +package context + +import com.typesafe.config.Config +import kamon.util.DynamicAccess +import org.slf4j.LoggerFactory +import scala.collection.mutable + +class Codec(initialConfig: Config) { + private val log = LoggerFactory.getLogger(classOf[Codec]) + + @volatile private var httpHeaders: Codec.ForContext[TextMap] = new Codec.HttpHeaders(Map.empty) + //val Binary: Codec.ForContext[ByteBuffer] = _ + reconfigure(initialConfig) + + + def HttpHeaders: Codec.ForContext[TextMap] = + httpHeaders + + def reconfigure(config: Config): Unit = { + httpHeaders = new Codec.HttpHeaders(readEntryCodecs("kamon.context.encoding.http-headers", config)) + + + // Kamon.contextCodec.httpHeaderExport(current) + // Kamon.exportContext(HTTP, context) + // Kamon.importContext(HTTP, textMap) + // Kamon.currentContext() + // Kamon.storeContext(context) + + } + + private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codec.ForEntry[T]] = { + val rootConfig = config.getConfig(rootKey) + val dynamic = new DynamicAccess(getClass.getClassLoader) + val entries = Map.newBuilder[String, Codec.ForEntry[T]] + + rootConfig.topLevelKeys.foreach(key => { + try { + val fqcn = rootConfig.getString(key) + entries += ((key, dynamic.createInstanceFor[Codec.ForEntry[T]](fqcn, Nil).get)) + } catch { + case e: Throwable => + log.error(s"Failed to initialize codec for key [$key]", e) + } + }) + + entries.result() + } +} + +object Codec { + + trait ForContext[T] { + def encode(context: Context): T + def decode(carrier: T): Context + } + + trait ForEntry[T] { + def encode(context: Context): T + def decode(carrier: T, context: Context): Context + } + + final class HttpHeaders(entryCodecs: Map[String, Codec.ForEntry[TextMap]]) extends Codec.ForContext[TextMap] { + private val log = LoggerFactory.getLogger(classOf[HttpHeaders]) + + override def encode(context: Context): TextMap = { + val encoded = TextMap.Default() + + context.entries.foreach { + case (key, _) if key.broadcast => + entryCodecs.get(key.name) match { + case Some(codec) => + try { + codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2)) + } catch { + case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e) + } + + case None => + log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name) + } + } + + encoded + } + + override def decode(carrier: TextMap): Context = { + var context: Context = Context.Empty + + try { + context = entryCodecs.foldLeft(context)((ctx, codecEntry) => { + val (_, codec) = codecEntry + codec.decode(carrier, ctx) + }) + + } catch { + case e: Throwable => + log.error("Failed to decode context from HttpHeaders", e) + } + + context + } + } + +} + + +trait TextMap { + def get(key: String): Option[String] + + def put(key: String, value: String): Unit + + def values: Iterator[(String, String)] +} + +object TextMap { + + class Default extends TextMap { + private val storage = mutable.Map.empty[String, String] + + override def get(key: String): Option[String] = storage.get(key) + + override def put(key: String, value: String): Unit = storage.put(key, value) + + override def values: Iterator[(String, String)] = storage.toIterator + } + + object Default { + def apply(): Default = new Default() + } + +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala index f7c78388..f8a4662f 100644 --- a/kamon-core/src/main/scala/kamon/context/Context.scala +++ b/kamon-core/src/main/scala/kamon/context/Context.scala @@ -1,18 +1,27 @@ package kamon.context -class Context private (private val keys: Map[Key[_], Any]) { +class Context private (private[context] val entries: Map[Key[_], Any]) { def get[T](key: Key[T]): T = - keys.get(key).getOrElse(key.emptyValue).asInstanceOf[T] + entries.get(key).getOrElse(key.emptyValue).asInstanceOf[T] def withKey[T](key: Key[T], value: T): Context = - new Context(keys.updated(key, value)) + new Context(entries.updated(key, value)) } object Context { val Empty = new Context(Map.empty) - def apply(): Context = Empty - def create(): Context = Empty + def apply(): Context = + Empty + + def create(): Context = + Empty + + def apply[T](key: Key[T], value: T): Context = + new Context(Map(key -> value)) + + def create[T](key: Key[T], value: T): Context = + apply(key, value) } @@ -38,110 +47,4 @@ object Key { override def equals(that: Any): Boolean = that.isInstanceOf[Default[_]] && that.asInstanceOf[Default[_]].name == this.name } -} - -trait Storage { - def current(): Context - def store(context: Context): Scope - - trait Scope { - def context: Context - def close(): Unit - } -} - -object Storage { - - class ThreadLocal extends Storage { - private val tls = new java.lang.ThreadLocal[Context]() { - override def initialValue(): Context = Context.Empty - } - - override def current(): Context = - tls.get() - - override def store(context: Context): Scope = { - val newContext = context - val previousContext = tls.get() - tls.set(newContext) - - new Scope { - override def context: Context = newContext - override def close(): Unit = tls.set(previousContext) - } - } - } -} - -trait KeyCodec[T] { - def encode(context: Context): T - def decode(carrier: T, context: Context): Context -} - -/* -object Example { - // this is defined somewhere statically, only once. - val User = Key.local[Option[User]]("user", None) - val Client = Key.local[Option[User]]("client", null) - val Span = Key.broadcast[Span]("span", EmptySpan) - val storage = Kamon.contextStorage // or something similar. - - storage.get(Span) // returns a Span instance or EmptySpan. - storage.get(User) // Returns Option[User] or None if not set. - storage.get(Client) // Returns Option[Client] or null if not set. - - // Context Propagation works the very same way as before. - - val scope = storage.store(context) - // do something here - scope.close() - - // Configuration for codecs would be handled sort of like this: - - // kamon.context.propagation { - // http-header-codecs { - // "span" = kamon.trace.propagation.B3 - // } - // - // binary-codecs { - // "span" = kamon.trace.propagation.Binary - // } - // } - - - - - -}*/ - - - -/* - - - - - -class Context(private val keys: Map[Key[_], Any]) { - - - - - -} - -object Context { - - -} - -sealed trait Key[T] { - def name: String -} - -object Key { - - def local[T](name: String): Key[T] = Local(name) - - case class Local[T](name: String) extends Key[T] -}*/ +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/context/Mixin.scala b/kamon-core/src/main/scala/kamon/context/Mixin.scala new file mode 100644 index 00000000..52c97e84 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/Mixin.scala @@ -0,0 +1,43 @@ +/* ========================================================================================= + * Copyright © 2013-2017 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.context + + +/** + * Utility trait that marks objects carrying a reference to a Span. + * + */ +trait HasContext { + def context: Context +} + +object HasContext { + private case class Default(context: Context) extends HasContext + + /** + * Construct a HasSpan instance that references the provided Span. + * + */ + def from(context: Context): HasContext = + Default(context) + + /** + * Construct a HasSpan instance that references the currently ActiveSpan in Kamon's tracer. + * + */ +// def fromActiveSpan(): HasContext = +// Default(Kamon.activeSpan()) +} diff --git a/kamon-core/src/main/scala/kamon/context/Storage.scala b/kamon-core/src/main/scala/kamon/context/Storage.scala new file mode 100644 index 00000000..6b92ff85 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/Storage.scala @@ -0,0 +1,39 @@ +package kamon.context + +trait Storage { + def current(): Context + def store(context: Context): Storage.Scope +} + +object Storage { + + trait Scope { + def context: Context + def close(): Unit + } + + + class ThreadLocal extends Storage { + private val tls = new java.lang.ThreadLocal[Context]() { + override def initialValue(): Context = Context.Empty + } + + override def current(): Context = + tls.get() + + override def store(context: Context): Scope = { + val newContext = context + val previousContext = tls.get() + tls.set(newContext) + + new Scope { + override def context: Context = newContext + override def close(): Unit = tls.set(previousContext) + } + } + } + + object ThreadLocal { + def apply(): ThreadLocal = new ThreadLocal() + } +}
\ No newline at end of file |