aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/kamon')
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala84
-rw-r--r--kamon-core/src/main/scala/kamon/ReporterRegistry.scala20
-rw-r--r--kamon-core/src/main/scala/kamon/context/Codec.scala130
-rw-r--r--kamon-core/src/main/scala/kamon/context/Context.scala50
-rw-r--r--kamon-core/src/main/scala/kamon/context/Mixin.scala (renamed from kamon-core/src/main/scala/kamon/util/Mixin.scala)34
-rw-r--r--kamon-core/src/main/scala/kamon/context/Storage.scala39
-rw-r--r--kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala106
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Sampler.scala35
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Span.scala290
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanCodec.scala99
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanContext.scala57
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala105
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Tracer.scala196
-rw-r--r--kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala72
14 files changed, 805 insertions, 512 deletions
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index ecbc796e..b1490e32 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -16,24 +16,23 @@
package kamon
import com.typesafe.config.{Config, ConfigFactory}
-import io.opentracing.propagation.Format
-import io.opentracing.{ActiveSpan, Span, SpanContext}
import kamon.metric._
-import kamon.trace.Tracer
+import kamon.trace._
import kamon.util.{Filters, MeasurementUnit, Registration}
import scala.concurrent.Future
import java.time.Duration
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor}
-import io.opentracing.ActiveSpan.Continuation
+import kamon.context.{Context, Storage}
import org.slf4j.LoggerFactory
import scala.util.Try
-object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Tracer {
+object Kamon extends MetricLookup with ReporterRegistry with Tracer {
private val logger = LoggerFactory.getLogger("kamon.Kamon")
+
@volatile private var _config = ConfigFactory.load()
@volatile private var _environment = Environment.fromConfig(_config)
@volatile private var _filters = Filters.fromConfig(_config)
@@ -41,7 +40,8 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac
private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler"))
private val _metrics = new MetricRegistry(_config, _scheduler)
private val _reporters = new ReporterRegistryImpl(_metrics, _config)
- private val _tracer = new Tracer(Kamon, _reporters, _config)
+ private val _tracer = Tracer.Default(Kamon, _reporters, _config)
+ private val _contextStorage = Storage.ThreadLocal()
private var _onReconfigureHooks = Seq.empty[OnReconfigureHook]
def environment: Environment =
@@ -56,6 +56,7 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac
_filters = Filters.fromConfig(config)
_metrics.reconfigure(config)
_reporters.reconfigure(config)
+ _tracer.reconfigure(config)
_onReconfigureHooks.foreach(hook => {
Try(hook.onReconfigure(config)).failed.foreach(error =>
@@ -90,73 +91,28 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac
def tracer: Tracer =
_tracer
- override def buildSpan(operationName: String): io.opentracing.Tracer.SpanBuilder =
+ override def buildSpan(operationName: String): Tracer.SpanBuilder =
_tracer.buildSpan(operationName)
- override def extract[C](format: Format[C], carrier: C): SpanContext =
- _tracer.extract(format, carrier)
-
- override def inject[C](spanContext: SpanContext, format: Format[C], carrier: C): Unit =
- _tracer.inject(spanContext, format, carrier)
-
- override def activeSpan(): ActiveSpan =
- _tracer.activeSpan()
- override def makeActive(span: Span): ActiveSpan =
- _tracer.makeActive(span)
+ override def identityProvider: IdentityProvider =
+ _tracer.identityProvider
+ def currentContext(): Context =
+ _contextStorage.current()
- /**
- * Makes the provided Span active before code is evaluated and deactivates it afterwards.
- */
- def withSpan[T](span: Span)(code: => T): T = {
- val activeSpan = makeActive(span)
- val evaluatedCode = code
- activeSpan.deactivate()
- evaluatedCode
- }
+ def storeContext(context: Context): Storage.Scope =
+ _contextStorage.store(context)
- /**
- * Actives the provided Continuation before code is evaluated and deactivates it afterwards.
- */
- def withContinuation[T](continuation: Continuation)(code: => T): T = {
- if(continuation == null)
- code
- else {
- val activeSpan = continuation.activate()
- val evaluatedCode = code
- activeSpan.deactivate()
- evaluatedCode
+ def withContext[T](context: Context)(f: => T): T = {
+ val scope = _contextStorage.store(context)
+ try {
+ f
+ } finally {
+ scope.close()
}
}
- /**
- * Captures a continuation from the currently active Span (if any).
- */
- def activeSpanContinuation(): Continuation = {
- val activeSpan = Kamon.activeSpan()
- if(activeSpan == null)
- null
- else
- activeSpan.capture()
- }
-
- /**
- * Runs the provided closure with the currently active Span (if any).
- */
- def onActiveSpan[T](code: ActiveSpan => T): Unit = {
- val activeSpan = Kamon.activeSpan()
- if(activeSpan != null)
- code(activeSpan)
- }
-
- /**
- * Evaluates the provided closure with the currently active Span (if any) and returns the evaluation result. If there
- * was no active Span then the provided fallback value
- */
- def fromActiveSpan[T](code: ActiveSpan => T): Option[T] =
- Option(activeSpan()).map(code)
-
override def loadReportersFromConfig(): Unit =
_reporters.loadReportersFromConfig()
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
index 5f46edf6..f0d744e5 100644
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -20,9 +20,11 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import java.util.concurrent._
import com.typesafe.config.Config
+import kamon.ReporterRegistry.SpanSink
import kamon.metric._
import kamon.trace.Span
-import kamon.util.{DynamicAccess, Registration}
+import kamon.trace.Span.FinishedSpan
+import kamon.util.{CallingThreadExecutionContext, DynamicAccess, Registration}
import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
@@ -42,6 +44,12 @@ trait ReporterRegistry {
def stopAllReporters(): Future[Unit]
}
+object ReporterRegistry {
+ private[kamon] trait SpanSink {
+ def reportSpan(finishedSpan: FinishedSpan): Unit
+ }
+}
+
sealed trait Reporter {
def start(): Unit
def stop(): Unit
@@ -53,10 +61,10 @@ trait MetricReporter extends Reporter {
}
trait SpanReporter extends Reporter {
- def reportSpans(spans: Seq[Span.CompletedSpan]): Unit
+ def reportSpans(spans: Seq[Span.FinishedSpan]): Unit
}
-class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry {
+class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry with SpanSink {
private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry])
private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry"))
private val reporterCounter = new AtomicLong(0L)
@@ -212,7 +220,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con
}
}
- private[kamon] def reportSpan(span: Span.CompletedSpan): Unit = {
+ def reportSpan(span: Span.FinishedSpan): Unit = {
spanReporters.foreach { case (_, reporterEntry) =>
if(reporterEntry.isActive)
reporterEntry.buffer.offer(span)
@@ -251,7 +259,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con
val bufferCapacity: Int,
val executionContext: ExecutionContextExecutorService
) {
- val buffer = new ArrayBlockingQueue[Span.CompletedSpan](bufferCapacity)
+ val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity)
}
private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable {
@@ -290,7 +298,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con
spanReporters.foreach {
case (_, entry) =>
- val spanBatch = new java.util.ArrayList[Span.CompletedSpan](entry.bufferCapacity)
+ val spanBatch = new java.util.ArrayList[Span.FinishedSpan](entry.bufferCapacity)
entry.buffer.drainTo(spanBatch, entry.bufferCapacity)
Future {
diff --git a/kamon-core/src/main/scala/kamon/context/Codec.scala b/kamon-core/src/main/scala/kamon/context/Codec.scala
new file mode 100644
index 00000000..50b7e93d
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/Codec.scala
@@ -0,0 +1,130 @@
+package kamon
+package context
+
+import com.typesafe.config.Config
+import kamon.trace.IdentityProvider
+import kamon.util.DynamicAccess
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable
+
+class Codec(identityProvider: IdentityProvider, initialConfig: Config) {
+ private val log = LoggerFactory.getLogger(classOf[Codec])
+
+ @volatile private var httpHeaders: Codec.ForContext[TextMap] = new Codec.HttpHeaders(Map.empty)
+ //val Binary: Codec.ForContext[ByteBuffer] = _
+ reconfigure(initialConfig)
+
+
+ def HttpHeaders: Codec.ForContext[TextMap] =
+ httpHeaders
+
+ def reconfigure(config: Config): Unit = {
+ httpHeaders = new Codec.HttpHeaders(readEntryCodecs("kamon.context.encoding.http-headers", config))
+ }
+
+ private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codec.ForEntry[T]] = {
+ val rootConfig = config.getConfig(rootKey)
+ val dynamic = new DynamicAccess(getClass.getClassLoader)
+ val entries = Map.newBuilder[String, Codec.ForEntry[T]]
+
+ rootConfig.topLevelKeys.foreach(key => {
+ try {
+ val fqcn = rootConfig.getString(key)
+ entries += ((key, dynamic.createInstanceFor[Codec.ForEntry[T]](fqcn, Nil).get))
+ } catch {
+ case e: Throwable =>
+ log.error(s"Failed to initialize codec for key [$key]", e)
+ }
+ })
+
+ entries.result()
+ }
+}
+
+object Codec {
+
+ trait ForContext[T] {
+ def encode(context: Context): T
+ def decode(carrier: T): Context
+ }
+
+ trait ForEntry[T] {
+ def encode(context: Context): T
+ def decode(carrier: T, context: Context): Context
+ }
+
+ final class HttpHeaders(entryCodecs: Map[String, Codec.ForEntry[TextMap]]) extends Codec.ForContext[TextMap] {
+ private val log = LoggerFactory.getLogger(classOf[HttpHeaders])
+
+ override def encode(context: Context): TextMap = {
+ val encoded = TextMap.Default()
+
+ context.entries.foreach {
+ case (key, _) if key.broadcast =>
+ entryCodecs.get(key.name) match {
+ case Some(codec) =>
+ try {
+ codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2))
+ } catch {
+ case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e)
+ }
+
+ case None =>
+ log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name)
+ }
+ }
+
+ encoded
+ }
+
+ override def decode(carrier: TextMap): Context = {
+ var context: Context = Context.Empty
+
+ try {
+ context = entryCodecs.foldLeft(context)((ctx, codecEntry) => {
+ val (_, codec) = codecEntry
+ codec.decode(carrier, ctx)
+ })
+
+ } catch {
+ case e: Throwable =>
+ log.error("Failed to decode context from HttpHeaders", e)
+ }
+
+ context
+ }
+ }
+}
+
+
+trait TextMap {
+
+ def get(key: String): Option[String]
+
+ def put(key: String, value: String): Unit
+
+ def values: Iterator[(String, String)]
+}
+
+object TextMap {
+
+ class Default extends TextMap {
+ private val storage =
+ mutable.Map.empty[String, String]
+
+ override def get(key: String): Option[String] =
+ storage.get(key)
+
+ override def put(key: String, value: String): Unit =
+ storage.put(key, value)
+
+ override def values: Iterator[(String, String)] =
+ storage.toIterator
+ }
+
+ object Default {
+ def apply(): Default =
+ new Default()
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala
new file mode 100644
index 00000000..f8a4662f
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/Context.scala
@@ -0,0 +1,50 @@
+package kamon.context
+
+class Context private (private[context] val entries: Map[Key[_], Any]) {
+ def get[T](key: Key[T]): T =
+ entries.get(key).getOrElse(key.emptyValue).asInstanceOf[T]
+
+ def withKey[T](key: Key[T], value: T): Context =
+ new Context(entries.updated(key, value))
+}
+
+object Context {
+ val Empty = new Context(Map.empty)
+
+ def apply(): Context =
+ Empty
+
+ def create(): Context =
+ Empty
+
+ def apply[T](key: Key[T], value: T): Context =
+ new Context(Map(key -> value))
+
+ def create[T](key: Key[T], value: T): Context =
+ apply(key, value)
+}
+
+
+trait Key[T] {
+ def name: String
+ def emptyValue: T
+ def broadcast: Boolean
+}
+
+object Key {
+
+ def local[T](name: String, emptyValue: T): Key[T] =
+ new Default[T](name, emptyValue, false)
+
+ def broadcast[T](name: String, emptyValue: T): Key[T] =
+ new Default[T](name, emptyValue, true)
+
+
+ private class Default[T](val name: String, val emptyValue: T, val broadcast: Boolean) extends Key[T] {
+ override def hashCode(): Int =
+ name.hashCode
+
+ override def equals(that: Any): Boolean =
+ that.isInstanceOf[Default[_]] && that.asInstanceOf[Default[_]].name == this.name
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/util/Mixin.scala b/kamon-core/src/main/scala/kamon/context/Mixin.scala
index 348b34f1..64e03748 100644
--- a/kamon-core/src/main/scala/kamon/util/Mixin.scala
+++ b/kamon-core/src/main/scala/kamon/context/Mixin.scala
@@ -13,33 +13,33 @@
* =========================================================================================
*/
-package kamon
-package util
+package kamon.context
+
+import kamon.Kamon
-import io.opentracing.ActiveSpan
-import io.opentracing.ActiveSpan.Continuation
/**
- * Utility trait that marks objects carrying an ActiveSpan.Continuation.
+ * Utility trait that marks objects carrying a reference to a Span.
+ *
*/
-trait HasContinuation {
- def continuation: Continuation
+trait HasContext {
+ def context: Context
}
-object HasContinuation {
- private class Default(val continuation: Continuation) extends HasContinuation
+object HasContext {
+ private case class Default(context: Context) extends HasContext
/**
- * Construct a HasContinuation instance by capturing a continuation from the provided active span.
+ * Construct a HasSpan instance that references the provided Span.
+ *
*/
- def from(activeSpan: ActiveSpan): HasContinuation = {
- val continuation = if(activeSpan == null) null else activeSpan.capture()
- new Default(continuation)
- }
+ def from(context: Context): HasContext =
+ Default(context)
/**
- * Constructs a new HasContinuation instance using Kamon's tracer currently active span.
+ * Construct a HasSpan instance that references the currently ActiveSpan in Kamon's tracer.
+ *
*/
- def fromTracerActiveSpan(): HasContinuation =
- new Default(Kamon.activeSpanContinuation())
+ def fromCurrentContext(): HasContext =
+ Default(Kamon.currentContext())
}
diff --git a/kamon-core/src/main/scala/kamon/context/Storage.scala b/kamon-core/src/main/scala/kamon/context/Storage.scala
new file mode 100644
index 00000000..6b92ff85
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/Storage.scala
@@ -0,0 +1,39 @@
+package kamon.context
+
+trait Storage {
+ def current(): Context
+ def store(context: Context): Storage.Scope
+}
+
+object Storage {
+
+ trait Scope {
+ def context: Context
+ def close(): Unit
+ }
+
+
+ class ThreadLocal extends Storage {
+ private val tls = new java.lang.ThreadLocal[Context]() {
+ override def initialValue(): Context = Context.Empty
+ }
+
+ override def current(): Context =
+ tls.get()
+
+ override def store(context: Context): Scope = {
+ val newContext = context
+ val previousContext = tls.get()
+ tls.set(newContext)
+
+ new Scope {
+ override def context: Context = newContext
+ override def close(): Unit = tls.set(previousContext)
+ }
+ }
+ }
+
+ object ThreadLocal {
+ def apply(): ThreadLocal = new ThreadLocal()
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala b/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala
new file mode 100644
index 00000000..937200f5
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala
@@ -0,0 +1,106 @@
+package kamon.trace
+
+import java.nio.ByteBuffer
+import java.util.concurrent.ThreadLocalRandom
+
+import kamon.util.HexCodec
+
+import scala.util.Try
+
+trait IdentityProvider {
+ def traceIdGenerator(): IdentityProvider.Generator
+ def spanIdGenerator(): IdentityProvider.Generator
+}
+
+object IdentityProvider {
+ case class Identifier(string: String, bytes: Array[Byte]) {
+
+ override def equals(obj: Any): Boolean = {
+ if(obj != null && obj.isInstanceOf[Identifier])
+ obj.asInstanceOf[Identifier].string == string
+ else false
+ }
+ }
+
+ val NoIdentifier = Identifier("", new Array[Byte](0))
+
+ trait Generator {
+ def generate(): Identifier
+ def from(string: String): Identifier
+ def from(bytes: Array[Byte]): Identifier
+ }
+
+
+ class Default extends IdentityProvider {
+ protected val longGenerator = new Generator {
+ override def generate(): Identifier = {
+ val data = ByteBuffer.wrap(new Array[Byte](8))
+ val random = ThreadLocalRandom.current().nextLong()
+ data.putLong(random)
+
+ Identifier(HexCodec.toLowerHex(random), data.array())
+ }
+
+ override def from(string: String): Identifier = Try {
+ val identifierLong = HexCodec.lowerHexToUnsignedLong(string)
+ val data = ByteBuffer.allocate(8)
+ data.putLong(identifierLong)
+
+ Identifier(string, data.array())
+ } getOrElse(IdentityProvider.NoIdentifier)
+
+ override def from(bytes: Array[Byte]): Identifier = Try {
+ val buffer = ByteBuffer.wrap(bytes)
+ val identifierLong = buffer.getLong
+
+ Identifier(HexCodec.toLowerHex(identifierLong), bytes)
+ } getOrElse(IdentityProvider.NoIdentifier)
+ }
+
+ override def traceIdGenerator(): Generator = longGenerator
+ override def spanIdGenerator(): Generator = longGenerator
+ }
+
+ object Default {
+ def apply(): Default = new Default()
+ }
+
+
+ class DoubleSizeTraceID extends Default {
+ private val doubleLongGenerator = new Generator {
+ override def generate(): Identifier = {
+ val data = ByteBuffer.wrap(new Array[Byte](16))
+ val highLong = ThreadLocalRandom.current().nextLong()
+ val lowLong = ThreadLocalRandom.current().nextLong()
+ data.putLong(highLong)
+ data.putLong(lowLong)
+
+ Identifier(HexCodec.toLowerHex(highLong) + HexCodec.toLowerHex(lowLong), data.array())
+ }
+
+ override def from(string: String): Identifier = Try {
+ val highPart = HexCodec.lowerHexToUnsignedLong(string.substring(0, 16))
+ val lowPart = HexCodec.lowerHexToUnsignedLong(string.substring(16, 32))
+ val data = ByteBuffer.allocate(16)
+ data.putLong(highPart)
+ data.putLong(lowPart)
+
+ Identifier(string, data.array())
+ } getOrElse(IdentityProvider.NoIdentifier)
+
+ override def from(bytes: Array[Byte]): Identifier = Try {
+ val buffer = ByteBuffer.wrap(bytes)
+ val highLong = buffer.getLong
+ val lowLong = buffer.getLong
+
+ Identifier(HexCodec.toLowerHex(highLong) + HexCodec.toLowerHex(lowLong), bytes)
+ } getOrElse(IdentityProvider.NoIdentifier)
+ }
+
+ override def traceIdGenerator(): Generator = doubleLongGenerator
+ }
+
+ object DoubleSizeTraceID {
+ def apply(): DoubleSizeTraceID = new DoubleSizeTraceID()
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala
index 0347a151..3f366175 100644
--- a/kamon-core/src/main/scala/kamon/trace/Sampler.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala
@@ -15,39 +15,44 @@
package kamon.trace
+import java.util.concurrent.ThreadLocalRandom
+import kamon.trace.SpanContext.SamplingDecision
+
trait Sampler {
- def decide(spanID: Long): Boolean
+ def decide(operationName: String, builderTags: Map[String, Span.TagValue]): SamplingDecision
}
object Sampler {
- val always = new Constant(true)
- val never = new Constant(false)
+ val Always = new Constant(SamplingDecision.Sample)
+ val Never = new Constant(SamplingDecision.DoNotSample)
- def random(chance: Double): Sampler = {
- assert(chance >= 0D && chance <= 1.0D, "Change should be >= 0 and <= 1.0")
+ def random(probability: Double): Sampler = {
+ assert(probability >= 0D && probability <= 1.0D, "The probability should be >= 0 and <= 1.0")
- chance match {
- case 0D => never
- case 1.0D => always
+ probability match {
+ case 0D => Never
+ case 1.0D => Always
case anyOther => new Random(anyOther)
}
}
- class Constant(decision: Boolean) extends Sampler {
- override def decide(spanID: Long): Boolean = decision
+ class Constant(decision: SamplingDecision) extends Sampler {
+ override def decide(operationName: String, builderTags: Map[String, Span.TagValue]): SamplingDecision = decision
override def toString: String =
s"Sampler.Constant(decision = $decision)"
}
- class Random(chance: Double) extends Sampler {
- val upperBoundary = Long.MaxValue * chance
+ class Random(probability: Double) extends Sampler {
+ val upperBoundary = Long.MaxValue * probability
val lowerBoundary = -upperBoundary
- override def decide(spanID: Long): Boolean =
- spanID >= lowerBoundary && spanID <= upperBoundary
+ override def decide(operationName: String, builderTags: Map[String, Span.TagValue]): SamplingDecision = {
+ val random = ThreadLocalRandom.current().nextLong()
+ if(random >= lowerBoundary && random <= upperBoundary) SamplingDecision.Sample else SamplingDecision.DoNotSample
+ }
override def toString: String =
- s"Sampler.Random(chance = $chance)"
+ s"Sampler.Random(probability = $probability)"
}
}
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala
index 464559e3..a4424a45 100644
--- a/kamon-core/src/main/scala/kamon/trace/Span.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Span.scala
@@ -16,176 +16,220 @@
package kamon
package trace
-
-import scala.collection.JavaConverters._
+import kamon.ReporterRegistry.SpanSink
+import kamon.context.Key
+import kamon.trace.SpanContext.SamplingDecision
import kamon.util.{Clock, MeasurementUnit}
-class Span(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String], startTimestampMicros: Long,
- reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span {
- private var isOpen: Boolean = true
- private val sampled: Boolean = spanContext.sampled
- private var operationName: String = initialOperationName
- private var endTimestampMicros: Long = 0
+trait Span {
- private var tags = initialTags
- private var logs = List.empty[Span.LogEntry]
- private var additionalMetricTags = Map.empty[String, String]
+ def isEmpty(): Boolean
+ def isLocal(): Boolean
+ def nonEmpty(): Boolean = !isEmpty()
+ def isRemote(): Boolean = !isLocal()
- override def log(fields: java.util.Map[String, _]): Span =
- log(fields.asScala.asInstanceOf[Map[String, _]])
+ def context(): SpanContext
- def log(fields: Map[String, _]): Span = synchronized {
- if (sampled && isOpen)
- logs = Span.LogEntry(Clock.microTimestamp(), fields) :: logs
- this
- }
+ def annotate(annotation: Span.Annotation): Span
- def log(timestampMicroseconds: Long, fields: Map[String, _]): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(timestampMicroseconds, fields) :: logs
- this
- }
+ def addSpanTag(key: String, value: String): Span
- override def log(timestampMicroseconds: Long, fields: java.util.Map[String, _]): Span =
- log(timestampMicroseconds, fields.asScala.asInstanceOf[Map[String, _]])
+ def addSpanTag(key: String, value: Long): Span
- override def log(event: String): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(Clock.microTimestamp(), Map("event" -> event)) :: logs
- this
- }
+ def addSpanTag(key: String, value: Boolean): Span
- override def log(timestampMicroseconds: Long, event: String): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(timestampMicroseconds, Map("event" -> event)) :: logs
- this
- }
+ def addMetricTag(key: String, value: String): Span
- override def log(eventName: String, payload: scala.Any): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(Clock.microTimestamp(), Map(eventName -> payload)) :: logs
- this
- }
+ def setOperationName(name: String): Span
+
+ def disableMetricsCollection(): Span
+
+ def finish(finishTimestampMicros: Long): Unit
+
+ def finish(): Unit =
+ finish(Clock.microTimestamp())
+
+ def annotate(name: String): Span =
+ annotate(Span.Annotation(Clock.microTimestamp(), name, Map.empty))
- override def log(timestampMicroseconds: Long, eventName: String, payload: scala.Any): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(timestampMicroseconds, Map(eventName -> payload)) :: logs
- this
+ def annotate(name: String, fields: Map[String, String]): Span =
+ annotate(Span.Annotation(Clock.microTimestamp(), name, fields))
+
+ def annotate(timestampMicroseconds: Long, name: String, fields: Map[String, String]): Span =
+ annotate(Span.Annotation(timestampMicroseconds, name, fields))
+
+}
+
+object Span {
+
+ val ContextKey = Key.broadcast[Span]("span", Span.Empty)
+
+ object Empty extends Span {
+ override val context: SpanContext = SpanContext.EmptySpanContext
+ override def isEmpty(): Boolean = true
+ override def isLocal(): Boolean = true
+ override def annotate(annotation: Annotation): Span = this
+ override def addSpanTag(key: String, value: String): Span = this
+ override def addSpanTag(key: String, value: Long): Span = this
+ override def addSpanTag(key: String, value: Boolean): Span = this
+ override def addMetricTag(key: String, value: String): Span = this
+ override def setOperationName(name: String): Span = this
+ override def disableMetricsCollection(): Span = this
+ override def finish(finishTimestampMicros: Long): Unit = {}
}
- override def getBaggageItem(key: String): String =
- spanContext.getBaggage(key)
+ /**
+ *
+ * @param spanContext
+ * @param initialOperationName
+ * @param initialSpanTags
+ * @param startTimestampMicros
+ * @param spanSink
+ */
+ final class Local(spanContext: SpanContext, initialOperationName: String, initialSpanTags: Map[String, Span.TagValue],
+ initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink) extends Span {
+
+ private var collectMetrics: Boolean = true
+ private var open: Boolean = true
+ private val sampled: Boolean = spanContext.samplingDecision == SamplingDecision.Sample
+ private var operationName: String = initialOperationName
+
+ private var spanTags: Map[String, Span.TagValue] = initialSpanTags
+ private var customMetricTags = initialMetricTags
+ private var annotations = List.empty[Span.Annotation]
+
+ override def isEmpty(): Boolean = false
+ override def isLocal(): Boolean = true
+
+ def annotate(annotation: Annotation): Span = synchronized {
+ if(sampled && open)
+ annotations = annotation :: annotations
+ this
+ }
- override def context(): SpanContext =
- spanContext
+ override def addSpanTag(key: String, value: String): Span = synchronized {
+ if(sampled && open)
+ spanTags = spanTags + (key -> TagValue.String(value))
+ this
+ }
- override def setTag(key: String, value: String): Span = synchronized {
- if (isOpen) {
- extractMetricTag(key, value)
- if(sampled)
- tags = tags ++ Map(key -> value)
+ override def addSpanTag(key: String, value: Long): Span = synchronized {
+ if(sampled && open)
+ spanTags = spanTags + (key -> TagValue.Number(value))
+ this
}
- this
- }
- override def setTag(key: String, value: Boolean): Span = {
- if (isOpen) {
- val tagValue = if(value) Span.BooleanTagTrueValue else Span.BooleanTagFalseValue
- extractMetricTag(key, tagValue)
- if(sampled)
- tags = tags + (key -> tagValue)
+ override def addSpanTag(key: String, value: Boolean): Span = synchronized {
+ if(sampled && open) {
+ val tagValue = if (value) TagValue.True else TagValue.False
+ spanTags = spanTags + (key -> tagValue)
+ }
+ this
}
- this
- }
- override def setTag(key: String, value: Number): Span = {
- if (isOpen) {
- val tagValue = String.valueOf(value)
- extractMetricTag(key, tagValue)
- if(sampled)
- tags = tags + (key -> tagValue)
+ override def addMetricTag(key: String, value: String): Span = synchronized {
+ if(sampled && open && collectMetrics)
+ customMetricTags = customMetricTags + (key -> value)
+ this
}
- this
- }
- def setMetricTag(key: String, value: String): Span = synchronized {
- if (isOpen)
- additionalMetricTags = additionalMetricTags ++ Map(key -> value)
- this
- }
+ override def disableMetricsCollection(): Span = synchronized {
+ collectMetrics = false
+ this
+ }
- override def setBaggageItem(key: String, value: String): Span = synchronized {
- if (isOpen)
- spanContext.addBaggageItem(key, value)
- this
- }
+ override def context(): SpanContext =
+ spanContext
- override def setOperationName(operationName: String): Span = synchronized {
- if(isOpen)
- this.operationName = operationName
- this
- }
+ override def setOperationName(operationName: String): Span = synchronized {
+ if(open)
+ this.operationName = operationName
+ this
+ }
- private def extractMetricTag(tag: String, value: String): Unit =
- if(tag.startsWith(Span.MetricTagPrefix))
- additionalMetricTags = additionalMetricTags ++ Map(tag.substring(Span.MetricTagPrefix.length) -> value)
+ override def finish(finishMicros: Long): Unit = synchronized {
+ if (open) {
+ open = false
- override def finish(): Unit =
- finish(Clock.microTimestamp())
+ if(collectMetrics)
+ recordSpanMetrics(finishMicros)
+
+ if(sampled)
+ spanSink.reportSpan(toFinishedSpan(finishMicros))
+ }
+ }
+
+ private def toFinishedSpan(endTimestampMicros: Long): Span.FinishedSpan =
+ Span.FinishedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, spanTags, annotations)
+
+ private def recordSpanMetrics(endTimestampMicros: Long): Unit = {
+ val elapsedTime = endTimestampMicros - startTimestampMicros
+ val metricTags = Map("operation" -> operationName) ++ customMetricTags
- override def finish(finishMicros: Long): Unit = synchronized {
- if (isOpen) {
- isOpen = false
- endTimestampMicros = finishMicros
- recordSpanMetrics()
+ val isError = spanTags.get("error").exists {
+ errorTag => errorTag != null && errorTag.equals(Span.TagValue.True)
+ }
- if(sampled)
- reporterRegistry.reportSpan(completedSpan)
+ val refinedMetricTags = if(isError)
+ metricTags + ("error" -> "true")
+ else
+ metricTags
+
+ val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(refinedMetricTags)
+ latencyHistogram.record(elapsedTime)
}
}
- private def completedSpan: Span.CompletedSpan =
- Span.CompletedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, tags, logs)
+ object Local {
+ def apply(spanContext: SpanContext, initialOperationName: String, initialSpanTags: Map[String, Span.TagValue],
+ initialMetricTags: Map[String, String], startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl): Local =
+ new Local(spanContext, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry)
+ }
- private def recordSpanMetrics(): Unit = {
- val elapsedTime = endTimestampMicros - startTimestampMicros
- val metricTags = Map("operation" -> operationName) ++ additionalMetricTags
- val isError = tags.get("error").exists {
- errorTag => errorTag != null && errorTag.equals(Span.BooleanTagTrueValue)
- }
+ final class Remote(val context: SpanContext) extends Span {
+ override def isEmpty(): Boolean = false
+ override def isLocal(): Boolean = false
+ override def annotate(annotation: Annotation): Span = this
+ override def addSpanTag(key: String, value: String): Span = this
+ override def addSpanTag(key: String, value: Long): Span = this
+ override def addSpanTag(key: String, value: Boolean): Span = this
+ override def addMetricTag(key: String, value: String): Span = this
+ override def setOperationName(name: String): Span = this
+ override def disableMetricsCollection(): Span = this
+ override def finish(finishTimestampMicros: Long): Unit = {}
+ }
- val refinedTags = if(isError) {
- metricTags + ("error" -> Span.BooleanTagTrueValue)
- } else {
- metricTags
- }
+ object Remote {
+ def apply(spanContext: SpanContext): Remote =
+ new Remote(spanContext)
+ }
+
+ sealed trait TagValue
+ object TagValue {
+ sealed trait Boolean extends TagValue
+ case object True extends Boolean
+ case object False extends Boolean
- val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(refinedTags)
- latencyHistogram.record(elapsedTime)
+ case class String(string: java.lang.String) extends TagValue
+ case class Number(number: Long) extends TagValue
}
-}
-object Span {
object Metrics {
val SpanProcessingTimeMetric = Kamon.histogram("span.processing-time", MeasurementUnit.time.microseconds)
val SpanErrorCount = Kamon.counter("span.error-count")
}
- val MetricTagPrefix = "metric."
- val BooleanTagTrueValue = "1"
- val BooleanTagFalseValue = "0"
-
- case class LogEntry(timestamp: Long, fields: Map[String, _])
+ case class Annotation(timestampMicros: Long, name: String, fields: Map[String, String])
- case class CompletedSpan(
+ case class FinishedSpan(
context: SpanContext,
operationName: String,
startTimestampMicros: Long,
endTimestampMicros: Long,
- tags: Map[String, String],
- logs: Seq[LogEntry]
+ tags: Map[String, Span.TagValue],
+ annotations: Seq[Annotation]
)
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
new file mode 100644
index 00000000..e04ceb03
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
@@ -0,0 +1,99 @@
+/* =========================================================================================
+ * Copyright © 2013-2017 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import java.net.{URLDecoder, URLEncoder}
+
+import kamon.Kamon
+import kamon.context.{Codec, Context, TextMap}
+import kamon.trace.SpanContext.SamplingDecision
+
+
+object SpanCodec {
+
+ class B3 extends Codec.ForEntry[TextMap] {
+ import B3.Headers
+
+ override def encode(context: Context): TextMap = {
+ val span = context.get(Span.ContextKey)
+ val carrier = TextMap.Default()
+
+ if(span.nonEmpty()) {
+ val spanContext = span.context
+ carrier.put(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string))
+ carrier.put(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string))
+ carrier.put(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string))
+
+ encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision =>
+ carrier.put(Headers.Sampled, samplingDecision)
+ }
+ }
+
+ carrier
+ }
+
+ override def decode(carrier: TextMap, context: Context): Context = {
+ val identityProvider = Kamon.tracer.identityProvider
+ val traceID = carrier.get(Headers.TraceIdentifier)
+ .map(id => identityProvider.traceIdGenerator().from(urlDecode(id)))
+ .getOrElse(IdentityProvider.NoIdentifier)
+
+ val spanID = carrier.get(Headers.SpanIdentifier)
+ .map(id => identityProvider.spanIdGenerator().from(urlDecode(id)))
+ .getOrElse(IdentityProvider.NoIdentifier)
+
+ if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) {
+ val parentID = carrier.get(Headers.ParentSpanIdentifier)
+ .map(id => identityProvider.spanIdGenerator().from(urlDecode(id)))
+ .getOrElse(IdentityProvider.NoIdentifier)
+
+ val flags = carrier.get(Headers.Flags)
+
+ val samplingDecision = flags.orElse(carrier.get(Headers.Sampled)) match {
+ case Some(sampled) if sampled == "1" => SamplingDecision.Sample
+ case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample
+ case _ => SamplingDecision.Unknown
+ }
+
+ context.withKey(Span.ContextKey, Span.Remote(SpanContext(traceID, spanID, parentID, samplingDecision)))
+
+ } else context
+ }
+
+ private def encodeSamplingDecision(samplingDecision: SamplingDecision): Option[String] = samplingDecision match {
+ case SamplingDecision.Sample => Some("1")
+ case SamplingDecision.DoNotSample => Some("0")
+ case SamplingDecision.Unknown => None
+ }
+
+ private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8")
+ private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8")
+ }
+
+ object B3 {
+
+ def apply(): B3 =
+ new B3()
+
+ object Headers {
+ val TraceIdentifier = "X-B3-TraceId"
+ val ParentSpanIdentifier = "X-B3-ParentSpanId"
+ val SpanIdentifier = "X-B3-SpanId"
+ val Sampled = "X-B3-Sampled"
+ val Flags = "X-B3-Flags"
+ }
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala
index b37e208b..4d013881 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala
@@ -15,26 +15,51 @@
package kamon.trace
-import java.lang
-import java.util.{Map => JavaMap}
+import kamon.trace.IdentityProvider.Identifier
+import kamon.trace.SpanContext.SamplingDecision
-import scala.collection.JavaConverters._
+/**
+ *
+ * @param traceID
+ * @param spanID
+ * @param parentID
+ * @param samplingDecision
+ */
+case class SpanContext(traceID: Identifier, spanID: Identifier, parentID: Identifier, samplingDecision: SamplingDecision) {
-class SpanContext(val traceID: Long, val spanID: Long, val parentID: Long, val sampled: Boolean,
- private var baggage: Map[String, String]) extends io.opentracing.SpanContext {
+ def createChild(childSpanID: Identifier, samplingDecision: SamplingDecision): SpanContext =
+ this.copy(parentID = this.spanID, spanID = childSpanID)
+}
- private[kamon] def addBaggageItem(key: String, value: String): Unit = synchronized {
- baggage = baggage + (key -> value)
- }
+object SpanContext {
- private[kamon] def getBaggage(key: String): String = synchronized {
- baggage.get(key).getOrElse(null)
- }
+ val EmptySpanContext = SpanContext(
+ traceID = IdentityProvider.NoIdentifier,
+ spanID = IdentityProvider.NoIdentifier,
+ parentID = IdentityProvider.NoIdentifier,
+ samplingDecision = SamplingDecision.DoNotSample
+ )
+
+
+ sealed trait SamplingDecision
- private[kamon] def baggageMap: Map[String, String] =
- baggage
+ object SamplingDecision {
+
+ /**
+ * The Trace is sampled, all child Spans should be sampled as well.
+ */
+ case object Sample extends SamplingDecision
+
+ /**
+ * The Trace is not sampled, none of the child Spans should be sampled.
+ */
+ case object DoNotSample extends SamplingDecision
+
+ /**
+ * The sampling decision has not been taken yet, the Tracer is free to decide when creating a Span.
+ */
+ case object Unknown extends SamplingDecision
- override def baggageItems(): lang.Iterable[JavaMap.Entry[String, String]] = synchronized {
- baggage.asJava.entrySet()
}
-}
+
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala
deleted file mode 100644
index 8e3a446b..00000000
--- a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-
-package kamon.trace
-
-import java.net.{URLDecoder, URLEncoder}
-import java.util.concurrent.ThreadLocalRandom
-
-import scala.collection.JavaConverters._
-import io.opentracing.propagation.TextMap
-import kamon.util.HexCodec
-
-
-trait SpanContextCodec[T] {
- def inject(spanContext: SpanContext, carrier: T): Unit
- def extract(carrier: T, sampler: Sampler): SpanContext
-}
-
-object SpanContextCodec {
-
- val TextMap: SpanContextCodec[TextMap] = new TextMapSpanCodec(
- traceIDKey = "TRACE_ID",
- parentIDKey = "PARENT_ID",
- spanIDKey = "SPAN_ID",
- sampledKey = "SAMPLED",
- baggagePrefix = "BAGGAGE_",
- baggageValueEncoder = identity,
- baggageValueDecoder = identity
- )
-
- val ZipkinB3: SpanContextCodec[TextMap] = new TextMapSpanCodec(
- traceIDKey = "X-B3-TraceId",
- parentIDKey = "X-B3-ParentSpanId",
- spanIDKey = "X-B3-SpanId",
- sampledKey = "X-B3-Sampled",
- baggagePrefix = "X-B3-Baggage-",
- baggageValueEncoder = urlEncode,
- baggageValueDecoder = urlDecode
- )
-
- private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8")
- private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8")
-
- private class TextMapSpanCodec(traceIDKey: String, parentIDKey: String, spanIDKey: String, sampledKey: String, baggagePrefix: String,
- baggageValueEncoder: String => String, baggageValueDecoder: String => String) extends SpanContextCodec[TextMap] {
-
- override def inject(spanContext: SpanContext, carrier: TextMap): Unit = {
- carrier.put(traceIDKey, encodeLong(spanContext.traceID))
- carrier.put(parentIDKey, encodeLong(spanContext.parentID))
- carrier.put(spanIDKey, encodeLong(spanContext.spanID))
-
- spanContext.baggageItems().iterator().asScala.foreach { entry =>
- carrier.put(baggagePrefix + entry.getKey, baggageValueEncoder(entry.getValue))
- }
- }
-
- override def extract(carrier: TextMap, sampler: Sampler): SpanContext = {
- var traceID: String = null
- var parentID: String = null
- var spanID: String = null
- var sampled: String = null
- var baggage: Map[String, String] = Map.empty
-
- carrier.iterator().asScala.foreach { entry =>
- if(entry.getKey.equals(traceIDKey))
- traceID = baggageValueDecoder(entry.getValue)
- else if(entry.getKey.equals(parentIDKey))
- parentID = baggageValueDecoder(entry.getValue)
- else if(entry.getKey.equals(spanIDKey))
- spanID = baggageValueDecoder(entry.getValue)
- else if(entry.getKey.equals(sampledKey))
- sampled = entry.getValue
- else if(entry.getKey.startsWith(baggagePrefix))
- baggage = baggage + (entry.getKey.substring(baggagePrefix.length) -> baggageValueDecoder(entry.getValue))
- }
-
- if(traceID != null && spanID != null) {
- val actualParent = if(parentID == null) 0L else decodeLong(parentID)
- val isSampled = if(sampled == null) sampler.decide(ThreadLocalRandom.current().nextLong()) else sampled.equals("1")
-
- new SpanContext(decodeLong(traceID), decodeLong(spanID), actualParent, isSampled, baggage)
- } else null
- }
-
- private def decodeLong(input: String): Long =
- HexCodec.lowerHexToUnsignedLong(input)
-
- private def encodeLong(input: Long): String =
- HexCodec.toLowerHex(input)
-
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
index 19067f5e..7d8830ca 100644
--- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
@@ -13,148 +13,156 @@
* =========================================================================================
*/
-
package kamon.trace
-import java.util.concurrent.ThreadLocalRandom
-
import com.typesafe.config.Config
-import io.opentracing.propagation.{Format, TextMap}
-import io.opentracing.propagation.Format.Builtin.{BINARY, HTTP_HEADERS, TEXT_MAP}
-import io.opentracing.util.ThreadLocalActiveSpanSource
-import kamon.ReporterRegistryImpl
+import kamon.{Kamon, ReporterRegistryImpl}
import kamon.metric.MetricLookup
-import kamon.util.Clock
+import kamon.trace.Span.TagValue
+import kamon.trace.SpanContext.SamplingDecision
+import kamon.trace.Tracer.SpanBuilder
+import kamon.util.{Clock, DynamicAccess}
import org.slf4j.LoggerFactory
+import scala.collection.immutable
+import scala.util.Try
-class Tracer(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config)
- extends ThreadLocalActiveSpanSource with io.opentracing.Tracer {
+trait Tracer {
+ def buildSpan(operationName: String): SpanBuilder
+ def identityProvider: IdentityProvider
+}
- private val logger = LoggerFactory.getLogger(classOf[Tracer])
- private val tracerMetrics = new TracerMetrics(metrics)
+object Tracer {
- @volatile private var configuredSampler: Sampler = Sampler.never
- @volatile private var textMapSpanContextCodec = SpanContextCodec.TextMap
- @volatile private var httpHeaderSpanContextCodec = SpanContextCodec.ZipkinB3
+ final class Default(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config) extends Tracer {
+ private val logger = LoggerFactory.getLogger(classOf[Tracer])
- reconfigure(initialConfig)
+ private[Tracer] val tracerMetrics = new TracerMetrics(metrics)
+ @volatile private[Tracer] var joinRemoteParentsWithSameSpanID: Boolean = true
+ @volatile private[Tracer] var configuredSampler: Sampler = Sampler.Never
+ @volatile private[Tracer] var _identityProvider: IdentityProvider = IdentityProvider.Default()
- override def buildSpan(operationName: String): io.opentracing.Tracer.SpanBuilder =
- new SpanBuilder(operationName)
+ reconfigure(initialConfig)
- override def extract[C](format: Format[C], carrier: C): io.opentracing.SpanContext = format match {
- case HTTP_HEADERS => httpHeaderSpanContextCodec.extract(carrier.asInstanceOf[TextMap], configuredSampler)
- case TEXT_MAP => textMapSpanContextCodec.extract(carrier.asInstanceOf[TextMap], configuredSampler)
- case BINARY => null // TODO: Implement Binary Encoding
- case _ => null
- }
+ override def buildSpan(operationName: String): SpanBuilder =
+ new SpanBuilder(operationName, this, reporterRegistry)
- override def inject[C](spanContext: io.opentracing.SpanContext, format: Format[C], carrier: C): Unit = format match {
- case HTTP_HEADERS => httpHeaderSpanContextCodec.inject(spanContext.asInstanceOf[SpanContext], carrier.asInstanceOf[TextMap])
- case TEXT_MAP => textMapSpanContextCodec.inject(spanContext.asInstanceOf[SpanContext], carrier.asInstanceOf[TextMap])
- case BINARY =>
- case _ =>
- }
+ override def identityProvider: IdentityProvider =
+ this._identityProvider
+
+ def sampler: Sampler =
+ configuredSampler
+
+ private[kamon] def reconfigure(config: Config): Unit = synchronized {
+ Try {
+ val dynamic = new DynamicAccess(getClass.getClassLoader)
+ val traceConfig = config.getConfig("kamon.trace")
- def sampler: Sampler =
- configuredSampler
+ val newSampler = traceConfig.getString("sampler") match {
+ case "always" => Sampler.Always
+ case "never" => Sampler.Never
+ case "random" => Sampler.random(traceConfig.getDouble("random-sampler.probability"))
+ case other => sys.error(s"Unexpected sampler name $other.")
+ }
+
+ val newJoinRemoteParentsWithSameSpanID = traceConfig.getBoolean("join-remote-parents-with-same-span-id")
- def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit =
- this.textMapSpanContextCodec = codec
+ val newIdentityProvider = dynamic.createInstanceFor[IdentityProvider](
+ traceConfig.getString("identity-provider"), immutable.Seq.empty[(Class[_], AnyRef)]
+ ).get
- def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit =
- this.httpHeaderSpanContextCodec = codec
+ configuredSampler = newSampler
+ joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID
+ _identityProvider = newIdentityProvider
- private class SpanBuilder(operationName: String) extends io.opentracing.Tracer.SpanBuilder {
- private var parentContext: SpanContext = _
+ }.failed.foreach {
+ ex => logger.error("Unable to reconfigure Kamon Tracer", ex)
+ }
+ }
+ }
+
+ object Default {
+ def apply(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config): Default =
+ new Default(metrics, reporterRegistry, initialConfig)
+ }
+
+ final class SpanBuilder(operationName: String, tracer: Tracer.Default, reporterRegistry: ReporterRegistryImpl) {
+ private var parentSpan: Span = _
private var startTimestamp = 0L
- private var initialTags = Map.empty[String, String]
+ private var initialSpanTags = Map.empty[String, Span.TagValue]
+ private var initialMetricTags = Map.empty[String, String]
private var useActiveSpanAsParent = true
- override def asChildOf(parent: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = parent match {
- case spanContext: kamon.trace.SpanContext =>
- this.parentContext = spanContext
- this
- case null => this
- case _ => logger.error("Can't extract the parent ID from a non-Kamon SpanContext"); this
+ def asChildOf(parent: Span): SpanBuilder = {
+ if(parent != Span.Empty) this.parentSpan = parent
+ this
}
- override def asChildOf(parent: io.opentracing.BaseSpan[_]): io.opentracing.Tracer.SpanBuilder =
- asChildOf(parent.context())
-
- override def addReference(referenceType: String, referencedContext: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = {
- if(referenceType != null && referenceType.equals(io.opentracing.References.CHILD_OF)) {
- asChildOf(referencedContext)
- } else this
+ def withMetricTag(key: String, value: String): SpanBuilder = {
+ this.initialMetricTags = this.initialMetricTags + (key -> value)
+ this
}
- override def withTag(key: String, value: String): io.opentracing.Tracer.SpanBuilder = {
- this.initialTags = this.initialTags + (key -> value)
+ def withSpanTag(key: String, value: String): SpanBuilder = {
+ this.initialSpanTags = this.initialSpanTags + (key -> TagValue.String(value))
this
}
- override def withTag(key: String, value: Boolean): io.opentracing.Tracer.SpanBuilder = {
- this.initialTags = this.initialTags + (key -> value.toString)
+ def withSpanTag(key: String, value: Long): SpanBuilder = {
+ this.initialSpanTags = this.initialSpanTags + (key -> TagValue.Number(value))
this
}
- override def withTag(key: String, value: Number): io.opentracing.Tracer.SpanBuilder = {
- this.initialTags = this.initialTags + (key -> value.toString)
+ def withSpanTag(key: String, value: Boolean): SpanBuilder = {
+ val tagValue = if (value) TagValue.True else TagValue.False
+ this.initialSpanTags = this.initialSpanTags + (key -> tagValue)
this
}
- override def withStartTimestamp(microseconds: Long): io.opentracing.Tracer.SpanBuilder = {
+ def withStartTimestamp(microseconds: Long): SpanBuilder = {
this.startTimestamp = microseconds
this
}
- override def ignoreActiveSpan(): io.opentracing.Tracer.SpanBuilder = {
+ def ignoreActiveSpan(): SpanBuilder = {
this.useActiveSpanAsParent = false
this
}
- override def start(): io.opentracing.Span =
- startManual()
+ def start(): Span = {
+ val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp()
- override def startActive(): io.opentracing.ActiveSpan =
- makeActive(startManual())
+ val parentSpan: Option[Span] = Option(this.parentSpan)
+ .orElse(if(useActiveSpanAsParent) Some(Kamon.currentContext().get(Span.ContextKey)) else None)
+ .filter(span => span != Span.Empty)
- override def startManual(): Span = {
- val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp()
+ val samplingDecision: SamplingDecision = parentSpan
+ .map(_.context.samplingDecision)
+ .filter(_ != SamplingDecision.Unknown)
+ .getOrElse(tracer.sampler.decide(operationName, initialSpanTags))
- if(parentContext == null && useActiveSpanAsParent) {
- val possibleParent = activeSpan()
- if(possibleParent != null)
- parentContext = possibleParent.context().asInstanceOf[SpanContext]
+ val spanContext = parentSpan match {
+ case Some(parent) => joinParentContext(parent, samplingDecision)
+ case None => newSpanContext(samplingDecision)
}
- val spanContext =
- if(parentContext != null)
- new SpanContext(parentContext.traceID, createID(), parentContext.spanID, parentContext.sampled, parentContext.baggageMap)
- else {
- val traceID = createID()
- new SpanContext(traceID, traceID, 0L, configuredSampler.decide(traceID), Map.empty)
- }
-
- tracerMetrics.createdSpans.increment()
- new Span(spanContext, operationName, initialTags, startTimestampMicros, reporterRegistry)
+ tracer.tracerMetrics.createdSpans.increment()
+ Span.Local(spanContext, operationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry)
}
- private def createID(): Long =
- ThreadLocalRandom.current().nextLong()
- }
-
-
- private[kamon] def reconfigure(config: Config): Unit = synchronized {
- val traceConfig = config.getConfig("kamon.trace")
-
- configuredSampler = traceConfig.getString("sampler") match {
- case "always" => Sampler.always
- case "never" => Sampler.never
- case "random" => Sampler.random(traceConfig.getDouble("sampler-random.chance"))
- case other => sys.error(s"Unexpected sampler name $other.")
- }
+ private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext =
+ if(parent.isRemote() && tracer.joinRemoteParentsWithSameSpanID)
+ parent.context().copy(samplingDecision = samplingDecision)
+ else
+ parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision)
+
+ private def newSpanContext(samplingDecision: SamplingDecision): SpanContext =
+ SpanContext(
+ traceID = tracer._identityProvider.traceIdGenerator().generate(),
+ spanID = tracer._identityProvider.spanIdGenerator().generate(),
+ parentID = IdentityProvider.NoIdentifier,
+ samplingDecision = samplingDecision
+ )
}
private final class TracerMetrics(metricLookup: MetricLookup) {
diff --git a/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala b/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala
deleted file mode 100644
index 885a73d9..00000000
--- a/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2015 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-package kamon.util
-
-import java.util.function.Supplier
-
-import kamon.trace.{SpanContext => KamonSpanContext}
-import kamon.Kamon
-import org.slf4j.MDC
-
-import scala.collection.JavaConverters._
-
-object BaggageOnMDC {
-
- /**
- * Copy all baggage keys into SLF4J's MDC, evaluates the provided piece of code and removes the baggage keys
- * afterwards, only when there is a currently active span. Optionally adds the Trace ID as well.
- *
- */
- def withBaggageOnMDC[T](includeTraceID: Boolean, code: => T): T = {
- val activeSpan = Kamon.activeSpan()
- if(activeSpan == null)
- code
- else {
- val baggageItems = activeSpan.context().baggageItems().asScala
- baggageItems.foreach(entry => MDC.put(entry.getKey, entry.getValue))
- if(includeTraceID)
- addTraceIDToMDC(activeSpan.context())
-
- val evaluatedCode = code
-
- baggageItems.foreach(entry => MDC.remove(entry.getKey))
- if(includeTraceID)
- removeTraceIDFromMDC()
-
- evaluatedCode
-
- }
- }
-
- def withBaggageOnMDC[T](code: Supplier[T]): T =
- withBaggageOnMDC(true, code.get())
-
- def withBaggageOnMDC[T](includeTraceID: Boolean, code: Supplier[T]): T =
- withBaggageOnMDC(includeTraceID, code.get())
-
- def withBaggageOnMDC[T](code: => T): T =
- withBaggageOnMDC(true, code)
-
- private val TraceIDKey = "trace_id"
-
- private def addTraceIDToMDC(context: io.opentracing.SpanContext): Unit = context match {
- case ctx: KamonSpanContext => MDC.put(TraceIDKey, HexCodec.toLowerHex(ctx.traceID))
- case _ =>
- }
-
- private def removeTraceIDFromMDC(): Unit =
- MDC.remove(TraceIDKey)
-}