aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-07-14 14:12:47 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-07-14 14:12:47 +0200
commit34010efc7b273e50d805a277646f14aa96aaa8b2 (patch)
tree8f7a6f00eac4e0a4cb60c9093b3c5d06ed982662
parent52c4503b6aea2309feeb550b7db2e5fa627dedc8 (diff)
downloadKamon-34010efc7b273e50d805a277646f14aa96aaa8b2.tar.gz
Kamon-34010efc7b273e50d805a277646f14aa96aaa8b2.tar.bz2
Kamon-34010efc7b273e50d805a277646f14aa96aaa8b2.zip
wip
-rw-r--r--build.sbt21
-rw-r--r--kamon-core/src/main/scala/kamon/Kamon.scala31
-rw-r--r--kamon-core/src/main/scala/kamon/ReporterRegistry.scala8
-rw-r--r--kamon-core/src/main/scala/kamon/trace/ActiveSpan.scala70
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Continuation.scala39
-rw-r--r--kamon-core/src/main/scala/kamon/trace/IdentifierGenerator.scala57
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Sampler.scala19
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Span.scala247
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanContext.scala92
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala170
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Tracer.scala217
-rw-r--r--kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala96
-rw-r--r--kamon-core/src/main/scala/kamon/util/Mixin.scala4
13 files changed, 713 insertions, 358 deletions
diff --git a/build.sbt b/build.sbt
index 16c0ceac..23cde6bb 100644
--- a/build.sbt
+++ b/build.sbt
@@ -20,7 +20,7 @@ crossScalaVersions := Seq("2.12.2", "2.11.8", "2.10.6")
lazy val kamon = (project in file("."))
.settings(moduleName := "kamon")
.settings(noPublishing: _*)
- .aggregate(core)//, testkit)
+ .aggregate(core, opentracing)
lazy val core = (project in file("kamon-core"))
@@ -33,13 +33,24 @@ lazy val core = (project in file("kamon-core"))
libraryDependencies ++= Seq(
"com.typesafe" % "config" % "1.3.1",
"org.slf4j" % "slf4j-api" % "1.7.7",
- "ch.qos.logback" % "logback-classic" % "1.2.2",
"org.hdrhistogram" % "HdrHistogram" % "2.1.9",
- "io.opentracing" % "opentracing-api" % "0.30.0",
- "io.opentracing" % "opentracing-util" % "0.30.0",
+
"com.lihaoyi" %% "fansi" % "0.2.4",
+ "org.scalatest" %% "scalatest" % "3.0.1" % "test"
+ )
+ )
- //"uk.org.lidalia" % "slf4j-test" % "1.1.0" % "test",
+lazy val opentracing = (project in file("kamon-opentracing"))
+ .settings(moduleName := "kamon-opentracing")
+ .dependsOn(core)
+ .settings(
+ isSnapshot := true,
+ scalaVersion := "2.11.8",
+ javacOptions += "-XDignore.symbol.file",
+ resolvers += Resolver.mavenLocal,
+ libraryDependencies ++= Seq(
+ "io.opentracing" % "opentracing-api" % "0.30.0",
+ "io.opentracing" % "opentracing-util" % "0.30.0",
"org.scalatest" %% "scalatest" % "3.0.1" % "test"
)
)
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index ecbc796e..5c7f9e53 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -16,23 +16,21 @@
package kamon
import com.typesafe.config.{Config, ConfigFactory}
-import io.opentracing.propagation.Format
-import io.opentracing.{ActiveSpan, Span, SpanContext}
import kamon.metric._
-import kamon.trace.Tracer
+import kamon.trace.{ActiveSpan, Span, SpanContext, Tracer, Continuation}
import kamon.util.{Filters, MeasurementUnit, Registration}
import scala.concurrent.Future
import java.time.Duration
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor}
-import io.opentracing.ActiveSpan.Continuation
+import kamon.trace.SpanContextCodec.Format
import org.slf4j.LoggerFactory
import scala.util.Try
-object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Tracer {
+object Kamon extends MetricLookup with ReporterRegistry {
private val logger = LoggerFactory.getLogger("kamon.Kamon")
@volatile private var _config = ConfigFactory.load()
@volatile private var _environment = Environment.fromConfig(_config)
@@ -41,7 +39,7 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac
private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler"))
private val _metrics = new MetricRegistry(_config, _scheduler)
private val _reporters = new ReporterRegistryImpl(_metrics, _config)
- private val _tracer = new Tracer(Kamon, _reporters, _config)
+ private val _tracer = new Tracer.Default(Kamon, _reporters, _config)
private var _onReconfigureHooks = Seq.empty[OnReconfigureHook]
def environment: Environment =
@@ -90,19 +88,19 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac
def tracer: Tracer =
_tracer
- override def buildSpan(operationName: String): io.opentracing.Tracer.SpanBuilder =
+ def buildSpan(operationName: String): Tracer.SpanBuilder =
_tracer.buildSpan(operationName)
- override def extract[C](format: Format[C], carrier: C): SpanContext =
+ def extract[C](format: Format[C], carrier: C): Option[SpanContext] =
_tracer.extract(format, carrier)
- override def inject[C](spanContext: SpanContext, format: Format[C], carrier: C): Unit =
+ def inject[C](spanContext: SpanContext, format: Format[C], carrier: C): Unit =
_tracer.inject(spanContext, format, carrier)
- override def activeSpan(): ActiveSpan =
+ def activeSpan(): ActiveSpan =
_tracer.activeSpan()
- override def makeActive(span: Span): ActiveSpan =
+ def makeActive(span: Span): ActiveSpan =
_tracer.makeActive(span)
@@ -133,13 +131,8 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac
/**
* Captures a continuation from the currently active Span (if any).
*/
- def activeSpanContinuation(): Continuation = {
- val activeSpan = Kamon.activeSpan()
- if(activeSpan == null)
- null
- else
- activeSpan.capture()
- }
+ def activeSpanContinuation(): Continuation =
+ activeSpan().capture()
/**
* Runs the provided closure with the currently active Span (if any).
@@ -155,7 +148,7 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac
* was no active Span then the provided fallback value
*/
def fromActiveSpan[T](code: ActiveSpan => T): Option[T] =
- Option(activeSpan()).map(code)
+ None//activeSpan().map(code)
override def loadReportersFromConfig(): Unit =
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
index 5f46edf6..8a36a7c7 100644
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -53,7 +53,7 @@ trait MetricReporter extends Reporter {
}
trait SpanReporter extends Reporter {
- def reportSpans(spans: Seq[Span.CompletedSpan]): Unit
+ def reportSpans(spans: Seq[Span.FinishedSpan]): Unit
}
class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry {
@@ -212,7 +212,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con
}
}
- private[kamon] def reportSpan(span: Span.CompletedSpan): Unit = {
+ private[kamon] def reportSpan(span: Span.FinishedSpan): Unit = {
spanReporters.foreach { case (_, reporterEntry) =>
if(reporterEntry.isActive)
reporterEntry.buffer.offer(span)
@@ -251,7 +251,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con
val bufferCapacity: Int,
val executionContext: ExecutionContextExecutorService
) {
- val buffer = new ArrayBlockingQueue[Span.CompletedSpan](bufferCapacity)
+ val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity)
}
private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable {
@@ -290,7 +290,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con
spanReporters.foreach {
case (_, entry) =>
- val spanBatch = new java.util.ArrayList[Span.CompletedSpan](entry.bufferCapacity)
+ val spanBatch = new java.util.ArrayList[Span.FinishedSpan](entry.bufferCapacity)
entry.buffer.drainTo(spanBatch, entry.bufferCapacity)
Future {
diff --git a/kamon-core/src/main/scala/kamon/trace/ActiveSpan.scala b/kamon-core/src/main/scala/kamon/trace/ActiveSpan.scala
new file mode 100644
index 00000000..3a46d94f
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/ActiveSpan.scala
@@ -0,0 +1,70 @@
+package kamon.trace
+
+/**
+ * Wraps a [[kamon.trace.Span]] that has been activated in the current Thread. By activated we really mean, it is
+ * stored in a ThreadLocal value inside the tracer until [[kamon.trace.ActiveSpan#deactivate()]] is called.
+ *
+ * When a [[kamon.trace.Span]] is activated it will keep a reference to the previously active Span on the current
+ * Thread, take it's place as the currently active Span and put the original one once this ActiveSpan gets deactivated.
+ *
+ */
+trait ActiveSpan extends Span {
+
+ /**
+ * Sets the currently active Span to whatever Span was active when this Span was activated.
+ *
+ */
+ def deactivate(): Unit
+}
+
+object ActiveSpan {
+
+ final class Default(wrappedSpan: Span, restoreOnDeactivate: ActiveSpan, tl: ThreadLocal[ActiveSpan])
+ extends ActiveSpan {
+
+ override def deactivate(): Unit =
+ tl.set(restoreOnDeactivate)
+
+ //
+ // Forward all other members to the wrapped Span.
+ //
+
+ override def annotate(annotation: Span.Annotation): Span =
+ wrappedSpan.annotate(annotation)
+
+ override def addSpanTag(key: String, value: String): Span =
+ wrappedSpan.addSpanTag(key, value)
+
+ override def addMetricTag(key: String, value: String): Span =
+ wrappedSpan.addMetricTag(key, value)
+
+ override def addBaggage(key: String, value: String): Span =
+ wrappedSpan.addBaggage(key, value)
+
+ override def getBaggage(key: String): Option[String] =
+ wrappedSpan.getBaggage(key)
+
+ override def disableMetricsCollection(): Span =
+ wrappedSpan.disableMetricsCollection()
+
+ override def context(): SpanContext =
+ wrappedSpan.context()
+
+ override def setOperationName(operationName: String): Span =
+ wrappedSpan.setOperationName(operationName)
+
+ override def finish(): Unit =
+ wrappedSpan.finish()
+
+ override def finish(finishMicros: Long): Unit =
+ wrappedSpan.finish(finishMicros)
+
+ override def capture(): Continuation =
+ wrappedSpan.capture()
+ }
+
+ object Default {
+ def apply(wrappedSpan: Span, restoreOnDeactivate: ActiveSpan, tl: ThreadLocal[ActiveSpan]): Default =
+ new Default(wrappedSpan, restoreOnDeactivate, tl)
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/Continuation.scala b/kamon-core/src/main/scala/kamon/trace/Continuation.scala
new file mode 100644
index 00000000..72d77597
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/Continuation.scala
@@ -0,0 +1,39 @@
+/* =========================================================================================
+ * Copyright © 2013-2017 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+
+
+trait Continuation {
+ def activate(): ActiveSpan
+}
+
+object Continuation {
+
+ /**
+ *
+ * @param span
+ * @param tracer
+ */
+ final class Default(span: Span, tracer: Tracer) extends Continuation {
+ override def activate(): ActiveSpan =
+ tracer.makeActive(span)
+ }
+
+ object Default {
+ def apply(span: Span, tracer: Tracer): Default = new Default(span, tracer)
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/IdentifierGenerator.scala b/kamon-core/src/main/scala/kamon/trace/IdentifierGenerator.scala
new file mode 100644
index 00000000..ea23227a
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/IdentifierGenerator.scala
@@ -0,0 +1,57 @@
+package kamon.trace
+
+import java.nio.ByteBuffer
+import java.util.concurrent.ThreadLocalRandom
+
+import kamon.util.HexCodec
+
+import scala.util.Try
+
+trait IdentityProvider {
+ def traceIdentifierGenerator(): IdentityProvider.Generator
+ def spanIdentifierGenerator(): IdentityProvider.Generator
+}
+
+object IdentityProvider {
+ case class Identifier(string: String, bytes: Array[Byte])
+
+ val NoIdentifier = Identifier("", new Array[Byte](0))
+
+
+ trait Generator {
+ def generate(): Identifier
+ def from(string: String): Identifier
+ def from(bytes: Array[Byte]): Identifier
+ }
+
+
+ class Default extends IdentityProvider {
+ private val generator = new Generator {
+ override def generate(): Identifier = {
+ val data = ByteBuffer.wrap(new Array[Byte](8))
+ val random = ThreadLocalRandom.current().nextLong()
+ data.putLong(random)
+
+ Identifier(HexCodec.toLowerHex(random), data.array())
+ }
+
+ override def from(string: String): Identifier = Try {
+ val identifierLong = HexCodec.lowerHexToUnsignedLong(string)
+ val data = ByteBuffer.allocate(8)
+ data.putLong(identifierLong)
+
+ Identifier(string, data.array())
+ } getOrElse(IdentityProvider.NoIdentifier)
+
+ override def from(bytes: Array[Byte]): Identifier = Try {
+ val buffer = ByteBuffer.wrap(bytes)
+ val identifierLong = buffer.getLong
+
+ Identifier(HexCodec.toLowerHex(identifierLong), bytes)
+ } getOrElse(IdentityProvider.NoIdentifier)
+ }
+
+ override def traceIdentifierGenerator(): Generator = generator
+ override def spanIdentifierGenerator(): Generator = generator
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala
index 0347a151..f478bbd1 100644
--- a/kamon-core/src/main/scala/kamon/trace/Sampler.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala
@@ -15,13 +15,16 @@
package kamon.trace
+import java.util.concurrent.ThreadLocalRandom
+import kamon.trace.SpanContext.SamplingDecision
+
trait Sampler {
- def decide(spanID: Long): Boolean
+ def decide(operationName: String, builderTags: Map[String, String]): SamplingDecision
}
object Sampler {
- val always = new Constant(true)
- val never = new Constant(false)
+ val always = new Constant(SamplingDecision.Sample)
+ val never = new Constant(SamplingDecision.DoNotSample)
def random(chance: Double): Sampler = {
assert(chance >= 0D && chance <= 1.0D, "Change should be >= 0 and <= 1.0")
@@ -33,8 +36,8 @@ object Sampler {
}
}
- class Constant(decision: Boolean) extends Sampler {
- override def decide(spanID: Long): Boolean = decision
+ class Constant(decision: SamplingDecision) extends Sampler {
+ override def decide(operationName: String, builderTags: Map[String, String]): SamplingDecision = decision
override def toString: String =
s"Sampler.Constant(decision = $decision)"
@@ -44,8 +47,10 @@ object Sampler {
val upperBoundary = Long.MaxValue * chance
val lowerBoundary = -upperBoundary
- override def decide(spanID: Long): Boolean =
- spanID >= lowerBoundary && spanID <= upperBoundary
+ override def decide(operationName: String, builderTags: Map[String, String]): SamplingDecision = {
+ val random = ThreadLocalRandom.current().nextLong()
+ if(random >= lowerBoundary && random <= upperBoundary) SamplingDecision.Sample else SamplingDecision.DoNotSample
+ }
override def toString: String =
s"Sampler.Random(chance = $chance)"
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala
index 8149be74..a113b9bc 100644
--- a/kamon-core/src/main/scala/kamon/trace/Span.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Span.scala
@@ -16,155 +16,178 @@
package kamon
package trace
+import kamon.trace.SpanContext.SamplingDecision
import scala.collection.JavaConverters._
import kamon.util.{Clock, MeasurementUnit}
-class Span(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String], startTimestampMicros: Long,
- reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span {
+trait Span extends BaseSpan {
- private var isOpen: Boolean = true
- private val sampled: Boolean = spanContext.sampled
- private var operationName: String = initialOperationName
- private var endTimestampMicros: Long = 0
+ def annotate(name: String): Span =
+ annotate(Span.Annotation(Clock.microTimestamp(), name, Map.empty))
- private var tags = initialTags
- private var logs = List.empty[Span.LogEntry]
- private var additionalMetricTags = Map.empty[String, String]
+ def annotate(name: String, fields: Map[String, String]): Span =
+ annotate(Span.Annotation(Clock.microTimestamp(), name, fields))
+ def annotate(timestampMicroseconds: Long, name: String, fields: Map[String, String]): Span =
+ annotate(Span.Annotation(timestampMicroseconds, name, fields))
- override def log(fields: java.util.Map[String, _]): Span =
- log(fields.asScala.asInstanceOf[Map[String, _]])
- def log(fields: Map[String, _]): Span = synchronized {
- if (sampled && isOpen)
- logs = Span.LogEntry(Clock.microTimestamp(), fields) :: logs
- this
- }
+}
- def log(timestampMicroseconds: Long, fields: Map[String, _]): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(timestampMicroseconds, fields) :: logs
- this
- }
+trait BaseSpan {
- override def log(timestampMicroseconds: Long, fields: java.util.Map[String, _]): Span =
- log(timestampMicroseconds, fields.asScala.asInstanceOf[Map[String, _]])
+ def context(): SpanContext
- override def log(event: String): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(Clock.microTimestamp(), Map("event" -> event)) :: logs
- this
- }
+ def capture(): Continuation
- override def log(timestampMicroseconds: Long, event: String): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(timestampMicroseconds, Map("event" -> event)) :: logs
- this
- }
+ def annotate(annotation: Span.Annotation): Span
- override def log(eventName: String, payload: scala.Any): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(Clock.microTimestamp(), Map(eventName -> payload)) :: logs
- this
- }
+ def addSpanTag(key: String, value: String): Span
+
+ def addMetricTag(key: String, value: String): Span
+
+ def addBaggage(key: String, value: String): Span
+
+ def getBaggage(key: String): Option[String]
+
+ def setOperationName(name: String): Span
- override def log(timestampMicroseconds: Long, eventName: String, payload: scala.Any): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(timestampMicroseconds, Map(eventName -> payload)) :: logs
- this
+ def disableMetricsCollection(): Span
+
+ def finish(): Unit
+
+ def finish(finishTimestampMicros: Long): Unit
+
+}
+
+object Span {
+
+ final class Empty(tracer: Tracer) extends Span {
+ override val context: SpanContext = SpanContext.EmptySpanContext
+ override def capture(): Continuation = Continuation.Default(this, tracer)
+
+ override def annotate(annotation: Annotation): Span = this
+ override def addSpanTag(key: String, value: String): Span = this
+ override def addMetricTag(key: String, value: String): Span = this
+ override def addBaggage(key: String, value: String): Span = this
+ override def getBaggage(key: String): Option[String] = None
+ override def setOperationName(name: String): Span = this
+ override def disableMetricsCollection(): Span = this
+ override def finish(): Unit = {}
+ override def finish(finishTimestampMicros: Long): Unit = {}
}
- override def getBaggageItem(key: String): String =
- spanContext.getBaggage(key)
+ object Empty {
+ def apply(tracer: Tracer): Empty = new Empty(tracer)
+ }
- override def context(): SpanContext =
- spanContext
+ /**
+ *
+ * @param spanContext
+ * @param initialOperationName
+ * @param initialTags
+ * @param startTimestampMicros
+ * @param reporterRegistry
+ */
+ final class Real(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String],
+ startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl, tracer: Tracer) extends Span {
+
+ private var collectMetrics: Boolean = true
+ private var isOpen: Boolean = true
+ private val sampled: Boolean = spanContext.samplingDecision == SamplingDecision.Sample
+ private var operationName: String = initialOperationName
+ private var endTimestampMicros: Long = 0
+
+ private var spanTags = initialTags
+ private var customMetricTags = Map.empty[String, String]
+ private var annotations = List.empty[Span.Annotation]
+
+ def annotate(annotation: Annotation): Span = synchronized {
+ if(sampled && isOpen)
+ annotations = annotation :: annotations
+ this
+ }
- override def setTag(key: String, value: String): Span = synchronized {
- if (isOpen) {
- extractMetricTag(key, value)
- if(sampled)
- tags = tags ++ Map(key -> value)
+ override def addSpanTag(key: String, value: String): Span = synchronized {
+ if(sampled && isOpen)
+ spanTags = spanTags + (key -> value)
+ this
}
- this
- }
- override def setTag(key: String, value: Boolean): Span = {
- if (isOpen) {
- val tagValue = if(value) Span.BooleanTagTrueValue else Span.BooleanTagFalseValue
- extractMetricTag(key, tagValue)
- if(sampled)
- tags = tags + (key -> tagValue)
+ override def addMetricTag(key: String, value: String): Span = synchronized {
+ if(sampled && isOpen && collectMetrics)
+ customMetricTags = customMetricTags + (key -> value)
+ this
}
- this
- }
- override def setTag(key: String, value: Number): Span = {
- if (isOpen) {
- val tagValue = String.valueOf(value)
- extractMetricTag(key, tagValue)
- if(sampled)
- tags = tags + (key -> tagValue)
+ override def addBaggage(key: String, value: String): Span = {
+ spanContext.baggage.add(key, value)
+ this
}
- this
- }
- def setMetricTag(key: String, value: String): Span = synchronized {
- if (isOpen)
- additionalMetricTags = additionalMetricTags ++ Map(key -> value)
- this
- }
+ override def getBaggage(key: String): Option[String] =
+ spanContext.baggage.get(key)
- override def setBaggageItem(key: String, value: String): Span = synchronized {
- if (isOpen)
- spanContext.addBaggageItem(key, value)
- this
- }
+ override def disableMetricsCollection(): Span = synchronized {
+ collectMetrics = false
+ this
+ }
- override def setOperationName(operationName: String): Span = synchronized {
- if(isOpen)
- this.operationName = operationName
- this
- }
+ override def context(): SpanContext =
+ spanContext
+
+ override def setOperationName(operationName: String): Span = synchronized {
+ if(isOpen)
+ this.operationName = operationName
+ this
+ }
- private def extractMetricTag(tag: String, value: String): Unit =
- if(tag.startsWith(Span.MetricTagPrefix))
- additionalMetricTags = additionalMetricTags ++ Map(tag.substring(Span.MetricTagPrefix.length) -> value)
+ override def finish(): Unit =
+ finish(Clock.microTimestamp())
- override def finish(): Unit =
- finish(Clock.microTimestamp())
+ override def finish(finishMicros: Long): Unit = synchronized {
+ if (isOpen) {
+ isOpen = false
+ endTimestampMicros = finishMicros
- override def finish(finishMicros: Long): Unit = synchronized {
- if (isOpen) {
- isOpen = false
- endTimestampMicros = finishMicros
- recordSpanMetrics()
+ if(collectMetrics)
+ recordSpanMetrics()
- if(sampled)
- reporterRegistry.reportSpan(completedSpan)
+ if(sampled)
+ reporterRegistry.reportSpan(completedSpan)
+ }
}
- }
- private def completedSpan: Span.CompletedSpan =
- Span.CompletedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, tags, logs)
+ override def capture(): Continuation =
+ Continuation.Default(this, tracer)
+
+ private def completedSpan: Span.FinishedSpan =
+ Span.FinishedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, spanTags, annotations)
- private def recordSpanMetrics(): Unit = {
- val elapsedTime = endTimestampMicros - startTimestampMicros
- val metricTags = Map("operation" -> operationName) ++ additionalMetricTags
+ private def recordSpanMetrics(): Unit = {
+ val elapsedTime = endTimestampMicros - startTimestampMicros
+ val metricTags = Map("operation" -> operationName) ++ customMetricTags
- val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(metricTags)
- latencyHistogram.record(elapsedTime)
+ val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(metricTags)
+ latencyHistogram.record(elapsedTime)
- tags.get("error").foreach { errorTag =>
- if(errorTag != null && errorTag.equals(Span.BooleanTagTrueValue)) {
- Span.Metrics.SpanErrorCount.refine(metricTags).increment()
+ spanTags.get("error").foreach { errorTag =>
+ if(errorTag != null && errorTag.equals(Span.BooleanTagTrueValue)) {
+ Span.Metrics.SpanErrorCount.refine(metricTags).increment()
+ }
}
}
}
-}
-object Span {
+ object Real {
+ def apply(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String],
+ startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl, tracer: Tracer): Real =
+ new Real(spanContext, initialOperationName, initialTags, startTimestampMicros, reporterRegistry, tracer)
+ }
+
+
+
object Metrics {
val SpanProcessingTimeMetric = Kamon.histogram("span.processing-time", MeasurementUnit.time.microseconds)
val SpanErrorCount = Kamon.counter("span.error-count")
@@ -174,14 +197,14 @@ object Span {
val BooleanTagTrueValue = "1"
val BooleanTagFalseValue = "0"
- case class LogEntry(timestamp: Long, fields: Map[String, _])
+ case class Annotation(timestamp: Long, name: String, fields: Map[String, String])
- case class CompletedSpan(
+ case class FinishedSpan(
context: SpanContext,
operationName: String,
startTimestampMicros: Long,
endTimestampMicros: Long,
tags: Map[String, String],
- logs: Seq[LogEntry]
+ annotations: Seq[Annotation]
)
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala
index b37e208b..ae92f46d 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala
@@ -15,26 +15,92 @@
package kamon.trace
-import java.lang
-import java.util.{Map => JavaMap}
+import kamon.trace.IdentityProvider.Identifier
+import kamon.trace.SpanContext.{Baggage, SamplingDecision, Source}
-import scala.collection.JavaConverters._
+/**
+ *
+ * @param traceID
+ * @param spanID
+ * @param parentID
+ * @param samplingDecision
+ * @param baggage
+ */
+case class SpanContext(traceID: Identifier, spanID: Identifier, parentID: Identifier, samplingDecision: SamplingDecision, baggage: Baggage, source: Source) {
-class SpanContext(val traceID: Long, val spanID: Long, val parentID: Long, val sampled: Boolean,
- private var baggage: Map[String, String]) extends io.opentracing.SpanContext {
+ def createChild(childSpanID: Identifier, samplingDecision: SamplingDecision): SpanContext =
+ this.copy(parentID = this.spanID, spanID = childSpanID)
+}
+
+object SpanContext {
- private[kamon] def addBaggageItem(key: String, value: String): Unit = synchronized {
- baggage = baggage + (key -> value)
+ sealed trait Source
+ object Source {
+ case object Local extends Source
+ case object Remote extends Source
}
- private[kamon] def getBaggage(key: String): String = synchronized {
- baggage.get(key).getOrElse(null)
+ val EmptySpanContext = SpanContext(
+ traceID = IdentityProvider.NoIdentifier,
+ spanID = IdentityProvider.NoIdentifier,
+ parentID = IdentityProvider.NoIdentifier,
+ samplingDecision = SamplingDecision.DoNotSample,
+ baggage = Baggage.EmptyBaggage,
+ source = Source.Local
+ )
+
+
+ sealed trait SamplingDecision
+ object SamplingDecision {
+
+ /**
+ * The Trace is sampled, all child Spans should be sampled as well.
+ */
+ case object Sample extends SamplingDecision
+
+ /**
+ * The Trace is not sampled, none of the child Spans should be sampled.
+ */
+ case object DoNotSample extends SamplingDecision
+
+ /**
+ * The sampling decision has not been taken yet, the Tracer is free to decide when creating a Span.
+ */
+ case object Unknown extends SamplingDecision
}
- private[kamon] def baggageMap: Map[String, String] =
- baggage
+ /**
+ *
+ */
+
+ sealed trait Baggage {
+ def add(key: String, value:String): Unit
+ def get(key: String): Option[String]
+ def getAll(): Map[String, String]
+ }
+
+ object Baggage {
+ def apply(): Baggage = new DefaultBaggage()
+
+ case object EmptyBaggage extends Baggage {
+ override def add(key: String, value: String): Unit = {}
+ override def get(key: String): Option[String] = None
+ override def getAll: Map[String, String] = Map.empty
+ }
+
+
+ final class DefaultBaggage extends Baggage {
+ private var baggage: Map[String, String] = Map.empty
+
+ def add(key: String, value: String): Unit = synchronized {
+ baggage = baggage + (key -> value)
+ }
+
+ def get(key: String): Option[String] =
+ baggage.get(key)
- override def baggageItems(): lang.Iterable[JavaMap.Entry[String, String]] = synchronized {
- baggage.asJava.entrySet()
+ def getAll: Map[String, String] =
+ baggage
+ }
}
}
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala
index 8e3a446b..23eb40db 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala
@@ -17,82 +17,137 @@
package kamon.trace
import java.net.{URLDecoder, URLEncoder}
+import java.nio.ByteBuffer
import java.util.concurrent.ThreadLocalRandom
+import kamon.trace.SpanContext.{Baggage, SamplingDecision, Source}
+
import scala.collection.JavaConverters._
-import io.opentracing.propagation.TextMap
import kamon.util.HexCodec
trait SpanContextCodec[T] {
def inject(spanContext: SpanContext, carrier: T): Unit
- def extract(carrier: T, sampler: Sampler): SpanContext
+ def extract(carrier: T): Option[SpanContext]
+}
+
+trait TextMap {
+ def get(key: String): Option[String]
+ def put(key: String, value: String): Unit
+ def values: Iterator[(String, String)]
}
+
object SpanContextCodec {
- val TextMap: SpanContextCodec[TextMap] = new TextMapSpanCodec(
- traceIDKey = "TRACE_ID",
- parentIDKey = "PARENT_ID",
- spanIDKey = "SPAN_ID",
- sampledKey = "SAMPLED",
- baggagePrefix = "BAGGAGE_",
- baggageValueEncoder = identity,
- baggageValueDecoder = identity
- )
-
- val ZipkinB3: SpanContextCodec[TextMap] = new TextMapSpanCodec(
- traceIDKey = "X-B3-TraceId",
- parentIDKey = "X-B3-ParentSpanId",
- spanIDKey = "X-B3-SpanId",
- sampledKey = "X-B3-Sampled",
- baggagePrefix = "X-B3-Baggage-",
- baggageValueEncoder = urlEncode,
- baggageValueDecoder = urlDecode
- )
+ trait Format[C]
+ object Format {
+ case object TextMap extends Format[TextMap]
+ case object HttpHeaders extends Format[TextMap]
+ case object Binary extends Format[ByteBuffer]
+ }
+
+// val ExtendedB3: SpanContextCodec[TextMap] = new TextMapSpanCodec(
+// traceIDKey = "X-B3-TraceId",
+// parentIDKey = "X-B3-ParentSpanId",
+// spanIDKey = "X-B3-SpanId",
+// sampledKey = "X-B3-Sampled",
+// baggageKey = "X-Kamon-Baggage-",
+// baggageValueEncoder = urlEncode,
+// baggageValueDecoder = urlDecode
+// )
private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8")
private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8")
- private class TextMapSpanCodec(traceIDKey: String, parentIDKey: String, spanIDKey: String, sampledKey: String, baggagePrefix: String,
- baggageValueEncoder: String => String, baggageValueDecoder: String => String) extends SpanContextCodec[TextMap] {
+ private class ExtendedB3(identityProvider: IdentityProvider) extends SpanContextCodec[TextMap] {
+ import ExtendedB3.Headers
+
override def inject(spanContext: SpanContext, carrier: TextMap): Unit = {
- carrier.put(traceIDKey, encodeLong(spanContext.traceID))
- carrier.put(parentIDKey, encodeLong(spanContext.parentID))
- carrier.put(spanIDKey, encodeLong(spanContext.spanID))
+ carrier.put(Headers.TraceIdentifier, baggageValueEncoder(spanContext.traceID.string))
+ carrier.put(parentIDKey, baggageValueEncoder(spanContext.parentID.string))
+ carrier.put(spanIDKey, baggageValueEncoder(spanContext.spanID.string))
- spanContext.baggageItems().iterator().asScala.foreach { entry =>
- carrier.put(baggagePrefix + entry.getKey, baggageValueEncoder(entry.getValue))
+ spanContext.baggage.getAll().foreach {
+ case (key, value) => carrier.put(baggageKey + key, baggageValueEncoder(value))
}
}
- override def extract(carrier: TextMap, sampler: Sampler): SpanContext = {
- var traceID: String = null
- var parentID: String = null
- var spanID: String = null
- var sampled: String = null
- var baggage: Map[String, String] = Map.empty
-
- carrier.iterator().asScala.foreach { entry =>
- if(entry.getKey.equals(traceIDKey))
- traceID = baggageValueDecoder(entry.getValue)
- else if(entry.getKey.equals(parentIDKey))
- parentID = baggageValueDecoder(entry.getValue)
- else if(entry.getKey.equals(spanIDKey))
- spanID = baggageValueDecoder(entry.getValue)
- else if(entry.getKey.equals(sampledKey))
- sampled = entry.getValue
- else if(entry.getKey.startsWith(baggagePrefix))
- baggage = baggage + (entry.getKey.substring(baggagePrefix.length) -> baggageValueDecoder(entry.getValue))
- }
+ override def extract(carrier: TextMap): Option[SpanContext] = {
+ val traceID = carrier.get(Headers.TraceIdentifier)
+ .map(identityProvider.traceIdentifierGenerator().from)
+ .getOrElse(IdentityProvider.NoIdentifier)
+
+ val spanID = carrier.get(Headers.SpanIdentifier)
+ .map(identityProvider.spanIdentifierGenerator().from)
+ .getOrElse(IdentityProvider.NoIdentifier)
+
+ if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) {
+ val parentID = carrier.get(Headers.ParentSpanIdentifier)
+ .map(identityProvider.spanIdentifierGenerator().from)
+ .getOrElse(IdentityProvider.NoIdentifier)
+
+ val samplingDecision = carrier.get(Headers.Flags).orElse(carrier.get(Headers.Sampled)) match {
+ case Some(sampled) if sampled == "1" => SamplingDecision.Sample
+ case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample
+ case _ => SamplingDecision.Unknown
+ }
+
+
+
+
+ Some(SpanContext(traceID, spanID, parentID, samplingDecision, ???, Source.Remote))
+
+ } else None
+
+ val minimalSpanContext =
+ for {
+ traceID <- carrier.get(traceIDKey).map(identityProvider.traceIdentifierGenerator().from)
+ spanID <- carrier.get(spanIDKey).map(identityProvider.spanIdentifierGenerator().from)
+ } yield {
+
+
+ }
+
+
+
+// var traceID: String = null
+// var parentID: String = null
+// var spanID: String = null
+// var sampled: String = null
+// var baggage: Map[String, String] = Map.empty
+//
+// carrier.iterator().asScala.foreach { entry =>
+// if(entry.getKey.equals(traceIDKey))
+// traceID = baggageValueDecoder(entry.getValue)
+// else if(entry.getKey.equals(parentIDKey))
+// parentID = baggageValueDecoder(entry.getValue)
+// else if(entry.getKey.equals(spanIDKey))
+// spanID = baggageValueDecoder(entry.getValue)
+// else if(entry.getKey.equals(sampledKey))
+// sampled = entry.getValue
+// else if(entry.getKey.startsWith(baggagePrefix))
+// baggage = baggage + (entry.getKey.substring(baggagePrefix.length) -> baggageValueDecoder(entry.getValue))
+// }
+//
+// if(traceID != null && spanID != null) {
+// val actualParent = if(parentID == null) 0L else decodeLong(parentID)
+// val isSampled = if(sampled == null) sampler.decide(ThreadLocalRandom.current().nextLong()) else sampled.equals("1")
+//
+// new SpanContext(decodeLong(traceID), decodeLong(spanID), actualParent, isSampled, baggage)
+// } else null
+//
+// None
+ }
- if(traceID != null && spanID != null) {
- val actualParent = if(parentID == null) 0L else decodeLong(parentID)
- val isSampled = if(sampled == null) sampler.decide(ThreadLocalRandom.current().nextLong()) else sampled.equals("1")
+ private def encodeBaggage(baggage: Baggage): String = {
+ if(baggage.getAll().nonEmpty) {
- new SpanContext(decodeLong(traceID), decodeLong(spanID), actualParent, isSampled, baggage)
- } else null
+ baggage.getAll().foreach {
+ case (key, value) =>
+ }
+ } else ""
}
private def decodeLong(input: String): Long =
@@ -102,4 +157,15 @@ object SpanContextCodec {
HexCodec.toLowerHex(input)
}
+
+ object ExtendedB3 {
+ object Headers {
+ val TraceIdentifier = "X-B3-TraceId"
+ val ParentSpanIdentifier = "X-B3-ParentSpanId"
+ val SpanIdentifier = "X-B3-SpanId"
+ val Sampled = "X-B3-Sampled"
+ val Flags = "X-B3-Flags"
+ val Baggage = "X-B3-Extra-Baggage"
+ }
+ }
}
diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
index 19067f5e..1aec8d7c 100644
--- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
@@ -16,148 +16,173 @@
package kamon.trace
-import java.util.concurrent.ThreadLocalRandom
+import java.nio.ByteBuffer
import com.typesafe.config.Config
-import io.opentracing.propagation.{Format, TextMap}
-import io.opentracing.propagation.Format.Builtin.{BINARY, HTTP_HEADERS, TEXT_MAP}
-import io.opentracing.util.ThreadLocalActiveSpanSource
import kamon.ReporterRegistryImpl
import kamon.metric.MetricLookup
+import kamon.trace.SpanContext.{SamplingDecision, Source}
+import kamon.trace.Tracer.SpanBuilder
import kamon.util.Clock
import org.slf4j.LoggerFactory
-class Tracer(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config)
- extends ThreadLocalActiveSpanSource with io.opentracing.Tracer {
+trait Tracer {
+ def buildSpan(operationName: String): SpanBuilder
+ def activeSpan(): ActiveSpan
+ def makeActive(span: Span): ActiveSpan
- private val logger = LoggerFactory.getLogger(classOf[Tracer])
- private val tracerMetrics = new TracerMetrics(metrics)
+ def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext]
+ def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): Unit
- @volatile private var configuredSampler: Sampler = Sampler.never
- @volatile private var textMapSpanContextCodec = SpanContextCodec.TextMap
- @volatile private var httpHeaderSpanContextCodec = SpanContextCodec.ZipkinB3
- reconfigure(initialConfig)
- override def buildSpan(operationName: String): io.opentracing.Tracer.SpanBuilder =
- new SpanBuilder(operationName)
+ //
+ // Configuration Utilities
+ //
- override def extract[C](format: Format[C], carrier: C): io.opentracing.SpanContext = format match {
- case HTTP_HEADERS => httpHeaderSpanContextCodec.extract(carrier.asInstanceOf[TextMap], configuredSampler)
- case TEXT_MAP => textMapSpanContextCodec.extract(carrier.asInstanceOf[TextMap], configuredSampler)
- case BINARY => null // TODO: Implement Binary Encoding
- case _ => null
- }
+ def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit
+ def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit
+}
- override def inject[C](spanContext: io.opentracing.SpanContext, format: Format[C], carrier: C): Unit = format match {
- case HTTP_HEADERS => httpHeaderSpanContextCodec.inject(spanContext.asInstanceOf[SpanContext], carrier.asInstanceOf[TextMap])
- case TEXT_MAP => textMapSpanContextCodec.inject(spanContext.asInstanceOf[SpanContext], carrier.asInstanceOf[TextMap])
- case BINARY =>
- case _ =>
- }
+object Tracer {
- def sampler: Sampler =
- configuredSampler
+ final class Default(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config) extends Tracer {
+ private val logger = LoggerFactory.getLogger(classOf[Tracer])
+ private val emptySpan = Span.Empty(this)
+ private val activeSpanStorage: ThreadLocal[ActiveSpan] = new ThreadLocal[ActiveSpan] {
+ override def initialValue(): ActiveSpan = ActiveSpan.Default(emptySpan, null, activeSpanStorage)
+ }
- def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit =
- this.textMapSpanContextCodec = codec
+ private[Tracer] val tracerMetrics = new TracerMetrics(metrics)
+ @volatile private[Tracer] var joinRemoteSpansWithSameID: Boolean = false
+ @volatile private[Tracer] var configuredSampler: Sampler = Sampler.never
+ @volatile private[Tracer] var idGenerator: IdentifierGenerator = IdentifierGenerator.RandomLong()
+ @volatile private[Tracer] var textMapSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.TextMap
+ @volatile private[Tracer] var httpHeaderSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.ZipkinB3
- def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit =
- this.httpHeaderSpanContextCodec = codec
+ reconfigure(initialConfig)
- private class SpanBuilder(operationName: String) extends io.opentracing.Tracer.SpanBuilder {
- private var parentContext: SpanContext = _
- private var startTimestamp = 0L
- private var initialTags = Map.empty[String, String]
- private var useActiveSpanAsParent = true
+ def buildSpan(operationName: String): SpanBuilder =
+ new SpanBuilder(operationName, this, reporterRegistry)
+
+ def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext] = format match {
+ case SpanContextCodec.Format.HttpHeaders => httpHeaderSpanContextCodec.extract(carrier.asInstanceOf[TextMap])
+ case SpanContextCodec.Format.TextMap => textMapSpanContextCodec.extract(carrier.asInstanceOf[TextMap])
+ case SpanContextCodec.Format.Binary => None
+ case _ => None
+ }
- override def asChildOf(parent: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = parent match {
- case spanContext: kamon.trace.SpanContext =>
- this.parentContext = spanContext
- this
- case null => this
- case _ => logger.error("Can't extract the parent ID from a non-Kamon SpanContext"); this
+ def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): Unit = format match {
+ case SpanContextCodec.Format.HttpHeaders => httpHeaderSpanContextCodec.inject(spanContext, carrier.asInstanceOf[TextMap])
+ case SpanContextCodec.Format.TextMap => textMapSpanContextCodec.inject(spanContext, carrier.asInstanceOf[TextMap])
+ case SpanContextCodec.Format.Binary =>
+ case _ =>
}
- override def asChildOf(parent: io.opentracing.BaseSpan[_]): io.opentracing.Tracer.SpanBuilder =
- asChildOf(parent.context())
+ def activeSpan(): ActiveSpan =
+ activeSpanStorage.get()
- override def addReference(referenceType: String, referencedContext: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = {
- if(referenceType != null && referenceType.equals(io.opentracing.References.CHILD_OF)) {
- asChildOf(referencedContext)
- } else this
+ def makeActive(span: Span): ActiveSpan = {
+ val currentlyActiveSpan = activeSpanStorage.get()
+ val newActiveSpan = ActiveSpan.Default(span, currentlyActiveSpan, activeSpanStorage)
+ activeSpanStorage.set(newActiveSpan)
+ newActiveSpan
}
- override def withTag(key: String, value: String): io.opentracing.Tracer.SpanBuilder = {
- this.initialTags = this.initialTags + (key -> value)
- this
+ def sampler: Sampler =
+ configuredSampler
+
+ def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit =
+ this.textMapSpanContextCodec = codec
+
+ def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit =
+ this.httpHeaderSpanContextCodec = codec
+
+
+ private[kamon] def reconfigure(config: Config): Unit = synchronized {
+ val traceConfig = config.getConfig("kamon.trace")
+
+ configuredSampler = traceConfig.getString("sampler") match {
+ case "always" => Sampler.always
+ case "never" => Sampler.never
+ case "random" => Sampler.random(traceConfig.getDouble("sampler-random.chance"))
+ case other => sys.error(s"Unexpected sampler name $other.")
+ }
+ }
+
+ private final class TracerMetrics(metricLookup: MetricLookup) {
+ val createdSpans = metricLookup.counter("tracer.spans-created")
}
+ }
+
+ final class SpanBuilder(operationName: String, tracer: Tracer.Default, reporterRegistry: ReporterRegistryImpl) {
+ private var parentContext: SpanContext = _
+ private var startTimestamp = 0L
+ private var initialTags = Map.empty[String, String]
+ private var useActiveSpanAsParent = true
- override def withTag(key: String, value: Boolean): io.opentracing.Tracer.SpanBuilder = {
- this.initialTags = this.initialTags + (key -> value.toString)
+ def asChildOf(parentContext: SpanContext): SpanBuilder = {
+ this.parentContext = parentContext
this
}
- override def withTag(key: String, value: Number): io.opentracing.Tracer.SpanBuilder = {
- this.initialTags = this.initialTags + (key -> value.toString)
+ def asChildOf(parentSpan: Span): SpanBuilder =
+ asChildOf(parentSpan.context())
+
+ def withSpanTag(key: String, value: String): SpanBuilder = {
+ this.initialTags = this.initialTags + (key -> value)
this
}
- override def withStartTimestamp(microseconds: Long): io.opentracing.Tracer.SpanBuilder = {
+ def withStartTimestamp(microseconds: Long): SpanBuilder = {
this.startTimestamp = microseconds
this
}
- override def ignoreActiveSpan(): io.opentracing.Tracer.SpanBuilder = {
+ def ignoreActiveSpan(): SpanBuilder = {
this.useActiveSpanAsParent = false
this
}
- override def start(): io.opentracing.Span =
- startManual()
-
- override def startActive(): io.opentracing.ActiveSpan =
- makeActive(startManual())
-
- override def startManual(): Span = {
+ def start(): Span = {
val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp()
- if(parentContext == null && useActiveSpanAsParent) {
- val possibleParent = activeSpan()
- if(possibleParent != null)
- parentContext = possibleParent.context().asInstanceOf[SpanContext]
- }
+ val parentSpanContext: Option[SpanContext] = Option(parentContext)
+ .orElse(if(useActiveSpanAsParent) Some(tracer.activeSpan().context()) else None)
+ .filter(spanContext => spanContext != SpanContext.EmptySpanContext)
- val spanContext =
- if(parentContext != null)
- new SpanContext(parentContext.traceID, createID(), parentContext.spanID, parentContext.sampled, parentContext.baggageMap)
- else {
- val traceID = createID()
- new SpanContext(traceID, traceID, 0L, configuredSampler.decide(traceID), Map.empty)
- }
+ val samplingDecision: SamplingDecision = parentSpanContext
+ .map(_.samplingDecision)
+ .filter(_ != SamplingDecision.Unknown)
+ .getOrElse(tracer.sampler.decide(operationName, initialTags))
- tracerMetrics.createdSpans.increment()
- new Span(spanContext, operationName, initialTags, startTimestampMicros, reporterRegistry)
- }
-
- private def createID(): Long =
- ThreadLocalRandom.current().nextLong()
- }
-
-
- private[kamon] def reconfigure(config: Config): Unit = synchronized {
- val traceConfig = config.getConfig("kamon.trace")
+ val spanContext = parentSpanContext match {
+ case Some(parent) => joinParentContext(parent, samplingDecision)
+ case None => newSpanContext(samplingDecision)
+ }
- configuredSampler = traceConfig.getString("sampler") match {
- case "always" => Sampler.always
- case "never" => Sampler.never
- case "random" => Sampler.random(traceConfig.getDouble("sampler-random.chance"))
- case other => sys.error(s"Unexpected sampler name $other.")
+ tracer.tracerMetrics.createdSpans.increment()
+ Span.Real(spanContext, operationName, initialTags, startTimestampMicros, reporterRegistry, tracer)
}
- }
- private final class TracerMetrics(metricLookup: MetricLookup) {
- val createdSpans = metricLookup.counter("tracer.spans-created")
+ private def joinParentContext(parent: SpanContext, samplingDecision: SamplingDecision): SpanContext =
+ if(parent.source == Source.Remote && tracer.joinRemoteSpansWithSameID)
+ parent.copy(samplingDecision = samplingDecision)
+ else
+ parent.createChild(tracer.idGenerator.generateSpanID(), samplingDecision)
+
+ private def newSpanContext(samplingDecision: SamplingDecision): SpanContext =
+ SpanContext(
+ traceID = tracer.idGenerator.generateTraceID(),
+ spanID = tracer.idGenerator.generateSpanID(),
+ parentID = tracer.idGenerator.generateEmptyID(),
+ samplingDecision = samplingDecision,
+ baggage = SpanContext.Baggage(),
+ source = Source.Local
+ )
+
+ def startActive(): ActiveSpan =
+ tracer.makeActive(start())
}
}
diff --git a/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala b/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala
index 885a73d9..83027cc5 100644
--- a/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala
+++ b/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala
@@ -22,51 +22,51 @@ import kamon.Kamon
import org.slf4j.MDC
import scala.collection.JavaConverters._
-
-object BaggageOnMDC {
-
- /**
- * Copy all baggage keys into SLF4J's MDC, evaluates the provided piece of code and removes the baggage keys
- * afterwards, only when there is a currently active span. Optionally adds the Trace ID as well.
- *
- */
- def withBaggageOnMDC[T](includeTraceID: Boolean, code: => T): T = {
- val activeSpan = Kamon.activeSpan()
- if(activeSpan == null)
- code
- else {
- val baggageItems = activeSpan.context().baggageItems().asScala
- baggageItems.foreach(entry => MDC.put(entry.getKey, entry.getValue))
- if(includeTraceID)
- addTraceIDToMDC(activeSpan.context())
-
- val evaluatedCode = code
-
- baggageItems.foreach(entry => MDC.remove(entry.getKey))
- if(includeTraceID)
- removeTraceIDFromMDC()
-
- evaluatedCode
-
- }
- }
-
- def withBaggageOnMDC[T](code: Supplier[T]): T =
- withBaggageOnMDC(true, code.get())
-
- def withBaggageOnMDC[T](includeTraceID: Boolean, code: Supplier[T]): T =
- withBaggageOnMDC(includeTraceID, code.get())
-
- def withBaggageOnMDC[T](code: => T): T =
- withBaggageOnMDC(true, code)
-
- private val TraceIDKey = "trace_id"
-
- private def addTraceIDToMDC(context: io.opentracing.SpanContext): Unit = context match {
- case ctx: KamonSpanContext => MDC.put(TraceIDKey, HexCodec.toLowerHex(ctx.traceID))
- case _ =>
- }
-
- private def removeTraceIDFromMDC(): Unit =
- MDC.remove(TraceIDKey)
-}
+//
+//object BaggageOnMDC {
+//
+// /**
+// * Copy all baggage keys into SLF4J's MDC, evaluates the provided piece of code and removes the baggage keys
+// * afterwards, only when there is a currently active span. Optionally adds the Trace ID as well.
+// *
+// */
+// def withBaggageOnMDC[T](includeTraceID: Boolean, code: => T): T = {
+// val activeSpan = Kamon.activeSpan()
+// if(activeSpan == null)
+// code
+// else {
+// val baggageItems = activeSpan.context().baggageItems().asScala
+// baggageItems.foreach(entry => MDC.put(entry.getKey, entry.getValue))
+// if(includeTraceID)
+// addTraceIDToMDC(activeSpan.context())
+//
+// val evaluatedCode = code
+//
+// baggageItems.foreach(entry => MDC.remove(entry.getKey))
+// if(includeTraceID)
+// removeTraceIDFromMDC()
+//
+// evaluatedCode
+//
+// }
+// }
+//
+// def withBaggageOnMDC[T](code: Supplier[T]): T =
+// withBaggageOnMDC(true, code.get())
+//
+// def withBaggageOnMDC[T](includeTraceID: Boolean, code: Supplier[T]): T =
+// withBaggageOnMDC(includeTraceID, code.get())
+//
+// def withBaggageOnMDC[T](code: => T): T =
+// withBaggageOnMDC(true, code)
+//
+// private val TraceIDKey = "trace_id"
+//
+// private def addTraceIDToMDC(context: io.opentracing.SpanContext): Unit = context match {
+// case ctx: KamonSpanContext => MDC.put(TraceIDKey, HexCodec.toLowerHex(ctx.traceID))
+// case _ =>
+// }
+//
+// private def removeTraceIDFromMDC(): Unit =
+// MDC.remove(TraceIDKey)
+//}
diff --git a/kamon-core/src/main/scala/kamon/util/Mixin.scala b/kamon-core/src/main/scala/kamon/util/Mixin.scala
index 348b34f1..318679c1 100644
--- a/kamon-core/src/main/scala/kamon/util/Mixin.scala
+++ b/kamon-core/src/main/scala/kamon/util/Mixin.scala
@@ -16,8 +16,8 @@
package kamon
package util
-import io.opentracing.ActiveSpan
-import io.opentracing.ActiveSpan.Continuation
+import kamon.trace.{ActiveSpan, Continuation}
+
/**
* Utility trait that marks objects carrying an ActiveSpan.Continuation.