diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2017-07-17 13:13:41 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2017-07-17 13:13:41 +0200 |
commit | 483159862293a065be7cf3743d1aa759fbf31fc0 (patch) | |
tree | c6c2753c1c7abd7f2e44d7686bd088a51267867d /kamon-core/src/main/scala/kamon | |
parent | 34010efc7b273e50d805a277646f14aa96aaa8b2 (diff) | |
download | Kamon-483159862293a065be7cf3743d1aa759fbf31fc0.tar.gz Kamon-483159862293a065be7cf3743d1aa759fbf31fc0.tar.bz2 Kamon-483159862293a065be7cf3743d1aa759fbf31fc0.zip |
working on ID generation and SpanContext encoding/decoding
Diffstat (limited to 'kamon-core/src/main/scala/kamon')
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala (renamed from kamon-core/src/main/scala/kamon/trace/IdentifierGenerator.scala) | 5 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala | 181 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/trace/Tracer.scala | 69 | ||||
-rw-r--r-- | kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala | 72 |
4 files changed, 127 insertions, 200 deletions
diff --git a/kamon-core/src/main/scala/kamon/trace/IdentifierGenerator.scala b/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala index ea23227a..25e8f3c0 100644 --- a/kamon-core/src/main/scala/kamon/trace/IdentifierGenerator.scala +++ b/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala @@ -17,7 +17,6 @@ object IdentityProvider { val NoIdentifier = Identifier("", new Array[Byte](0)) - trait Generator { def generate(): Identifier def from(string: String): Identifier @@ -54,4 +53,8 @@ object IdentityProvider { override def traceIdentifierGenerator(): Generator = generator override def spanIdentifierGenerator(): Generator = generator } + + object Default { + def apply(): Default = new Default() + } }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala index 23eb40db..11d6de2c 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala @@ -13,152 +13,134 @@ * ========================================================================================= */ - package kamon.trace +import java.lang.StringBuilder import java.net.{URLDecoder, URLEncoder} import java.nio.ByteBuffer -import java.util.concurrent.ThreadLocalRandom - import kamon.trace.SpanContext.{Baggage, SamplingDecision, Source} - -import scala.collection.JavaConverters._ -import kamon.util.HexCodec - +import scala.collection.mutable trait SpanContextCodec[T] { - def inject(spanContext: SpanContext, carrier: T): Unit + def inject(spanContext: SpanContext, carrier: T): T + def inject(spanContext: SpanContext): T def extract(carrier: T): Option[SpanContext] } -trait TextMap { - def get(key: String): Option[String] - def put(key: String, value: String): Unit - def values: Iterator[(String, String)] -} - - object SpanContextCodec { - trait Format[C] + sealed trait Format[C] object Format { case object TextMap extends Format[TextMap] case object HttpHeaders extends Format[TextMap] case object Binary extends Format[ByteBuffer] } -// val ExtendedB3: SpanContextCodec[TextMap] = new TextMapSpanCodec( -// traceIDKey = "X-B3-TraceId", -// parentIDKey = "X-B3-ParentSpanId", -// spanIDKey = "X-B3-SpanId", -// sampledKey = "X-B3-Sampled", -// baggageKey = "X-Kamon-Baggage-", -// baggageValueEncoder = urlEncode, -// baggageValueDecoder = urlDecode -// ) - - private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8") - private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8") - - private class ExtendedB3(identityProvider: IdentityProvider) extends SpanContextCodec[TextMap] { + class ExtendedB3(identityProvider: IdentityProvider) extends SpanContextCodec[TextMap] { import ExtendedB3.Headers + override def inject(spanContext: SpanContext, carrier: TextMap): TextMap = { + if(spanContext != SpanContext.EmptySpanContext) { + carrier.put(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) + carrier.put(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) + carrier.put(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) + carrier.put(Headers.Sampled, encodeSamplingDecision(spanContext.samplingDecision)) + carrier.put(Headers.Baggage, encodeBaggage(spanContext.baggage)) - override def inject(spanContext: SpanContext, carrier: TextMap): Unit = { - carrier.put(Headers.TraceIdentifier, baggageValueEncoder(spanContext.traceID.string)) - carrier.put(parentIDKey, baggageValueEncoder(spanContext.parentID.string)) - carrier.put(spanIDKey, baggageValueEncoder(spanContext.spanID.string)) - - spanContext.baggage.getAll().foreach { - case (key, value) => carrier.put(baggageKey + key, baggageValueEncoder(value)) + spanContext.baggage.get(Headers.Flags).foreach { flags => + carrier.put(Headers.Flags, flags) + } } + + carrier + } + + override def inject(spanContext: SpanContext): TextMap = { + val mutableTextMap = TextMap.Default() + inject(spanContext, mutableTextMap) + mutableTextMap } override def extract(carrier: TextMap): Option[SpanContext] = { val traceID = carrier.get(Headers.TraceIdentifier) - .map(identityProvider.traceIdentifierGenerator().from) + .map(id => identityProvider.traceIdentifierGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) val spanID = carrier.get(Headers.SpanIdentifier) - .map(identityProvider.spanIdentifierGenerator().from) + .map(id => identityProvider.spanIdentifierGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) { val parentID = carrier.get(Headers.ParentSpanIdentifier) - .map(identityProvider.spanIdentifierGenerator().from) + .map(id => identityProvider.spanIdentifierGenerator().from(urlDecode(id))) .getOrElse(IdentityProvider.NoIdentifier) - val samplingDecision = carrier.get(Headers.Flags).orElse(carrier.get(Headers.Sampled)) match { + val baggage = decodeBaggage(carrier.get(Headers.Baggage)) + val flags = carrier.get(Headers.Flags) + + flags.foreach { flags => + baggage.add(Headers.Flags, flags) + } + + val samplingDecision = flags.orElse(carrier.get(Headers.Sampled)) match { case Some(sampled) if sampled == "1" => SamplingDecision.Sample case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample case _ => SamplingDecision.Unknown } - - - - Some(SpanContext(traceID, spanID, parentID, samplingDecision, ???, Source.Remote)) + Some(SpanContext(traceID, spanID, parentID, samplingDecision, baggage, Source.Remote)) } else None - - val minimalSpanContext = - for { - traceID <- carrier.get(traceIDKey).map(identityProvider.traceIdentifierGenerator().from) - spanID <- carrier.get(spanIDKey).map(identityProvider.spanIdentifierGenerator().from) - } yield { - - - } - - - -// var traceID: String = null -// var parentID: String = null -// var spanID: String = null -// var sampled: String = null -// var baggage: Map[String, String] = Map.empty -// -// carrier.iterator().asScala.foreach { entry => -// if(entry.getKey.equals(traceIDKey)) -// traceID = baggageValueDecoder(entry.getValue) -// else if(entry.getKey.equals(parentIDKey)) -// parentID = baggageValueDecoder(entry.getValue) -// else if(entry.getKey.equals(spanIDKey)) -// spanID = baggageValueDecoder(entry.getValue) -// else if(entry.getKey.equals(sampledKey)) -// sampled = entry.getValue -// else if(entry.getKey.startsWith(baggagePrefix)) -// baggage = baggage + (entry.getKey.substring(baggagePrefix.length) -> baggageValueDecoder(entry.getValue)) -// } -// -// if(traceID != null && spanID != null) { -// val actualParent = if(parentID == null) 0L else decodeLong(parentID) -// val isSampled = if(sampled == null) sampler.decide(ThreadLocalRandom.current().nextLong()) else sampled.equals("1") -// -// new SpanContext(decodeLong(traceID), decodeLong(spanID), actualParent, isSampled, baggage) -// } else null -// -// None } private def encodeBaggage(baggage: Baggage): String = { if(baggage.getAll().nonEmpty) { - + val encodedBaggage = new StringBuilder() baggage.getAll().foreach { - case (key, value) => + case (key, value) if key != Headers.Flags => + if(encodedBaggage.length() > 0) + encodedBaggage.append(';') + + encodedBaggage + .append(urlEncode(key)) + .append('=') + .append(urlEncode(value)) } + + encodedBaggage.toString() } else "" } - private def decodeLong(input: String): Long = - HexCodec.lowerHexToUnsignedLong(input) + private def decodeBaggage(encodedBaggage: Option[String]): Baggage = { + val baggage = Baggage() + encodedBaggage.foreach { baggageString => + baggageString.split(";").foreach { group => + val pair = group.split("=") + if(pair.length >= 2 && pair(0).nonEmpty) { + baggage.add(urlDecode(pair(0)), urlDecode(pair(1))) + } + } + } + + baggage + } + + private def encodeSamplingDecision(samplingDecision: SamplingDecision): String = samplingDecision match { + case SamplingDecision.Sample => "1" + case SamplingDecision.DoNotSample => "0" + case SamplingDecision.Unknown => "" + } - private def encodeLong(input: Long): String = - HexCodec.toLowerHex(input) + private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8") + private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8") } object ExtendedB3 { + + def apply(identityProvider: IdentityProvider): ExtendedB3 = + new ExtendedB3(identityProvider) + object Headers { val TraceIdentifier = "X-B3-TraceId" val ParentSpanIdentifier = "X-B3-ParentSpanId" @@ -169,3 +151,22 @@ object SpanContextCodec { } } } + +trait TextMap { + def get(key: String): Option[String] + def put(key: String, value: String): Unit + def values: Iterator[(String, String)] +} + +object TextMap { + class Default extends TextMap { + private val storage = mutable.Map.empty[String, String] + override def get(key: String): Option[String] = storage.get(key) + override def put(key: String, value: String): Unit = storage.put(key, value) + override def values: Iterator[(String, String)] = storage.toIterator + } + + object Default { + def apply(): Default = new Default() + } +} diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index 1aec8d7c..08643c63 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -27,22 +27,17 @@ import kamon.util.Clock import org.slf4j.LoggerFactory -trait Tracer { - def buildSpan(operationName: String): SpanBuilder +trait ActiveSpanSource { def activeSpan(): ActiveSpan def makeActive(span: Span): ActiveSpan +} - def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext] - def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): Unit - - - - // - // Configuration Utilities - // +trait Tracer extends ActiveSpanSource{ + def buildSpan(operationName: String): SpanBuilder - def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit - def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit + def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext] + def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): C + def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C]): C } object Tracer { @@ -57,33 +52,40 @@ object Tracer { private[Tracer] val tracerMetrics = new TracerMetrics(metrics) @volatile private[Tracer] var joinRemoteSpansWithSameID: Boolean = false @volatile private[Tracer] var configuredSampler: Sampler = Sampler.never - @volatile private[Tracer] var idGenerator: IdentifierGenerator = IdentifierGenerator.RandomLong() - @volatile private[Tracer] var textMapSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.TextMap - @volatile private[Tracer] var httpHeaderSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.ZipkinB3 + @volatile private[Tracer] var identityProvider: IdentityProvider = IdentityProvider.Default() + @volatile private[Tracer] var textMapSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.ExtendedB3(identityProvider) + @volatile private[Tracer] var httpHeaderSpanContextCodec: SpanContextCodec[TextMap] = SpanContextCodec.ExtendedB3(identityProvider) reconfigure(initialConfig) - def buildSpan(operationName: String): SpanBuilder = + override def buildSpan(operationName: String): SpanBuilder = new SpanBuilder(operationName, this, reporterRegistry) - def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext] = format match { + override def extract[C](format: SpanContextCodec.Format[C], carrier: C): Option[SpanContext] = format match { case SpanContextCodec.Format.HttpHeaders => httpHeaderSpanContextCodec.extract(carrier.asInstanceOf[TextMap]) case SpanContextCodec.Format.TextMap => textMapSpanContextCodec.extract(carrier.asInstanceOf[TextMap]) case SpanContextCodec.Format.Binary => None case _ => None } - def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): Unit = format match { + override def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C], carrier: C): C = format match { case SpanContextCodec.Format.HttpHeaders => httpHeaderSpanContextCodec.inject(spanContext, carrier.asInstanceOf[TextMap]) case SpanContextCodec.Format.TextMap => textMapSpanContextCodec.inject(spanContext, carrier.asInstanceOf[TextMap]) - case SpanContextCodec.Format.Binary => - case _ => + case SpanContextCodec.Format.Binary => carrier + case _ => carrier } - def activeSpan(): ActiveSpan = + override def inject[C](spanContext: SpanContext, format: SpanContextCodec.Format[C]): C = format match { + case SpanContextCodec.Format.HttpHeaders => httpHeaderSpanContextCodec.inject(spanContext) + case SpanContextCodec.Format.TextMap => textMapSpanContextCodec.inject(spanContext) + case SpanContextCodec.Format.Binary => ByteBuffer.allocate(0) // TODO: Implement binary encoding. + case _ => sys.error("can't do") + } + + override def activeSpan(): ActiveSpan = activeSpanStorage.get() - def makeActive(span: Span): ActiveSpan = { + override def makeActive(span: Span): ActiveSpan = { val currentlyActiveSpan = activeSpanStorage.get() val newActiveSpan = ActiveSpan.Default(span, currentlyActiveSpan, activeSpanStorage) activeSpanStorage.set(newActiveSpan) @@ -93,13 +95,6 @@ object Tracer { def sampler: Sampler = configuredSampler - def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit = - this.textMapSpanContextCodec = codec - - def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit = - this.httpHeaderSpanContextCodec = codec - - private[kamon] def reconfigure(config: Config): Unit = synchronized { val traceConfig = config.getConfig("kamon.trace") @@ -110,10 +105,6 @@ object Tracer { case other => sys.error(s"Unexpected sampler name $other.") } } - - private final class TracerMetrics(metricLookup: MetricLookup) { - val createdSpans = metricLookup.counter("tracer.spans-created") - } } final class SpanBuilder(operationName: String, tracer: Tracer.Default, reporterRegistry: ReporterRegistryImpl) { @@ -170,13 +161,13 @@ object Tracer { if(parent.source == Source.Remote && tracer.joinRemoteSpansWithSameID) parent.copy(samplingDecision = samplingDecision) else - parent.createChild(tracer.idGenerator.generateSpanID(), samplingDecision) + parent.createChild(tracer.identityProvider.spanIdentifierGenerator().generate(), samplingDecision) private def newSpanContext(samplingDecision: SamplingDecision): SpanContext = SpanContext( - traceID = tracer.idGenerator.generateTraceID(), - spanID = tracer.idGenerator.generateSpanID(), - parentID = tracer.idGenerator.generateEmptyID(), + traceID = tracer.identityProvider.traceIdentifierGenerator().generate(), + spanID = tracer.identityProvider.spanIdentifierGenerator().generate(), + parentID = IdentityProvider.NoIdentifier, samplingDecision = samplingDecision, baggage = SpanContext.Baggage(), source = Source.Local @@ -185,4 +176,8 @@ object Tracer { def startActive(): ActiveSpan = tracer.makeActive(start()) } + + private final class TracerMetrics(metricLookup: MetricLookup) { + val createdSpans = metricLookup.counter("tracer.spans-created") + } } diff --git a/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala b/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala deleted file mode 100644 index 83027cc5..00000000 --- a/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2015 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ -package kamon.util - -import java.util.function.Supplier - -import kamon.trace.{SpanContext => KamonSpanContext} -import kamon.Kamon -import org.slf4j.MDC - -import scala.collection.JavaConverters._ -// -//object BaggageOnMDC { -// -// /** -// * Copy all baggage keys into SLF4J's MDC, evaluates the provided piece of code and removes the baggage keys -// * afterwards, only when there is a currently active span. Optionally adds the Trace ID as well. -// * -// */ -// def withBaggageOnMDC[T](includeTraceID: Boolean, code: => T): T = { -// val activeSpan = Kamon.activeSpan() -// if(activeSpan == null) -// code -// else { -// val baggageItems = activeSpan.context().baggageItems().asScala -// baggageItems.foreach(entry => MDC.put(entry.getKey, entry.getValue)) -// if(includeTraceID) -// addTraceIDToMDC(activeSpan.context()) -// -// val evaluatedCode = code -// -// baggageItems.foreach(entry => MDC.remove(entry.getKey)) -// if(includeTraceID) -// removeTraceIDFromMDC() -// -// evaluatedCode -// -// } -// } -// -// def withBaggageOnMDC[T](code: Supplier[T]): T = -// withBaggageOnMDC(true, code.get()) -// -// def withBaggageOnMDC[T](includeTraceID: Boolean, code: Supplier[T]): T = -// withBaggageOnMDC(includeTraceID, code.get()) -// -// def withBaggageOnMDC[T](code: => T): T = -// withBaggageOnMDC(true, code) -// -// private val TraceIDKey = "trace_id" -// -// private def addTraceIDToMDC(context: io.opentracing.SpanContext): Unit = context match { -// case ctx: KamonSpanContext => MDC.put(TraceIDKey, HexCodec.toLowerHex(ctx.traceID)) -// case _ => -// } -// -// private def removeTraceIDFromMDC(): Unit = -// MDC.remove(TraceIDKey) -//} |