aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-07-17 13:13:41 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-07-17 13:13:41 +0200
commit483159862293a065be7cf3743d1aa759fbf31fc0 (patch)
treec6c2753c1c7abd7f2e44d7686bd088a51267867d /kamon-core/src/main
parent34010efc7b273e50d805a277646f14aa96aaa8b2 (diff)
downloadKamon-483159862293a065be7cf3743d1aa759fbf31fc0.tar.gz
Kamon-483159862293a065be7cf3743d1aa759fbf31fc0.tar.bz2
Kamon-483159862293a065be7cf3743d1aa759fbf31fc0.zip
working on ID generation and SpanContext encoding/decoding
Diffstat (limited to 'kamon-core/src/main')
-rw-r--r--kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala (renamed from kamon-core/src/main/scala/kamon/trace/IdentifierGenerator.scala)5
-rw-r--r--kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala181
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Tracer.scala69
-rw-r--r--kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala72
4 files changed, 127 insertions, 200 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/IdentifierGenerator.scala b/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala
index ea23227a..25e8f3c0 100644
--- a/kamon-core/src/main/scala/kamon/trace/IdentifierGenerator.scala
+++ b/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala
@@ -17,7 +17,6 @@ object IdentityProvider {
val NoIdentifier = Identifier("", new Array[Byte](0))
-
trait Generator {
def generate(): Identifier
def from(string: String): Identifier
@@ -54,4 +53,8 @@ object IdentityProvider {
override def traceIdentifierGenerator(): Generator = generator
override def spanIdentifierGenerator(): Generator = generator
}
+
+ object Default {
+ def apply(): Default = new Default()
+ }
} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala
index 23eb40db..11d6de2c 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala
@@ -13,152 +13,134 @@
* =========================================================================================
*/
-
package kamon.trace
+import java.lang.StringBuilder
import java.net.{URLDecoder, URLEncoder}
import java.nio.ByteBuffer
-import java.util.concurrent.ThreadLocalRandom
-
import kamon.trace.SpanContext.{Baggage, SamplingDecision, Source}
-
-import scala.collection.JavaConverters._
-import kamon.util.HexCodec
-
+import scala.collection.mutable
trait SpanContextCodec[T] {
- def inject(spanContext: SpanContext, carrier: T): Unit
+ def inject(spanContext: SpanContext, carrier: T): T
+ def inject(spanContext: SpanContext): T
def extract(carrier: T): Option[SpanContext]
}
-trait TextMap {
- def get(key: String): Option[String]
- def put(key: String, value: String): Unit
- def values: Iterator[(String, String)]
-}
-
-
object SpanContextCodec {
- trait Format[C]
+ sealed trait Format[C]
object Format {
case object TextMap extends Format[TextMap]
case object HttpHeaders extends Format[TextMap]
case object Binary extends Format[ByteBuffer]
}
-// val ExtendedB3: SpanContextCodec[TextMap] = new TextMapSpanCodec(
-// traceIDKey = "X-B3-TraceId",
-// parentIDKey = "X-B3-ParentSpanId",
-// spanIDKey = "X-B3-SpanId",
-// sampledKey = "X-B3-Sampled",
-// baggageKey = "X-Kamon-Baggage-",
-// baggageValueEncoder = urlEncode,
-// baggageValueDecoder = urlDecode
-// )
-
- private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8")
- private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8")
-
- private class ExtendedB3(identityProvider: IdentityProvider) extends SpanContextCodec[TextMap] {
+ class ExtendedB3(identityProvider: IdentityProvider) extends SpanContextCodec[TextMap] {
import ExtendedB3.Headers
+ override def inject(spanContext: SpanContext, carrier: TextMap): TextMap = {
+ if(spanContext != SpanContext.EmptySpanContext) {
+ carrier.put(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string))
+ carrier.put(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string))
+ carrier.put(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string))
+ carrier.put(Headers.Sampled, encodeSamplingDecision(spanContext.samplingDecision))
+ carrier.put(Headers.Baggage, encodeBaggage(spanContext.baggage))
- override def inject(spanContext: SpanContext, carrier: TextMap): Unit = {
- carrier.put(Headers.TraceIdentifier, baggageValueEncoder(spanContext.traceID.string))
- carrier.put(parentIDKey, baggageValueEncoder(spanContext.parentID.string))
- carrier.put(spanIDKey, baggageValueEncoder(spanContext.spanID.string))
-
- spanContext.baggage.getAll().foreach {
- case (key, value) => carrier.put(baggageKey + key, baggageValueEncoder(value))
+ spanContext.baggage.get(Headers.Flags).foreach { flags =>
+ carrier.put(Headers.Flags, flags)
+ }
}
+
+ carrier
+ }
+
+ override def inject(spanContext: SpanContext): TextMap = {
+ val mutableTextMap = TextMap.Default()
+ inject(spanContext, mutableTextMap)
+ mutableTextMap
}
override def extract(carrier: TextMap): Option[SpanContext] = {
val traceID = carrier.get(Headers.TraceIdentifier)
- .map(identityProvider.traceIdentifierGenerator().from)
+ .map(id => identityProvider.traceIdentifierGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
val spanID = carrier.get(Headers.SpanIdentifier)
- .map(identityProvider.spanIdentifierGenerator().from)
+ .map(id => identityProvider.spanIdentifierGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) {
val parentID = carrier.get(Headers.ParentSpanIdentifier)
- .map(identityProvider.spanIdentifierGenerator().from)
+ .map(id => identityProvider.spanIdentifierGenerator().from(urlDecode(id)))
.getOrElse(IdentityProvider.NoIdentifier)
- val samplingDecision = carrier.get(Headers.Flags).orElse(carrier.get(Headers.Sampled)) match {
+ val baggage = decodeBaggage(carrier.get(Headers.Baggage))
+ val flags = carrier.get(Headers.Flags)
+
+ flags.foreach { flags =>
+ baggage.add(Headers.Flags, flags)
+ }
+
+ val samplingDecision = flags.orElse(carrier.get(Headers.Sampled)) match {
case Some(sampled) if sampled == "1" => SamplingDecision.Sample
case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample
case _ => SamplingDecision.Unknown
}
-
-
-
- Some(SpanContext(traceID, spanID, parentID, samplingDecision, ???, Source.Remote))
+ Some(SpanContext(traceID, spanID, parentID, samplingDecision, baggage, Source.Remote))
} else None
-
- val minimalSpanContext =
- for {
- traceID <- carrier.get(traceIDKey).map(identityProvider.traceIdentifierGenerator().from)
- spanID <- carrier.get(spanIDKey).map(identityProvider.spanIdentifierGenerator().from)
- } yield {
-
-
- }
-
-
-
-// var traceID: String = null
-// var parentID: String = null
-// var spanID: String = null
-// var sampled: String = null
-// var baggage: Map[String, String] = Map.empty
-//
-// carrier.iterator().asScala.foreach { entry =>
-// if(entry.getKey.equals(traceIDKey))
-// traceID = baggageValueDecoder(entry.getValue)
-// else if(entry.getKey.equals(parentIDKey))
-// parentID = baggageValueDecoder(entry.getValue)
-// else if(entry.getKey.equals(spanIDKey))
-// spanID = baggageValueDecoder(entry.getValue)
-// else if(entry.getKey.equals(sampledKey))
-// sampled = entry.getValue
-// else if(entry.getKey.startsWith(baggagePrefix))
-// baggage = baggage + (entry.getKey.substring(baggagePrefix.length) -> baggageValueDecoder(entry.getValue))
-// }
-//
-// if(traceID != null && spanID != null) {
-// val actualParent = if(parentID == null) 0L else decodeLong(parentID)
-// val isSampled = if(sampled == null) sampler.decide(ThreadLocalRandom.current().nextLong()) else sampled.equals("1")
-//
-// new SpanContext(decodeLong(traceID), decodeLong(spanID), actualParent, isSampled, baggage)
-// } else null
-//
-// None
}
private def encodeBaggage(baggage: Baggage): String = {
if(baggage.getAll().nonEmpty) {
-
+ val encodedBaggage = new StringBuilder()
baggage.getAll().foreach {
- case (key, value) =>
+ case (key, value) if key != Headers.Flags =>
+ if(encodedBaggage.length() > 0)
+ encodedBaggage.append(';')
+
+ encodedBaggage
+ .append(urlEncode(key))
+ .append('=')
+ .append(urlEncode(value))
}
+
+ encodedBaggage.toString()
} else ""
}
- private def decodeLong(input: String): Long =
- HexCodec.lowerHexToUnsignedLong(input)
+ private def decodeBaggage(encodedBaggage: Option[String]): Baggage = {
+ val baggage = Baggage()
+ encodedBaggage.foreach { baggageString =>
+ baggageString.split(";").foreach { group =>
+ val pair = group.split("=")
+ if(pair.length >= 2 && pair(0).nonEmpty) {
+ baggage.add(urlDecode(pair(0)), urlDecode(pair(1)))
+ }
+ }
+ }
+
+ baggage
+ }
+
+ private def encodeSamplingDecision(samplingDecision: SamplingDecision): String = samplingDecision match {
+ case SamplingDecision.Sample => "1"
+ case SamplingDecision.DoNotSample => "0"
+ case SamplingDecision.Unknown => ""
+ }
- private def encodeLong(input: Long): String =
- HexCodec.toLowerHex(input)
+ private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8")
+ private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8")
}
object ExtendedB3 {
+
+ def apply(identityProvider: IdentityProvider): ExtendedB3 =
+ new ExtendedB3(identityProvider)
+
object Headers {
val TraceIdentifier = "X-B3-TraceId"
val ParentSpanIdentifier = "X-B3-ParentSpanId"
@@ -169,3 +151,22 @@ object SpanContextCodec {
}
}
}
+
+trait TextMap {
+ def get(key: String): Option[String]
+ def put(key: String, value: String): Unit
+ def values: Iterator[(String, String)]
+}
+
+object TextMap {
+ class Default extends TextMap {
+ private val storage = mutable.Map.empty[String, String]
+ override def get(key: String): Option[String] = storage.get(key)
+ override def put(key: String, value: String): Unit = storage.put(key, value)
+ override def values: Iterator[(String, String)] = storage.toIterator
+ }
+
+ object Default {
+ def apply(): Default = new Default()
+ }
+}
diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
index 1aec8d7c..08643c63 100644
--- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
@@ -27,22 +27,17 @@ import kamon.util.Clock
import org.slf4j.LoggerFactory
-trait Tracer {
- def buildSpan(operationName: String): SpanBuilder
+trait ActiveSpanSource {
def activeSpan(): ActiveSpan
def makeActive(span: Span): ActiveSpan
+}
- def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext]
- def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): Unit
-
-
-
- //
- // Configuration Utilities
- //
+trait Tracer extends ActiveSpanSource{
+ def buildSpan(operationName: String): SpanBuilder
- def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit
- def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit
+ def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext]
+ def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): C
+ def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C]): C
}
object Tracer {
@@ -57,33 +52,40 @@ object Tracer {
private[Tracer] val tracerMetrics = new TracerMetrics(metrics)
@volatile private[Tracer] var joinRemoteSpansWithSameID: Boolean = false
@volatile private[Tracer] var configuredSampler: Sampler = Sampler.never
- @volatile private[Tracer] var idGenerator: IdentifierGenerator = IdentifierGenerator.RandomLong()
- @volatile private[Tracer] var textMapSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.TextMap
- @volatile private[Tracer] var httpHeaderSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.ZipkinB3
+ @volatile private[Tracer] var identityProvider: IdentityProvider = IdentityProvider.Default()
+ @volatile private[Tracer] var textMapSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.ExtendedB3(identityProvider)
+ @volatile private[Tracer] var httpHeaderSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.ExtendedB3(identityProvider)
reconfigure(initialConfig)
- def buildSpan(operationName: String): SpanBuilder =
+ override def buildSpan(operationName: String): SpanBuilder =
new SpanBuilder(operationName, this, reporterRegistry)
- def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext] = format match {
+ override def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext] = format match {
case SpanContextCodec.Format.HttpHeaders => httpHeaderSpanContextCodec.extract(carrier.asInstanceOf[TextMap])
case SpanContextCodec.Format.TextMap => textMapSpanContextCodec.extract(carrier.asInstanceOf[TextMap])
case SpanContextCodec.Format.Binary => None
case _ => None
}
- def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): Unit = format match {
+ override def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): C = format match {
case SpanContextCodec.Format.HttpHeaders => httpHeaderSpanContextCodec.inject(spanContext, carrier.asInstanceOf[TextMap])
case SpanContextCodec.Format.TextMap => textMapSpanContextCodec.inject(spanContext, carrier.asInstanceOf[TextMap])
- case SpanContextCodec.Format.Binary =>
- case _ =>
+ case SpanContextCodec.Format.Binary => carrier
+ case _ => carrier
}
- def activeSpan(): ActiveSpan =
+ override def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C]): C = format match {
+ case SpanContextCodec.Format.HttpHeaders => httpHeaderSpanContextCodec.inject(spanContext)
+ case SpanContextCodec.Format.TextMap => textMapSpanContextCodec.inject(spanContext)
+ case SpanContextCodec.Format.Binary => ByteBuffer.allocate(0) // TODO: Implement binary encoding.
+ case _ => sys.error("can't do")
+ }
+
+ override def activeSpan(): ActiveSpan =
activeSpanStorage.get()
- def makeActive(span: Span): ActiveSpan = {
+ override def makeActive(span: Span): ActiveSpan = {
val currentlyActiveSpan = activeSpanStorage.get()
val newActiveSpan = ActiveSpan.Default(span, currentlyActiveSpan, activeSpanStorage)
activeSpanStorage.set(newActiveSpan)
@@ -93,13 +95,6 @@ object Tracer {
def sampler: Sampler =
configuredSampler
- def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit =
- this.textMapSpanContextCodec = codec
-
- def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit =
- this.httpHeaderSpanContextCodec = codec
-
-
private[kamon] def reconfigure(config: Config): Unit = synchronized {
val traceConfig = config.getConfig("kamon.trace")
@@ -110,10 +105,6 @@ object Tracer {
case other => sys.error(s"Unexpected sampler name $other.")
}
}
-
- private final class TracerMetrics(metricLookup: MetricLookup) {
- val createdSpans = metricLookup.counter("tracer.spans-created")
- }
}
final class SpanBuilder(operationName: String, tracer: Tracer.Default, reporterRegistry: ReporterRegistryImpl) {
@@ -170,13 +161,13 @@ object Tracer {
if(parent.source == Source.Remote && tracer.joinRemoteSpansWithSameID)
parent.copy(samplingDecision = samplingDecision)
else
- parent.createChild(tracer.idGenerator.generateSpanID(), samplingDecision)
+ parent.createChild(tracer.identityProvider.spanIdentifierGenerator().generate(), samplingDecision)
private def newSpanContext(samplingDecision: SamplingDecision): SpanContext =
SpanContext(
- traceID = tracer.idGenerator.generateTraceID(),
- spanID = tracer.idGenerator.generateSpanID(),
- parentID = tracer.idGenerator.generateEmptyID(),
+ traceID = tracer.identityProvider.traceIdentifierGenerator().generate(),
+ spanID = tracer.identityProvider.spanIdentifierGenerator().generate(),
+ parentID = IdentityProvider.NoIdentifier,
samplingDecision = samplingDecision,
baggage = SpanContext.Baggage(),
source = Source.Local
@@ -185,4 +176,8 @@ object Tracer {
def startActive(): ActiveSpan =
tracer.makeActive(start())
}
+
+ private final class TracerMetrics(metricLookup: MetricLookup) {
+ val createdSpans = metricLookup.counter("tracer.spans-created")
+ }
}
diff --git a/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala b/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala
deleted file mode 100644
index 83027cc5..00000000
--- a/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2015 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-package kamon.util
-
-import java.util.function.Supplier
-
-import kamon.trace.{SpanContext => KamonSpanContext}
-import kamon.Kamon
-import org.slf4j.MDC
-
-import scala.collection.JavaConverters._
-//
-//object BaggageOnMDC {
-//
-// /**
-// * Copy all baggage keys into SLF4J's MDC, evaluates the provided piece of code and removes the baggage keys
-// * afterwards, only when there is a currently active span. Optionally adds the Trace ID as well.
-// *
-// */
-// def withBaggageOnMDC[T](includeTraceID: Boolean, code: => T): T = {
-// val activeSpan = Kamon.activeSpan()
-// if(activeSpan == null)
-// code
-// else {
-// val baggageItems = activeSpan.context().baggageItems().asScala
-// baggageItems.foreach(entry => MDC.put(entry.getKey, entry.getValue))
-// if(includeTraceID)
-// addTraceIDToMDC(activeSpan.context())
-//
-// val evaluatedCode = code
-//
-// baggageItems.foreach(entry => MDC.remove(entry.getKey))
-// if(includeTraceID)
-// removeTraceIDFromMDC()
-//
-// evaluatedCode
-//
-// }
-// }
-//
-// def withBaggageOnMDC[T](code: Supplier[T]): T =
-// withBaggageOnMDC(true, code.get())
-//
-// def withBaggageOnMDC[T](includeTraceID: Boolean, code: Supplier[T]): T =
-// withBaggageOnMDC(includeTraceID, code.get())
-//
-// def withBaggageOnMDC[T](code: => T): T =
-// withBaggageOnMDC(true, code)
-//
-// private val TraceIDKey = "trace_id"
-//
-// private def addTraceIDToMDC(context: io.opentracing.SpanContext): Unit = context match {
-// case ctx: KamonSpanContext => MDC.put(TraceIDKey, HexCodec.toLowerHex(ctx.traceID))
-// case _ =>
-// }
-//
-// private def removeTraceIDFromMDC(): Unit =
-// MDC.remove(TraceIDKey)
-//}