aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/trace
diff options
context:
space:
mode:
Diffstat (limited to 'kamon-core/src/main/scala/kamon/trace')
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Span.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala4
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala (renamed from kamon-core/src/main/scala/kamon/trace/SpanCodec.scala)128
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Tracer.scala18
4 files changed, 91 insertions, 63 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala
index 43391af7..6015e350 100644
--- a/kamon-core/src/main/scala/kamon/trace/Span.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Span.scala
@@ -19,7 +19,7 @@ package trace
import java.time.Instant
import kamon.ReporterRegistry.SpanSink
-import kamon.context.Key
+import kamon.context.Context
import kamon.metric.MeasurementUnit
import kamon.trace.SpanContext.SamplingDecision
import kamon.util.Clock
@@ -66,7 +66,7 @@ sealed abstract class Span {
object Span {
- val ContextKey = Key.broadcast[Span]("span", Span.Empty)
+ val ContextKey = Context.key[Span]("span", Span.Empty)
object Empty extends Span {
override val context: SpanContext = SpanContext.EmptySpanContext
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala b/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala
index 2a8e2271..ed329ef7 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanCustomizer.scala
@@ -15,7 +15,7 @@
package kamon.trace
-import kamon.context.Key
+import kamon.context.{Context}
import kamon.trace.Tracer.SpanBuilder
/**
@@ -39,7 +39,7 @@ object SpanCustomizer {
override def customize(spanBuilder: SpanBuilder): SpanBuilder = spanBuilder
}
- val ContextKey = Key.local[SpanCustomizer]("span-customizer", Noop)
+ val ContextKey = Context.key[SpanCustomizer]("span-customizer", Noop)
def forOperationName(operationName: String): SpanCustomizer = new SpanCustomizer {
override def customize(spanBuilder: SpanBuilder): SpanBuilder =
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala
index 14b28d54..dc168347 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanPropagation.scala
@@ -19,54 +19,43 @@ import java.net.{URLDecoder, URLEncoder}
import java.nio.ByteBuffer
import kamon.Kamon
-import kamon.context.{Codecs, Context, TextMap}
+import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
+import kamon.context._
import kamon.context.generated.binary.span.{Span => ColferSpan}
+import kamon.context.HttpPropagation.{HeaderReader, HeaderWriter}
import kamon.trace.SpanContext.SamplingDecision
-object SpanCodec {
+/**
+ * Propagation mechanisms for Kamon's Span data to and from HTTP and Binary mediums.
+ */
+object SpanPropagation {
- class B3 extends Codecs.ForEntry[TextMap] {
+ /**
+ * Reads and Writes a Span instance using the B3 propagation format. The specification and semantics of the B3
+ * Propagation protocol can be found here: https://github.com/openzipkin/b3-propagation
+ */
+ class B3 extends Propagation.EntryReader[HeaderReader] with Propagation.EntryWriter[HeaderWriter] {
import B3.Headers
- override def encode(context: Context): TextMap = {
- val span = context.get(Span.ContextKey)
- val carrier = TextMap.Default()
-
- if(span.nonEmpty()) {
- val spanContext = span.context()
- carrier.put(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string))
- carrier.put(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string))
-
- if(spanContext.parentID != IdentityProvider.NoIdentifier)
- carrier.put(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string))
-
- encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision =>
- carrier.put(Headers.Sampled, samplingDecision)
- }
- }
-
- carrier
- }
-
- override def decode(carrier: TextMap, context: Context): Context = {
+ override def read(reader: HttpPropagation.HeaderReader, context: Context): Context = {
val identityProvider = Kamon.tracer.identityProvider
- val traceID = carrier.get(Headers.TraceIdentifier)
+ val traceID = reader.read(Headers.TraceIdentifier)
.map(id => identityProvider.traceIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
- val spanID = carrier.get(Headers.SpanIdentifier)
+ val spanID = reader.read(Headers.SpanIdentifier)
.map(id => identityProvider.spanIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) {
- val parentID = carrier.get(Headers.ParentSpanIdentifier)
+ val parentID = reader.read(Headers.ParentSpanIdentifier)
.map(id => identityProvider.spanIdGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
- val flags = carrier.get(Headers.Flags)
+ val flags = reader.read(Headers.Flags)
- val samplingDecision = flags.orElse(carrier.get(Headers.Sampled)) match {
+ val samplingDecision = flags.orElse(reader.read(Headers.Sampled)) match {
case Some(sampled) if sampled == "1" => SamplingDecision.Sample
case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample
case _ => SamplingDecision.Unknown
@@ -77,6 +66,24 @@ object SpanCodec {
} else context
}
+
+ override def write(context: Context, writer: HttpPropagation.HeaderWriter): Unit = {
+ val span = context.get(Span.ContextKey)
+
+ if(span.nonEmpty()) {
+ val spanContext = span.context()
+ writer.write(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string))
+ writer.write(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string))
+
+ if(spanContext.parentID != IdentityProvider.NoIdentifier)
+ writer.write(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string))
+
+ encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision =>
+ writer.write(Headers.Sampled, samplingDecision)
+ }
+ }
+ }
+
private def encodeSamplingDecision(samplingDecision: SamplingDecision): Option[String] = samplingDecision match {
case SamplingDecision.Sample => Some("1")
case SamplingDecision.DoNotSample => Some("0")
@@ -102,38 +109,27 @@ object SpanCodec {
}
- class Colfer extends Codecs.ForEntry[ByteBuffer] {
- val emptyBuffer = ByteBuffer.allocate(0)
-
- override def encode(context: Context): ByteBuffer = {
- val span = context.get(Span.ContextKey)
- if(span.nonEmpty()) {
- val marshalBuffer = Colfer.codecBuffer.get()
- val colferSpan = new ColferSpan()
- val spanContext = span.context()
-
- colferSpan.setTraceID(spanContext.traceID.bytes)
- colferSpan.setSpanID(spanContext.spanID.bytes)
- colferSpan.setParentID(spanContext.parentID.bytes)
- colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision))
-
- val marshalledSize = colferSpan.marshal(marshalBuffer, 0)
- val buffer = ByteBuffer.allocate(marshalledSize)
- buffer.put(marshalBuffer, 0, marshalledSize)
- buffer
-
- } else emptyBuffer
- }
-
- override def decode(carrier: ByteBuffer, context: Context): Context = {
- carrier.clear()
-
- if(carrier.capacity() == 0)
+ /**
+ * Defines a bare bones binary context propagation that uses Colfer [1] as the serialization library. The Schema
+ * for the Span data is simply defined as:
+ *
+ * type Span struct {
+ * traceID binary
+ * spanID binary
+ * parentID binary
+ * samplingDecision uint8
+ * }
+ *
+ */
+ class Colfer extends Propagation.EntryReader[ByteStreamReader] with Propagation.EntryWriter[ByteStreamWriter] {
+
+ override def read(medium: ByteStreamReader, context: Context): Context = {
+ if(medium.available() == 0)
context
else {
val identityProvider = Kamon.tracer.identityProvider
val colferSpan = new ColferSpan()
- colferSpan.unmarshal(carrier.array(), 0)
+ colferSpan.unmarshal(medium.readAll(), 0)
val spanContext = SpanContext(
traceID = identityProvider.traceIdGenerator().from(colferSpan.traceID),
@@ -146,6 +142,24 @@ object SpanCodec {
}
}
+ override def write(context: Context, medium: ByteStreamWriter): Unit = {
+ val span = context.get(Span.ContextKey)
+
+ if(span.nonEmpty()) {
+ val marshalBuffer = Colfer.codecBuffer.get()
+ val colferSpan = new ColferSpan()
+ val spanContext = span.context()
+
+ colferSpan.setTraceID(spanContext.traceID.bytes)
+ colferSpan.setSpanID(spanContext.spanID.bytes)
+ colferSpan.setParentID(spanContext.parentID.bytes)
+ colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision))
+
+ val marshalledSize = colferSpan.marshal(marshalBuffer, 0)
+ medium.write(marshalBuffer, 0, marshalledSize)
+
+ }
+ }
private def samplingDecisionToByte(samplingDecision: SamplingDecision): Byte = samplingDecision match {
case SamplingDecision.Sample => 1
diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
index 3e857f00..ad7ffbed 100644
--- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
@@ -99,6 +99,7 @@ object Tracer {
private var initialMetricTags = Map.empty[String, String]
private var useParentFromContext = true
private var trackMetrics = true
+ private var providedTraceID = IdentityProvider.NoIdentifier
def asChildOf(parent: Span): SpanBuilder = {
if(parent != Span.Empty) this.parentSpan = parent
@@ -158,6 +159,11 @@ object Tracer {
this
}
+ def withTraceID(identifier: IdentityProvider.Identifier): SpanBuilder = {
+ this.providedTraceID = identifier
+ this
+ }
+
def start(): Span = {
val spanFrom = if(from == Instant.EPOCH) clock.instant() else from
@@ -199,13 +205,21 @@ object Tracer {
else
parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision)
- private def newSpanContext(samplingDecision: SamplingDecision): SpanContext =
+ private def newSpanContext(samplingDecision: SamplingDecision): SpanContext = {
+ val traceID =
+ if(providedTraceID != IdentityProvider.NoIdentifier)
+ providedTraceID
+ else
+ tracer._identityProvider.traceIdGenerator().generate()
+
+
SpanContext(
- traceID = tracer._identityProvider.traceIdGenerator().generate(),
+ traceID,
spanID = tracer._identityProvider.spanIdGenerator().generate(),
parentID = IdentityProvider.NoIdentifier,
samplingDecision = samplingDecision
)
+ }
}
private final class TracerMetrics(metricLookup: MetricLookup) {