aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/trace
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-08-15 00:06:26 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-08-15 00:06:26 +0200
commit6721d325d018756296213ac8f9129bc304a21afb (patch)
treee08a5ce92802f521be228beae0ddb4ef258d0066 /kamon-core/src/main/scala/kamon/trace
parentac3b72e27765ceec4cc3958b0fa7eaba0299f017 (diff)
parenta949c875684d78818224cd2ca7aaf79aa7878724 (diff)
downloadKamon-6721d325d018756296213ac8f9129bc304a21afb.tar.gz
Kamon-6721d325d018756296213ac8f9129bc304a21afb.tar.bz2
Kamon-6721d325d018756296213ac8f9129bc304a21afb.zip
Merge remote-tracking branch 'ivantopo/wip/moving-ot-support-to-a-separeate-project' into kamon-1.0-develop
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace')
-rw-r--r--kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala106
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Sampler.scala35
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Span.scala290
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanCodec.scala99
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanContext.scala57
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala105
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Tracer.scala196
7 files changed, 535 insertions, 353 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala b/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala
new file mode 100644
index 00000000..937200f5
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala
@@ -0,0 +1,106 @@
+package kamon.trace
+
+import java.nio.ByteBuffer
+import java.util.concurrent.ThreadLocalRandom
+
+import kamon.util.HexCodec
+
+import scala.util.Try
+
+trait IdentityProvider {
+ def traceIdGenerator(): IdentityProvider.Generator
+ def spanIdGenerator(): IdentityProvider.Generator
+}
+
+object IdentityProvider {
+ case class Identifier(string: String, bytes: Array[Byte]) {
+
+ override def equals(obj: Any): Boolean = {
+ if(obj != null && obj.isInstanceOf[Identifier])
+ obj.asInstanceOf[Identifier].string == string
+ else false
+ }
+ }
+
+ val NoIdentifier = Identifier("", new Array[Byte](0))
+
+ trait Generator {
+ def generate(): Identifier
+ def from(string: String): Identifier
+ def from(bytes: Array[Byte]): Identifier
+ }
+
+
+ class Default extends IdentityProvider {
+ protected val longGenerator = new Generator {
+ override def generate(): Identifier = {
+ val data = ByteBuffer.wrap(new Array[Byte](8))
+ val random = ThreadLocalRandom.current().nextLong()
+ data.putLong(random)
+
+ Identifier(HexCodec.toLowerHex(random), data.array())
+ }
+
+ override def from(string: String): Identifier = Try {
+ val identifierLong = HexCodec.lowerHexToUnsignedLong(string)
+ val data = ByteBuffer.allocate(8)
+ data.putLong(identifierLong)
+
+ Identifier(string, data.array())
+ } getOrElse(IdentityProvider.NoIdentifier)
+
+ override def from(bytes: Array[Byte]): Identifier = Try {
+ val buffer = ByteBuffer.wrap(bytes)
+ val identifierLong = buffer.getLong
+
+ Identifier(HexCodec.toLowerHex(identifierLong), bytes)
+ } getOrElse(IdentityProvider.NoIdentifier)
+ }
+
+ override def traceIdGenerator(): Generator = longGenerator
+ override def spanIdGenerator(): Generator = longGenerator
+ }
+
+ object Default {
+ def apply(): Default = new Default()
+ }
+
+
+ class DoubleSizeTraceID extends Default {
+ private val doubleLongGenerator = new Generator {
+ override def generate(): Identifier = {
+ val data = ByteBuffer.wrap(new Array[Byte](16))
+ val highLong = ThreadLocalRandom.current().nextLong()
+ val lowLong = ThreadLocalRandom.current().nextLong()
+ data.putLong(highLong)
+ data.putLong(lowLong)
+
+ Identifier(HexCodec.toLowerHex(highLong) + HexCodec.toLowerHex(lowLong), data.array())
+ }
+
+ override def from(string: String): Identifier = Try {
+ val highPart = HexCodec.lowerHexToUnsignedLong(string.substring(0, 16))
+ val lowPart = HexCodec.lowerHexToUnsignedLong(string.substring(16, 32))
+ val data = ByteBuffer.allocate(16)
+ data.putLong(highPart)
+ data.putLong(lowPart)
+
+ Identifier(string, data.array())
+ } getOrElse(IdentityProvider.NoIdentifier)
+
+ override def from(bytes: Array[Byte]): Identifier = Try {
+ val buffer = ByteBuffer.wrap(bytes)
+ val highLong = buffer.getLong
+ val lowLong = buffer.getLong
+
+ Identifier(HexCodec.toLowerHex(highLong) + HexCodec.toLowerHex(lowLong), bytes)
+ } getOrElse(IdentityProvider.NoIdentifier)
+ }
+
+ override def traceIdGenerator(): Generator = doubleLongGenerator
+ }
+
+ object DoubleSizeTraceID {
+ def apply(): DoubleSizeTraceID = new DoubleSizeTraceID()
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala
index 0347a151..3f366175 100644
--- a/kamon-core/src/main/scala/kamon/trace/Sampler.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala
@@ -15,39 +15,44 @@
package kamon.trace
+import java.util.concurrent.ThreadLocalRandom
+import kamon.trace.SpanContext.SamplingDecision
+
trait Sampler {
- def decide(spanID: Long): Boolean
+ def decide(operationName: String, builderTags: Map[String, Span.TagValue]): SamplingDecision
}
object Sampler {
- val always = new Constant(true)
- val never = new Constant(false)
+ val Always = new Constant(SamplingDecision.Sample)
+ val Never = new Constant(SamplingDecision.DoNotSample)
- def random(chance: Double): Sampler = {
- assert(chance >= 0D && chance <= 1.0D, "Change should be >= 0 and <= 1.0")
+ def random(probability: Double): Sampler = {
+ assert(probability >= 0D && probability <= 1.0D, "The probability should be >= 0 and <= 1.0")
- chance match {
- case 0D => never
- case 1.0D => always
+ probability match {
+ case 0D => Never
+ case 1.0D => Always
case anyOther => new Random(anyOther)
}
}
- class Constant(decision: Boolean) extends Sampler {
- override def decide(spanID: Long): Boolean = decision
+ class Constant(decision: SamplingDecision) extends Sampler {
+ override def decide(operationName: String, builderTags: Map[String, Span.TagValue]): SamplingDecision = decision
override def toString: String =
s"Sampler.Constant(decision = $decision)"
}
- class Random(chance: Double) extends Sampler {
- val upperBoundary = Long.MaxValue * chance
+ class Random(probability: Double) extends Sampler {
+ val upperBoundary = Long.MaxValue * probability
val lowerBoundary = -upperBoundary
- override def decide(spanID: Long): Boolean =
- spanID >= lowerBoundary && spanID <= upperBoundary
+ override def decide(operationName: String, builderTags: Map[String, Span.TagValue]): SamplingDecision = {
+ val random = ThreadLocalRandom.current().nextLong()
+ if(random >= lowerBoundary && random <= upperBoundary) SamplingDecision.Sample else SamplingDecision.DoNotSample
+ }
override def toString: String =
- s"Sampler.Random(chance = $chance)"
+ s"Sampler.Random(probability = $probability)"
}
}
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala
index 464559e3..a4424a45 100644
--- a/kamon-core/src/main/scala/kamon/trace/Span.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Span.scala
@@ -16,176 +16,220 @@
package kamon
package trace
-
-import scala.collection.JavaConverters._
+import kamon.ReporterRegistry.SpanSink
+import kamon.context.Key
+import kamon.trace.SpanContext.SamplingDecision
import kamon.util.{Clock, MeasurementUnit}
-class Span(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String], startTimestampMicros: Long,
- reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span {
- private var isOpen: Boolean = true
- private val sampled: Boolean = spanContext.sampled
- private var operationName: String = initialOperationName
- private var endTimestampMicros: Long = 0
+trait Span {
- private var tags = initialTags
- private var logs = List.empty[Span.LogEntry]
- private var additionalMetricTags = Map.empty[String, String]
+ def isEmpty(): Boolean
+ def isLocal(): Boolean
+ def nonEmpty(): Boolean = !isEmpty()
+ def isRemote(): Boolean = !isLocal()
- override def log(fields: java.util.Map[String, _]): Span =
- log(fields.asScala.asInstanceOf[Map[String, _]])
+ def context(): SpanContext
- def log(fields: Map[String, _]): Span = synchronized {
- if (sampled && isOpen)
- logs = Span.LogEntry(Clock.microTimestamp(), fields) :: logs
- this
- }
+ def annotate(annotation: Span.Annotation): Span
- def log(timestampMicroseconds: Long, fields: Map[String, _]): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(timestampMicroseconds, fields) :: logs
- this
- }
+ def addSpanTag(key: String, value: String): Span
- override def log(timestampMicroseconds: Long, fields: java.util.Map[String, _]): Span =
- log(timestampMicroseconds, fields.asScala.asInstanceOf[Map[String, _]])
+ def addSpanTag(key: String, value: Long): Span
- override def log(event: String): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(Clock.microTimestamp(), Map("event" -> event)) :: logs
- this
- }
+ def addSpanTag(key: String, value: Boolean): Span
- override def log(timestampMicroseconds: Long, event: String): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(timestampMicroseconds, Map("event" -> event)) :: logs
- this
- }
+ def addMetricTag(key: String, value: String): Span
- override def log(eventName: String, payload: scala.Any): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(Clock.microTimestamp(), Map(eventName -> payload)) :: logs
- this
- }
+ def setOperationName(name: String): Span
+
+ def disableMetricsCollection(): Span
+
+ def finish(finishTimestampMicros: Long): Unit
+
+ def finish(): Unit =
+ finish(Clock.microTimestamp())
+
+ def annotate(name: String): Span =
+ annotate(Span.Annotation(Clock.microTimestamp(), name, Map.empty))
- override def log(timestampMicroseconds: Long, eventName: String, payload: scala.Any): Span = synchronized {
- if(sampled && isOpen)
- logs = Span.LogEntry(timestampMicroseconds, Map(eventName -> payload)) :: logs
- this
+ def annotate(name: String, fields: Map[String, String]): Span =
+ annotate(Span.Annotation(Clock.microTimestamp(), name, fields))
+
+ def annotate(timestampMicroseconds: Long, name: String, fields: Map[String, String]): Span =
+ annotate(Span.Annotation(timestampMicroseconds, name, fields))
+
+}
+
+object Span {
+
+ val ContextKey = Key.broadcast[Span]("span", Span.Empty)
+
+ object Empty extends Span {
+ override val context: SpanContext = SpanContext.EmptySpanContext
+ override def isEmpty(): Boolean = true
+ override def isLocal(): Boolean = true
+ override def annotate(annotation: Annotation): Span = this
+ override def addSpanTag(key: String, value: String): Span = this
+ override def addSpanTag(key: String, value: Long): Span = this
+ override def addSpanTag(key: String, value: Boolean): Span = this
+ override def addMetricTag(key: String, value: String): Span = this
+ override def setOperationName(name: String): Span = this
+ override def disableMetricsCollection(): Span = this
+ override def finish(finishTimestampMicros: Long): Unit = {}
}
- override def getBaggageItem(key: String): String =
- spanContext.getBaggage(key)
+ /**
+ *
+ * @param spanContext
+ * @param initialOperationName
+ * @param initialSpanTags
+ * @param startTimestampMicros
+ * @param spanSink
+ */
+ final class Local(spanContext: SpanContext, initialOperationName: String, initialSpanTags: Map[String, Span.TagValue],
+ initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink) extends Span {
+
+ private var collectMetrics: Boolean = true
+ private var open: Boolean = true
+ private val sampled: Boolean = spanContext.samplingDecision == SamplingDecision.Sample
+ private var operationName: String = initialOperationName
+
+ private var spanTags: Map[String, Span.TagValue] = initialSpanTags
+ private var customMetricTags = initialMetricTags
+ private var annotations = List.empty[Span.Annotation]
+
+ override def isEmpty(): Boolean = false
+ override def isLocal(): Boolean = true
+
+ def annotate(annotation: Annotation): Span = synchronized {
+ if(sampled && open)
+ annotations = annotation :: annotations
+ this
+ }
- override def context(): SpanContext =
- spanContext
+ override def addSpanTag(key: String, value: String): Span = synchronized {
+ if(sampled && open)
+ spanTags = spanTags + (key -> TagValue.String(value))
+ this
+ }
- override def setTag(key: String, value: String): Span = synchronized {
- if (isOpen) {
- extractMetricTag(key, value)
- if(sampled)
- tags = tags ++ Map(key -> value)
+ override def addSpanTag(key: String, value: Long): Span = synchronized {
+ if(sampled && open)
+ spanTags = spanTags + (key -> TagValue.Number(value))
+ this
}
- this
- }
- override def setTag(key: String, value: Boolean): Span = {
- if (isOpen) {
- val tagValue = if(value) Span.BooleanTagTrueValue else Span.BooleanTagFalseValue
- extractMetricTag(key, tagValue)
- if(sampled)
- tags = tags + (key -> tagValue)
+ override def addSpanTag(key: String, value: Boolean): Span = synchronized {
+ if(sampled && open) {
+ val tagValue = if (value) TagValue.True else TagValue.False
+ spanTags = spanTags + (key -> tagValue)
+ }
+ this
}
- this
- }
- override def setTag(key: String, value: Number): Span = {
- if (isOpen) {
- val tagValue = String.valueOf(value)
- extractMetricTag(key, tagValue)
- if(sampled)
- tags = tags + (key -> tagValue)
+ override def addMetricTag(key: String, value: String): Span = synchronized {
+ if(sampled && open && collectMetrics)
+ customMetricTags = customMetricTags + (key -> value)
+ this
}
- this
- }
- def setMetricTag(key: String, value: String): Span = synchronized {
- if (isOpen)
- additionalMetricTags = additionalMetricTags ++ Map(key -> value)
- this
- }
+ override def disableMetricsCollection(): Span = synchronized {
+ collectMetrics = false
+ this
+ }
- override def setBaggageItem(key: String, value: String): Span = synchronized {
- if (isOpen)
- spanContext.addBaggageItem(key, value)
- this
- }
+ override def context(): SpanContext =
+ spanContext
- override def setOperationName(operationName: String): Span = synchronized {
- if(isOpen)
- this.operationName = operationName
- this
- }
+ override def setOperationName(operationName: String): Span = synchronized {
+ if(open)
+ this.operationName = operationName
+ this
+ }
- private def extractMetricTag(tag: String, value: String): Unit =
- if(tag.startsWith(Span.MetricTagPrefix))
- additionalMetricTags = additionalMetricTags ++ Map(tag.substring(Span.MetricTagPrefix.length) -> value)
+ override def finish(finishMicros: Long): Unit = synchronized {
+ if (open) {
+ open = false
- override def finish(): Unit =
- finish(Clock.microTimestamp())
+ if(collectMetrics)
+ recordSpanMetrics(finishMicros)
+
+ if(sampled)
+ spanSink.reportSpan(toFinishedSpan(finishMicros))
+ }
+ }
+
+ private def toFinishedSpan(endTimestampMicros: Long): Span.FinishedSpan =
+ Span.FinishedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, spanTags, annotations)
+
+ private def recordSpanMetrics(endTimestampMicros: Long): Unit = {
+ val elapsedTime = endTimestampMicros - startTimestampMicros
+ val metricTags = Map("operation" -> operationName) ++ customMetricTags
- override def finish(finishMicros: Long): Unit = synchronized {
- if (isOpen) {
- isOpen = false
- endTimestampMicros = finishMicros
- recordSpanMetrics()
+ val isError = spanTags.get("error").exists {
+ errorTag => errorTag != null && errorTag.equals(Span.TagValue.True)
+ }
- if(sampled)
- reporterRegistry.reportSpan(completedSpan)
+ val refinedMetricTags = if(isError)
+ metricTags + ("error" -> "true")
+ else
+ metricTags
+
+ val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(refinedMetricTags)
+ latencyHistogram.record(elapsedTime)
}
}
- private def completedSpan: Span.CompletedSpan =
- Span.CompletedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, tags, logs)
+ object Local {
+ def apply(spanContext: SpanContext, initialOperationName: String, initialSpanTags: Map[String, Span.TagValue],
+ initialMetricTags: Map[String, String], startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl): Local =
+ new Local(spanContext, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry)
+ }
- private def recordSpanMetrics(): Unit = {
- val elapsedTime = endTimestampMicros - startTimestampMicros
- val metricTags = Map("operation" -> operationName) ++ additionalMetricTags
- val isError = tags.get("error").exists {
- errorTag => errorTag != null && errorTag.equals(Span.BooleanTagTrueValue)
- }
+ final class Remote(val context: SpanContext) extends Span {
+ override def isEmpty(): Boolean = false
+ override def isLocal(): Boolean = false
+ override def annotate(annotation: Annotation): Span = this
+ override def addSpanTag(key: String, value: String): Span = this
+ override def addSpanTag(key: String, value: Long): Span = this
+ override def addSpanTag(key: String, value: Boolean): Span = this
+ override def addMetricTag(key: String, value: String): Span = this
+ override def setOperationName(name: String): Span = this
+ override def disableMetricsCollection(): Span = this
+ override def finish(finishTimestampMicros: Long): Unit = {}
+ }
- val refinedTags = if(isError) {
- metricTags + ("error" -> Span.BooleanTagTrueValue)
- } else {
- metricTags
- }
+ object Remote {
+ def apply(spanContext: SpanContext): Remote =
+ new Remote(spanContext)
+ }
+
+ sealed trait TagValue
+ object TagValue {
+ sealed trait Boolean extends TagValue
+ case object True extends Boolean
+ case object False extends Boolean
- val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(refinedTags)
- latencyHistogram.record(elapsedTime)
+ case class String(string: java.lang.String) extends TagValue
+ case class Number(number: Long) extends TagValue
}
-}
-object Span {
object Metrics {
val SpanProcessingTimeMetric = Kamon.histogram("span.processing-time", MeasurementUnit.time.microseconds)
val SpanErrorCount = Kamon.counter("span.error-count")
}
- val MetricTagPrefix = "metric."
- val BooleanTagTrueValue = "1"
- val BooleanTagFalseValue = "0"
-
- case class LogEntry(timestamp: Long, fields: Map[String, _])
+ case class Annotation(timestampMicros: Long, name: String, fields: Map[String, String])
- case class CompletedSpan(
+ case class FinishedSpan(
context: SpanContext,
operationName: String,
startTimestampMicros: Long,
endTimestampMicros: Long,
- tags: Map[String, String],
- logs: Seq[LogEntry]
+ tags: Map[String, Span.TagValue],
+ annotations: Seq[Annotation]
)
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
new file mode 100644
index 00000000..e04ceb03
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
@@ -0,0 +1,99 @@
+/* =========================================================================================
+ * Copyright © 2013-2017 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.trace
+
+import java.net.{URLDecoder, URLEncoder}
+
+import kamon.Kamon
+import kamon.context.{Codec, Context, TextMap}
+import kamon.trace.SpanContext.SamplingDecision
+
+
+object SpanCodec {
+
+ class B3 extends Codec.ForEntry[TextMap] {
+ import B3.Headers
+
+ override def encode(context: Context): TextMap = {
+ val span = context.get(Span.ContextKey)
+ val carrier = TextMap.Default()
+
+ if(span.nonEmpty()) {
+ val spanContext = span.context
+ carrier.put(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string))
+ carrier.put(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string))
+ carrier.put(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string))
+
+ encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision =>
+ carrier.put(Headers.Sampled, samplingDecision)
+ }
+ }
+
+ carrier
+ }
+
+ override def decode(carrier: TextMap, context: Context): Context = {
+ val identityProvider = Kamon.tracer.identityProvider
+ val traceID = carrier.get(Headers.TraceIdentifier)
+ .map(id => identityProvider.traceIdGenerator().from(urlDecode(id)))
+ .getOrElse(IdentityProvider.NoIdentifier)
+
+ val spanID = carrier.get(Headers.SpanIdentifier)
+ .map(id => identityProvider.spanIdGenerator().from(urlDecode(id)))
+ .getOrElse(IdentityProvider.NoIdentifier)
+
+ if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) {
+ val parentID = carrier.get(Headers.ParentSpanIdentifier)
+ .map(id => identityProvider.spanIdGenerator().from(urlDecode(id)))
+ .getOrElse(IdentityProvider.NoIdentifier)
+
+ val flags = carrier.get(Headers.Flags)
+
+ val samplingDecision = flags.orElse(carrier.get(Headers.Sampled)) match {
+ case Some(sampled) if sampled == "1" => SamplingDecision.Sample
+ case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample
+ case _ => SamplingDecision.Unknown
+ }
+
+ context.withKey(Span.ContextKey, Span.Remote(SpanContext(traceID, spanID, parentID, samplingDecision)))
+
+ } else context
+ }
+
+ private def encodeSamplingDecision(samplingDecision: SamplingDecision): Option[String] = samplingDecision match {
+ case SamplingDecision.Sample => Some("1")
+ case SamplingDecision.DoNotSample => Some("0")
+ case SamplingDecision.Unknown => None
+ }
+
+ private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8")
+ private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8")
+ }
+
+ object B3 {
+
+ def apply(): B3 =
+ new B3()
+
+ object Headers {
+ val TraceIdentifier = "X-B3-TraceId"
+ val ParentSpanIdentifier = "X-B3-ParentSpanId"
+ val SpanIdentifier = "X-B3-SpanId"
+ val Sampled = "X-B3-Sampled"
+ val Flags = "X-B3-Flags"
+ }
+ }
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala
index b37e208b..4d013881 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala
@@ -15,26 +15,51 @@
package kamon.trace
-import java.lang
-import java.util.{Map => JavaMap}
+import kamon.trace.IdentityProvider.Identifier
+import kamon.trace.SpanContext.SamplingDecision
-import scala.collection.JavaConverters._
+/**
+ *
+ * @param traceID
+ * @param spanID
+ * @param parentID
+ * @param samplingDecision
+ */
+case class SpanContext(traceID: Identifier, spanID: Identifier, parentID: Identifier, samplingDecision: SamplingDecision) {
-class SpanContext(val traceID: Long, val spanID: Long, val parentID: Long, val sampled: Boolean,
- private var baggage: Map[String, String]) extends io.opentracing.SpanContext {
+ def createChild(childSpanID: Identifier, samplingDecision: SamplingDecision): SpanContext =
+ this.copy(parentID = this.spanID, spanID = childSpanID)
+}
- private[kamon] def addBaggageItem(key: String, value: String): Unit = synchronized {
- baggage = baggage + (key -> value)
- }
+object SpanContext {
- private[kamon] def getBaggage(key: String): String = synchronized {
- baggage.get(key).getOrElse(null)
- }
+ val EmptySpanContext = SpanContext(
+ traceID = IdentityProvider.NoIdentifier,
+ spanID = IdentityProvider.NoIdentifier,
+ parentID = IdentityProvider.NoIdentifier,
+ samplingDecision = SamplingDecision.DoNotSample
+ )
+
+
+ sealed trait SamplingDecision
- private[kamon] def baggageMap: Map[String, String] =
- baggage
+ object SamplingDecision {
+
+ /**
+ * The Trace is sampled, all child Spans should be sampled as well.
+ */
+ case object Sample extends SamplingDecision
+
+ /**
+ * The Trace is not sampled, none of the child Spans should be sampled.
+ */
+ case object DoNotSample extends SamplingDecision
+
+ /**
+ * The sampling decision has not been taken yet, the Tracer is free to decide when creating a Span.
+ */
+ case object Unknown extends SamplingDecision
- override def baggageItems(): lang.Iterable[JavaMap.Entry[String, String]] = synchronized {
- baggage.asJava.entrySet()
}
-}
+
+} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala
deleted file mode 100644
index 8e3a446b..00000000
--- a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-
-package kamon.trace
-
-import java.net.{URLDecoder, URLEncoder}
-import java.util.concurrent.ThreadLocalRandom
-
-import scala.collection.JavaConverters._
-import io.opentracing.propagation.TextMap
-import kamon.util.HexCodec
-
-
-trait SpanContextCodec[T] {
- def inject(spanContext: SpanContext, carrier: T): Unit
- def extract(carrier: T, sampler: Sampler): SpanContext
-}
-
-object SpanContextCodec {
-
- val TextMap: SpanContextCodec[TextMap] = new TextMapSpanCodec(
- traceIDKey = "TRACE_ID",
- parentIDKey = "PARENT_ID",
- spanIDKey = "SPAN_ID",
- sampledKey = "SAMPLED",
- baggagePrefix = "BAGGAGE_",
- baggageValueEncoder = identity,
- baggageValueDecoder = identity
- )
-
- val ZipkinB3: SpanContextCodec[TextMap] = new TextMapSpanCodec(
- traceIDKey = "X-B3-TraceId",
- parentIDKey = "X-B3-ParentSpanId",
- spanIDKey = "X-B3-SpanId",
- sampledKey = "X-B3-Sampled",
- baggagePrefix = "X-B3-Baggage-",
- baggageValueEncoder = urlEncode,
- baggageValueDecoder = urlDecode
- )
-
- private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8")
- private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8")
-
- private class TextMapSpanCodec(traceIDKey: String, parentIDKey: String, spanIDKey: String, sampledKey: String, baggagePrefix: String,
- baggageValueEncoder: String => String, baggageValueDecoder: String => String) extends SpanContextCodec[TextMap] {
-
- override def inject(spanContext: SpanContext, carrier: TextMap): Unit = {
- carrier.put(traceIDKey, encodeLong(spanContext.traceID))
- carrier.put(parentIDKey, encodeLong(spanContext.parentID))
- carrier.put(spanIDKey, encodeLong(spanContext.spanID))
-
- spanContext.baggageItems().iterator().asScala.foreach { entry =>
- carrier.put(baggagePrefix + entry.getKey, baggageValueEncoder(entry.getValue))
- }
- }
-
- override def extract(carrier: TextMap, sampler: Sampler): SpanContext = {
- var traceID: String = null
- var parentID: String = null
- var spanID: String = null
- var sampled: String = null
- var baggage: Map[String, String] = Map.empty
-
- carrier.iterator().asScala.foreach { entry =>
- if(entry.getKey.equals(traceIDKey))
- traceID = baggageValueDecoder(entry.getValue)
- else if(entry.getKey.equals(parentIDKey))
- parentID = baggageValueDecoder(entry.getValue)
- else if(entry.getKey.equals(spanIDKey))
- spanID = baggageValueDecoder(entry.getValue)
- else if(entry.getKey.equals(sampledKey))
- sampled = entry.getValue
- else if(entry.getKey.startsWith(baggagePrefix))
- baggage = baggage + (entry.getKey.substring(baggagePrefix.length) -> baggageValueDecoder(entry.getValue))
- }
-
- if(traceID != null && spanID != null) {
- val actualParent = if(parentID == null) 0L else decodeLong(parentID)
- val isSampled = if(sampled == null) sampler.decide(ThreadLocalRandom.current().nextLong()) else sampled.equals("1")
-
- new SpanContext(decodeLong(traceID), decodeLong(spanID), actualParent, isSampled, baggage)
- } else null
- }
-
- private def decodeLong(input: String): Long =
- HexCodec.lowerHexToUnsignedLong(input)
-
- private def encodeLong(input: Long): String =
- HexCodec.toLowerHex(input)
-
- }
-}
diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
index 19067f5e..7d8830ca 100644
--- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
@@ -13,148 +13,156 @@
* =========================================================================================
*/
-
package kamon.trace
-import java.util.concurrent.ThreadLocalRandom
-
import com.typesafe.config.Config
-import io.opentracing.propagation.{Format, TextMap}
-import io.opentracing.propagation.Format.Builtin.{BINARY, HTTP_HEADERS, TEXT_MAP}
-import io.opentracing.util.ThreadLocalActiveSpanSource
-import kamon.ReporterRegistryImpl
+import kamon.{Kamon, ReporterRegistryImpl}
import kamon.metric.MetricLookup
-import kamon.util.Clock
+import kamon.trace.Span.TagValue
+import kamon.trace.SpanContext.SamplingDecision
+import kamon.trace.Tracer.SpanBuilder
+import kamon.util.{Clock, DynamicAccess}
import org.slf4j.LoggerFactory
+import scala.collection.immutable
+import scala.util.Try
-class Tracer(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config)
- extends ThreadLocalActiveSpanSource with io.opentracing.Tracer {
+trait Tracer {
+ def buildSpan(operationName: String): SpanBuilder
+ def identityProvider: IdentityProvider
+}
- private val logger = LoggerFactory.getLogger(classOf[Tracer])
- private val tracerMetrics = new TracerMetrics(metrics)
+object Tracer {
- @volatile private var configuredSampler: Sampler = Sampler.never
- @volatile private var textMapSpanContextCodec = SpanContextCodec.TextMap
- @volatile private var httpHeaderSpanContextCodec = SpanContextCodec.ZipkinB3
+ final class Default(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config) extends Tracer {
+ private val logger = LoggerFactory.getLogger(classOf[Tracer])
- reconfigure(initialConfig)
+ private[Tracer] val tracerMetrics = new TracerMetrics(metrics)
+ @volatile private[Tracer] var joinRemoteParentsWithSameSpanID: Boolean = true
+ @volatile private[Tracer] var configuredSampler: Sampler = Sampler.Never
+ @volatile private[Tracer] var _identityProvider: IdentityProvider = IdentityProvider.Default()
- override def buildSpan(operationName: String): io.opentracing.Tracer.SpanBuilder =
- new SpanBuilder(operationName)
+ reconfigure(initialConfig)
- override def extract[C](format: Format[C], carrier: C): io.opentracing.SpanContext = format match {
- case HTTP_HEADERS => httpHeaderSpanContextCodec.extract(carrier.asInstanceOf[TextMap], configuredSampler)
- case TEXT_MAP => textMapSpanContextCodec.extract(carrier.asInstanceOf[TextMap], configuredSampler)
- case BINARY => null // TODO: Implement Binary Encoding
- case _ => null
- }
+ override def buildSpan(operationName: String): SpanBuilder =
+ new SpanBuilder(operationName, this, reporterRegistry)
- override def inject[C](spanContext: io.opentracing.SpanContext, format: Format[C], carrier: C): Unit = format match {
- case HTTP_HEADERS => httpHeaderSpanContextCodec.inject(spanContext.asInstanceOf[SpanContext], carrier.asInstanceOf[TextMap])
- case TEXT_MAP => textMapSpanContextCodec.inject(spanContext.asInstanceOf[SpanContext], carrier.asInstanceOf[TextMap])
- case BINARY =>
- case _ =>
- }
+ override def identityProvider: IdentityProvider =
+ this._identityProvider
+
+ def sampler: Sampler =
+ configuredSampler
+
+ private[kamon] def reconfigure(config: Config): Unit = synchronized {
+ Try {
+ val dynamic = new DynamicAccess(getClass.getClassLoader)
+ val traceConfig = config.getConfig("kamon.trace")
- def sampler: Sampler =
- configuredSampler
+ val newSampler = traceConfig.getString("sampler") match {
+ case "always" => Sampler.Always
+ case "never" => Sampler.Never
+ case "random" => Sampler.random(traceConfig.getDouble("random-sampler.probability"))
+ case other => sys.error(s"Unexpected sampler name $other.")
+ }
+
+ val newJoinRemoteParentsWithSameSpanID = traceConfig.getBoolean("join-remote-parents-with-same-span-id")
- def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit =
- this.textMapSpanContextCodec = codec
+ val newIdentityProvider = dynamic.createInstanceFor[IdentityProvider](
+ traceConfig.getString("identity-provider"), immutable.Seq.empty[(Class[_], AnyRef)]
+ ).get
- def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit =
- this.httpHeaderSpanContextCodec = codec
+ configuredSampler = newSampler
+ joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID
+ _identityProvider = newIdentityProvider
- private class SpanBuilder(operationName: String) extends io.opentracing.Tracer.SpanBuilder {
- private var parentContext: SpanContext = _
+ }.failed.foreach {
+ ex => logger.error("Unable to reconfigure Kamon Tracer", ex)
+ }
+ }
+ }
+
+ object Default {
+ def apply(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config): Default =
+ new Default(metrics, reporterRegistry, initialConfig)
+ }
+
+ final class SpanBuilder(operationName: String, tracer: Tracer.Default, reporterRegistry: ReporterRegistryImpl) {
+ private var parentSpan: Span = _
private var startTimestamp = 0L
- private var initialTags = Map.empty[String, String]
+ private var initialSpanTags = Map.empty[String, Span.TagValue]
+ private var initialMetricTags = Map.empty[String, String]
private var useActiveSpanAsParent = true
- override def asChildOf(parent: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = parent match {
- case spanContext: kamon.trace.SpanContext =>
- this.parentContext = spanContext
- this
- case null => this
- case _ => logger.error("Can't extract the parent ID from a non-Kamon SpanContext"); this
+ def asChildOf(parent: Span): SpanBuilder = {
+ if(parent != Span.Empty) this.parentSpan = parent
+ this
}
- override def asChildOf(parent: io.opentracing.BaseSpan[_]): io.opentracing.Tracer.SpanBuilder =
- asChildOf(parent.context())
-
- override def addReference(referenceType: String, referencedContext: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = {
- if(referenceType != null && referenceType.equals(io.opentracing.References.CHILD_OF)) {
- asChildOf(referencedContext)
- } else this
+ def withMetricTag(key: String, value: String): SpanBuilder = {
+ this.initialMetricTags = this.initialMetricTags + (key -> value)
+ this
}
- override def withTag(key: String, value: String): io.opentracing.Tracer.SpanBuilder = {
- this.initialTags = this.initialTags + (key -> value)
+ def withSpanTag(key: String, value: String): SpanBuilder = {
+ this.initialSpanTags = this.initialSpanTags + (key -> TagValue.String(value))
this
}
- override def withTag(key: String, value: Boolean): io.opentracing.Tracer.SpanBuilder = {
- this.initialTags = this.initialTags + (key -> value.toString)
+ def withSpanTag(key: String, value: Long): SpanBuilder = {
+ this.initialSpanTags = this.initialSpanTags + (key -> TagValue.Number(value))
this
}
- override def withTag(key: String, value: Number): io.opentracing.Tracer.SpanBuilder = {
- this.initialTags = this.initialTags + (key -> value.toString)
+ def withSpanTag(key: String, value: Boolean): SpanBuilder = {
+ val tagValue = if (value) TagValue.True else TagValue.False
+ this.initialSpanTags = this.initialSpanTags + (key -> tagValue)
this
}
- override def withStartTimestamp(microseconds: Long): io.opentracing.Tracer.SpanBuilder = {
+ def withStartTimestamp(microseconds: Long): SpanBuilder = {
this.startTimestamp = microseconds
this
}
- override def ignoreActiveSpan(): io.opentracing.Tracer.SpanBuilder = {
+ def ignoreActiveSpan(): SpanBuilder = {
this.useActiveSpanAsParent = false
this
}
- override def start(): io.opentracing.Span =
- startManual()
+ def start(): Span = {
+ val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp()
- override def startActive(): io.opentracing.ActiveSpan =
- makeActive(startManual())
+ val parentSpan: Option[Span] = Option(this.parentSpan)
+ .orElse(if(useActiveSpanAsParent) Some(Kamon.currentContext().get(Span.ContextKey)) else None)
+ .filter(span => span != Span.Empty)
- override def startManual(): Span = {
- val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp()
+ val samplingDecision: SamplingDecision = parentSpan
+ .map(_.context.samplingDecision)
+ .filter(_ != SamplingDecision.Unknown)
+ .getOrElse(tracer.sampler.decide(operationName, initialSpanTags))
- if(parentContext == null && useActiveSpanAsParent) {
- val possibleParent = activeSpan()
- if(possibleParent != null)
- parentContext = possibleParent.context().asInstanceOf[SpanContext]
+ val spanContext = parentSpan match {
+ case Some(parent) => joinParentContext(parent, samplingDecision)
+ case None => newSpanContext(samplingDecision)
}
- val spanContext =
- if(parentContext != null)
- new SpanContext(parentContext.traceID, createID(), parentContext.spanID, parentContext.sampled, parentContext.baggageMap)
- else {
- val traceID = createID()
- new SpanContext(traceID, traceID, 0L, configuredSampler.decide(traceID), Map.empty)
- }
-
- tracerMetrics.createdSpans.increment()
- new Span(spanContext, operationName, initialTags, startTimestampMicros, reporterRegistry)
+ tracer.tracerMetrics.createdSpans.increment()
+ Span.Local(spanContext, operationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry)
}
- private def createID(): Long =
- ThreadLocalRandom.current().nextLong()
- }
-
-
- private[kamon] def reconfigure(config: Config): Unit = synchronized {
- val traceConfig = config.getConfig("kamon.trace")
-
- configuredSampler = traceConfig.getString("sampler") match {
- case "always" => Sampler.always
- case "never" => Sampler.never
- case "random" => Sampler.random(traceConfig.getDouble("sampler-random.chance"))
- case other => sys.error(s"Unexpected sampler name $other.")
- }
+ private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext =
+ if(parent.isRemote() && tracer.joinRemoteParentsWithSameSpanID)
+ parent.context().copy(samplingDecision = samplingDecision)
+ else
+ parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision)
+
+ private def newSpanContext(samplingDecision: SamplingDecision): SpanContext =
+ SpanContext(
+ traceID = tracer._identityProvider.traceIdGenerator().generate(),
+ spanID = tracer._identityProvider.spanIdGenerator().generate(),
+ parentID = IdentityProvider.NoIdentifier,
+ samplingDecision = samplingDecision
+ )
}
private final class TracerMetrics(metricLookup: MetricLookup) {